Subscribing to Streams
Transactions - subscribeNewTxs
This method opens a subscription in the form of a gRPC stream and broadcasts the pending transactions that the node receives, either from the Fibernet or from full nodes it's connected to.
The transactions on this stream have not been validated, and in some cases they can be invalid, so we recommend having a process for filtering out invalid transactions.
The reason Fiber does this is to avoid introducing any latency in the message path, ensuring the fastest possible delivery.
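As a rough illustration of such a filtering process, the sketch below runs a few cheap sanity checks on each incoming transaction before passing it on. The `PendingTx` shape, its field names, and the `known_nonces` lookup are assumptions made for this example and are not part of the Fiber API. A real check would likely also verify the signature and the sender's balance against your own node's state.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator

# Minimum gas any Ethereum transaction must supply (cost of a plain ETH transfer).
INTRINSIC_GAS = 21_000


@dataclass
class PendingTx:
    """Hypothetical, simplified transaction shape used only in this sketch."""
    sender: str
    nonce: int
    gas: int
    value: int


def is_plausible(tx: PendingTx, known_nonces: Dict[str, int]) -> bool:
    """Return False for transactions that cannot be included as they are."""
    if tx.gas < INTRINSIC_GAS:
        return False  # below the intrinsic gas cost, can never execute
    if tx.value < 0:
        return False  # malformed value
    # A nonce lower than the sender's current account nonce was already used.
    if tx.nonce < known_nonces.get(tx.sender, 0):
        return False
    return True


def filter_valid(stream: Iterable[PendingTx],
                 known_nonces: Dict[str, int]) -> Iterator[PendingTx]:
    """Lazily yield only the transactions that pass the checks."""
    for tx in stream:
        if is_plausible(tx, known_nonces):
            yield tx
```

Because `filter_valid` is a generator, it can wrap any iterable of transactions without buffering the stream. The checks are kept deliberately cheap so that they add little latency on the consumer side.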
Let's look at how users can subscribe to the pending transactions stream:
We're omitting the connection code snippet, but your client needs to be connected to Fiber first.
- Golang
- Rust
- JavaScript
- Python
import (
"context"
"log"
"time"
fiber "github.com/chainbound/fiber-go"
)
func main() {
...
// First make a sink channel on which to receive the transactions
ch := make(chan *fiber.Transaction)
go func() {
// This is a blocking call, so it needs to run in a Goroutine
if err := client.SubscribeNewTxs(nil, ch); err != nil {
log.Fatal(err)
}
}()
// Listen for incoming transactions
for tx := range ch {
handleTransaction(tx)
}
}
The transaction type we use here (fiber.Transaction) contains all possible fields of all the different transaction types. You can differentiate them with the type field. There's also a helper method, ToNative(), that converts this transaction to a go-ethereum types.Transaction.
Example:
...
for tx := range ch {
nativeTx := tx.ToNative()
handleGethTransaction(nativeTx)
}
use fiber::Client;
#[tokio::main]
async fn main() {
// Client needs to be mutable
let mut client = Client::connect(
"beta.fiberapi.io:8080".to_string(),
"YOUR_API_KEY".to_string()
).await.unwrap();
let mut sub = client.subscribe_new_txs(None).await;
// Consume the stream
while let Some(tx) = sub.next().await {
handle_transaction(tx);
}
}
The stream yields transactions of type ethers::types::Transaction from the ethers-rs crate.
import { Client, TxFilter, hexToBytes } from 'fiber-ts';
import { TypedTransaction } from '@ethereumjs/tx';
...
const sub = client.subscribeNewTxs();
sub.on("data", (tx: TypedTransaction) => {
handleTx(tx);
});
try:
sub = client.subscribe_new_txs()
# Iterate over transaction stream
for tx in sub:
do_something(tx)
except Exception as e:
print("error subscribing", e)
Filtering
The subscribe methods accept parameters that let users filter the transactions they receive.
Currently, we support filtering on the following transaction fields:
- Sender
- Receiver
- MethodID
- Value (greater than)
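To make the four fields concrete, here is a minimal local sketch of what each one compares against. The `Tx` dataclass and the helper names are hypothetical and exist only for this illustration; the actual filters are built with the Fiber packages. The method ID is the first 4 bytes of the transaction's call data, for example `0xa9059cbb` for the ERC-20 `transfer(address,uint256)` function.

```python
from dataclasses import dataclass


@dataclass
class Tx:
    """Hypothetical transaction shape, for illustration only."""
    sender: str
    to: str
    value: int    # in wei
    input: bytes  # call data


def method_id(tx: Tx) -> str:
    """First 4 bytes of the call data as 0x-prefixed hex ('' if too short)."""
    if len(tx.input) < 4:
        return ""
    return "0x" + tx.input[:4].hex()


def matches_sender(tx: Tx, address: str) -> bool:
    # Addresses are compared case-insensitively (checksum casing is ignored).
    return tx.sender.lower() == address.lower()


def matches_receiver(tx: Tx, address: str) -> bool:
    return tx.to.lower() == address.lower()


def matches_method_id(tx: Tx, selector: str) -> bool:
    return method_id(tx) == selector.lower()


def matches_value(tx: Tx, threshold_wei: int) -> bool:
    # "Greater than": a value equal to the threshold does not match.
    return tx.value > threshold_wei
```

For instance, a transaction sending 2 ETH worth of wei with call data starting with `a9059cbb` would satisfy `matches_method_id(tx, "0xa9059cbb")` and `matches_value(tx, 10**18)`.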
Here are a few examples to show how to use these filters:
- Golang
- Rust
- JavaScript
- Python
import (
"context"
"log"
"math/big"
"time"
fiber "github.com/chainbound/fiber-go"
"github.com/chainbound/fiber-go/filter"
)
func main() {
...
// Construct filter
// example 1: all transactions with either of these addresses as the receiver
f := filter.New(filter.Or(
filter.To("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
filter.To("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"),
))
// example 2: all transactions with a value greater than 1 ETH
// (reassigned with = because f is already declared above)
f = filter.New(filter.Value(new(big.Int).Mul(big.NewInt(1), big.NewInt(1e18))))
// example 3: all ERC20 transfers on the 2 tokens below
f = filter.New(filter.And(
filter.MethodID("0xa9059cbb"),
filter.Or(
filter.To("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
filter.To("0xdAC17F958D2ee523a2206206994597C13D831ec7"),
),
))
ch := make(chan *fiber.Transaction)
go func() {
// apply filter
if err := client.SubscribeNewTxs(f, ch); err != nil {
log.Fatal(err)
}
}()
// Listen for incoming transactions
for tx := range ch {
handleTransaction(tx)
}
}
Constructing filters with the Rust package is not very ergonomic yet. We're working on using macros to improve this process.
use fiber::Client;
#[tokio::main]
async fn main() {
// Client needs to be mutable
let mut client = Client::connect(
"beta.fiberapi.io:8080".to_string(),
"YOUR_API_KEY".to_string()
).await.unwrap();
// Construct filter
// example 1: simple receiver filter
let f = Filter::new()
.to("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D");
// example 2: all transactions with either of these addresses as the receiver
let f = Filter::new()
.or() // creates a new 'OR' level
.to("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48")
.to("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D");
// example 3: all ERC20 transfers on the 2 tokens below
let f = Filter::new()
.and()
.method_id("0xa9059cbb")
.or()
.to("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48")
.to("0xdAC17F958D2ee523a2206206994597C13D831ec7");
// Encode the filter
let mut sub = client.subscribe_new_txs(f.encode().unwrap()).await;
// Consume the stream
while let Some(tx) = sub.next().await {
handle_transaction(tx);
}
}
The stream yields transactions of type ethers::types::Transaction from the ethers-rs crate.
🚧 WIP 🚧
🚧 WIP 🚧
Evaluating filters will introduce anywhere from 10 to 200 microseconds of latency, depending on the complexity of the filter. There's currently a limit of 16 filter elements or "nodes" in the filter tree.
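The node limit is easier to reason about with a small model of the filter tree. The sketch below is an assumption about how such a tree could be represented locally: the `Node` class and the `count_nodes` and `evaluate` helpers are hypothetical, and whether Fiber counts operator nodes toward the limit is not stated here. This model counts every operator and every leaf as one node.

```python
from dataclasses import dataclass, field
from typing import Dict, List

MAX_NODES = 16  # limit quoted in the text


@dataclass
class Node:
    """An operator ('and' / 'or') with children, or a leaf such as ('to', address)."""
    kind: str
    operand: str = ""
    children: List["Node"] = field(default_factory=list)


def count_nodes(node: Node) -> int:
    """Count this node plus all of its descendants."""
    return 1 + sum(count_nodes(child) for child in node.children)


def evaluate(node: Node, tx: Dict[str, str]) -> bool:
    """Evaluate the tree against a transaction given as a field-to-value mapping."""
    if node.kind == "and":
        return all(evaluate(child, tx) for child in node.children)
    if node.kind == "or":
        return any(evaluate(child, tx) for child in node.children)
    # Leaf: compare the named transaction field case-insensitively.
    return tx.get(node.kind, "").lower() == node.operand.lower()


# Mirrors the "ERC20 transfers on 2 tokens" filter from the examples.
erc20_transfers = Node("and", children=[
    Node("method_id", "0xa9059cbb"),
    Node("or", children=[
        Node("to", "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
        Node("to", "0xdAC17F958D2ee523a2206206994597C13D831ec7"),
    ]),
])
```

Under this counting, the ERC-20 filter uses 5 nodes (two operators and three leaves), well within `MAX_NODES`. Each additional address in the `or` branch adds one more node.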
Block Headers - subscribeNewExecutionHeaders
ExecutionHeaders are the headers of the blocks that are part of the execution layer (eth1).
These contain the traditional block header.
In contrast with the ExecutionPayloads, headers do not contain the full list of transactions.
Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.
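Because of this, consumers typically track the chain tip themselves. The following sketch shows one way to notice a possible reorg from a header stream, by checking that each new header's parent hash matches the hash of the previously seen header. The `Header` fields (`number`, `hash`, `parent_hash`) and the `TipTracker` class are assumed names for this example and may differ from the types in the Fiber packages.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Header:
    """Hypothetical minimal header shape used only in this sketch."""
    number: int
    hash: str
    parent_hash: str


class TipTracker:
    """Remembers the last header seen and flags headers that do not extend it."""

    def __init__(self) -> None:
        self.tip: Optional[Header] = None

    def observe(self, header: Header) -> bool:
        """Record a header; return True if it does not build on the current tip."""
        broke_chain = self.tip is not None and header.parent_hash != self.tip.hash
        self.tip = header
        return broke_chain
```

A `True` result means the new header does not build on the last one seen. That can indicate a reorg, but also a missed header, so it is best treated as a signal to re-check any state derived from recent blocks.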
Let's see how to subscribe to new block headers in the different packages:
- Golang
- Rust
- JavaScript
- Python
import (
...
fiber "github.com/chainbound/fiber-go"
)
func main() {
...
ch := make(chan *fiber.Block)
go func() {
if err := client.SubscribeNewExecutionPayloadHeaders(ch); err != nil {
log.Fatal(err)
}
}()
for block := range ch {
handleBlock(block)
}
}
use fiber::Client;
#[tokio::main]
async fn main() {
let mut client = Client::connect(
"beta.fiberapi.io:8080".to_string(),
"API_KEY".to_string()
).await.unwrap();
let mut sub = client.subscribe_new_execution_headers().await;
// Consume the stream
while let Some(block) = sub.next().await {
handle_block(block);
}
}
import { Block } from "fiber-ts";
...
const sub = client.subscribeNewExecutionHeaders();
sub.on("data", (block: Block) => {
handleBlock(block);
});
try:
sub = client.subscribe_new_execution_payload_headers()
for block in sub:
do_something(block)
except Exception as e:
print("error subscribing", e)
Block Payloads - subscribeNewExecutionPayloads
Execution Payloads are the traditional blocks broadcast on the execution layer (eth1). These contain the traditional block header and the full list of transactions.
Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.
Let's see how to subscribe to new execution payloads in the different packages:
- Golang
- Rust
- JavaScript
- Python
import (
...
fiber "github.com/chainbound/fiber-go"
)
func main() {
...
ch := make(chan *fiber.Block)
go func() {
if err := client.SubscribeNewExecutionPayloads(ch); err != nil {
log.Fatal(err)
}
}()
for block := range ch {
handleBlock(block)
}
}
use fiber::Client;
#[tokio::main]
async fn main() {
let mut client = Client::connect(
"beta.fiberapi.io:8080".to_string(),
"API_KEY".to_string()
).await.unwrap();
let mut sub = client.subscribe_new_execution_payloads().await;
// Consume the stream
while let Some(block) = sub.next().await {
handle_block(block);
}
}
import { Block } from "fiber-ts";
...
const sub = client.subscribeNewExecutionPayloads();
sub.on("data", (block: Block) => {
handleBlock(block);
});
try:
sub = client.subscribe_new_execution_payloads()
for block in sub:
do_something(block)
except Exception as e:
print("error subscribing", e)
Beacon Blocks - subscribeNewBeaconBlocks
Beacon Blocks are the blocks broadcast on the beacon chain (eth2), which contain the canonical consensus info.
We currently strip out the execution payload from this stream to keep it as light and fast as possible.
If you also need the execution payload, please use the subscribeNewExecutionPayloads stream.
Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.
Let's see how to subscribe to new beacon blocks in the different packages:
- Golang
- Rust
- JavaScript
- Python
import (
...
fiber "github.com/chainbound/fiber-go"
)
func main() {
...
ch := make(chan *fiber.Block)
go func() {
if err := client.SubscribeNewBeaconBlocks(ch); err != nil {
log.Fatal(err)
}
}()
for block := range ch {
handleBlock(block)
}
}
use fiber::Client;
#[tokio::main]
async fn main() {
let mut client = Client::connect(
"beta.fiberapi.io:8080".to_string(),
"API_KEY".to_string()
).await.unwrap();
let mut sub = client.subscribe_new_beacon_blocks().await;
// Consume the stream
while let Some(block) = sub.next().await {
handle_block(block);
}
}
import { Block } from "fiber-ts";
...
const sub = client.subscribeNewBeaconBlocks();
sub.on("data", (block: Block) => {
handleBlock(block);
});
try:
sub = client.subscribe_new_beacon_blocks()
for block in sub:
do_something(block)
except Exception as e:
print("error subscribing", e)