Subscribing to Streams
Transactions
This method opens a subscription in the form of a gRPC stream and broadcasts the pending transactions that the node receives, either from the Fibernet or from the full nodes it's connected to.
The transactions on this stream have not been validated and in some cases can be invalid, so we recommend having a process for filtering out invalid transactions.
Fiber skips validation so that it adds no latency to the message path, ensuring the fastest possible delivery.
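Because the stream is unvalidated, a cheap client-side check can serve as a first pass. The following is a minimal sketch, not part of the Fiber client: `PendingTx`, `AccountState` and `is_plausibly_valid` are hypothetical names, and the account data is assumed to come from your own node. Real validation involves more checks (signature, fee cap against the base fee, and so on).

```python
from dataclasses import dataclass

INTRINSIC_GAS = 21_000  # minimum gas consumed by any Ethereum transaction


@dataclass
class PendingTx:  # hypothetical shape, not the Fiber client's type
    sender: str
    nonce: int
    gas_limit: int
    max_fee_per_gas: int  # wei
    value: int  # wei
    chain_id: int


@dataclass
class AccountState:  # hypothetical, e.g. cached from your own full node
    nonce: int
    balance: int  # wei


def is_plausibly_valid(tx, accounts, chain_id=1):
    """Cheap sanity checks; False means the transaction cannot be included as-is."""
    if tx.chain_id != chain_id:
        return False
    if tx.gas_limit < INTRINSIC_GAS:
        return False
    account = accounts.get(tx.sender)
    if account is None:
        return False  # assumption: senders missing from the cache are rejected
    if tx.nonce < account.nonce:
        return False  # nonce already used
    max_cost = tx.gas_limit * tx.max_fee_per_gas + tx.value
    return account.balance >= max_cost
```

Running the check in the handler, rather than in the receive loop, keeps the stream consumer itself as fast as possible.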
Let's look at how users can subscribe to the pending transactions stream:
We're omitting the connection code snippet, but your client needs to be connected to Fiber first.
- Golang
- Rust
- JavaScript
- Python
import (
    "context"
    "log"
    "time"

    fiber "github.com/chainbound/fiber-go"
)

func main() {
    ...
    // First make a sink channel on which to receive the transactions
    ch := make(chan *fiber.TransactionWithSender)
    go func() {
        // This is a blocking call, so it needs to run in a Goroutine
        if err := client.SubscribeNewTxs(nil, ch); err != nil {
            log.Fatal(err)
        }
    }()

    // Listen for incoming transactions
    for tx := range ch {
        handleTransaction(tx)
    }
}
The transaction type we use here (fiber.TransactionWithSender) is a wrapper for the native go-ethereum types.Transaction type, with the addition of the Sender address field. We include the sender address to avoid having to recalculate it in the client.
use fiber::Client;
use tokio_util::StreamExt;

#[tokio::main]
async fn main() {
    // Client needs to be mutable
    let mut client = Client::connect(
        "beta.fiberapi.io:8080".to_string(),
        "YOUR_API_KEY".to_string()
    ).await.unwrap();

    // Open the subscription with no filters
    let mut sub = client.subscribe_new_transactions(None).await;

    // Consume the stream
    while let Some(tx) = sub.next().await {
        handle_transaction(tx);
    }
}
The stream yields transactions of type TransactionSignedEcRecovered from the reth_primitives crate.
import { Client, TxFilter, hexToBytes } from 'fiber-ts';
import { TypedTransaction } from '@ethereumjs/tx';
...
const sub = client.subscribeNewTxs();

sub.on("data", (tx: TypedTransaction) => {
    handleTx(tx);
});
try:
    sub = client.subscribe_new_transactions()
    # Iterate over transaction stream
    for tx in sub:
        do_something(tx)
except Exception as e:
    print("error subscribing", e)
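Iterating over the subscription blocks the calling thread, much like the Go call that has to run in its own goroutine. One way to keep the main thread free is to drain the stream in a worker thread and hand items over through a queue. This is a sketch under the assumption that the subscription is a plain blocking iterator; `pump` and `consume` are hypothetical helpers, not part of the Fiber Python package.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def pump(subscription, sink):
    """Drain a blocking iterator into a queue, then signal completion."""
    try:
        for item in subscription:
            sink.put(item)
    finally:
        sink.put(_DONE)


def consume(subscription, handle):
    """Read the subscription in a worker thread and handle items on this thread."""
    # Bounded queue: a slow handler eventually blocks the worker instead of
    # letting memory grow without limit.
    sink = queue.Queue(maxsize=1024)
    worker = threading.Thread(target=pump, args=(subscription, sink), daemon=True)
    worker.start()
    while True:
        item = sink.get()
        if item is _DONE:
            break
        handle(item)
    worker.join()
```

With a real client this would be called as `consume(client.subscribe_new_transactions(), do_something)`.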
Filtering
The subscribe methods accept parameters that let users filter the transactions they receive.
Currently, we support filtering on the following transaction fields:
- Sender
- Receiver
- MethodID
- Value (greater than)
Here are a few examples showing how to use these filters:
- Golang
- Rust
- JavaScript
- Python
import (
    "context"
    "log"
    "math/big"
    "time"

    fiber "github.com/chainbound/fiber-go"
    "github.com/chainbound/fiber-go/filter" // assumed import path for the filter package
)

func main() {
    ...
    // Construct filter (each example below overwrites the previous one)
    // example 1: all transactions with either of these addresses as the receiver
    f := filter.New(filter.Or(
        filter.To("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
        filter.To("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"),
    ))

    // example 2: all transactions with a value greater than 1 ETH (1e18 wei)
    f = filter.New(filter.Value(big.NewInt(1e18)))

    // example 3: all ERC20 transfers on the 2 tokens below
    f = filter.New(filter.And(
        filter.MethodID("0xa9059cbb"),
        filter.Or(
            filter.To("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
            filter.To("0xdAC17F958D2ee523a2206206994597C13D831ec7"),
        ),
    ))

    ch := make(chan *fiber.TransactionWithSender)
    go func() {
        // apply filter
        if err := client.SubscribeNewTxs(f, ch); err != nil {
            log.Fatal(err)
        }
    }()

    // Listen for incoming transactions
    for tx := range ch {
        handleTransaction(tx)
    }
}
Constructing filters with the Rust package is not very ergonomic yet. We're working on using macros to improve this process.
use fiber::Client;
use tokio_util::StreamExt;

#[tokio::main]
async fn main() {
    // Client needs to be mutable
    let mut client = Client::connect(
        "beta.fiberapi.io:8080".to_string(),
        "YOUR_API_KEY".to_string()
    ).await.unwrap();

    // Construct filter
    // example 1: simple receiver filter
    let f = Filter::new()
        .to("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D");

    // example 2: all transactions with either of these addresses as the receiver
    let f = Filter::new()
        .or() // creates a new 'OR' level
        .to("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48")
        .to("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D");

    // example 3: all ERC20 transfers on the 2 tokens below
    let f = Filter::new()
        .and()
        .method_id("0xa9059cbb")
        .or()
        .to("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48")
        .to("0xdAC17F958D2ee523a2206206994597C13D831ec7");

    // Encode the filter
    let mut sub = client.subscribe_new_transactions(f.encode().unwrap()).await;

    // Consume the stream
    while let Some(tx) = sub.next().await {
        handle_transaction(tx);
    }
}
JavaScript and Python filter examples: 🚧 WIP 🚧
Evaluating filters adds anywhere from 10 to 200 microseconds of latency, depending on the complexity of the filter. There's currently a limit of 16 filter elements, or "nodes", in the filter tree.
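To make the idea of a filter tree concrete, here is a small local model of one. It is an illustrative sketch only and not how Fiber encodes or evaluates filters: `Node`, `Tx`, `count_nodes` and `matches` are hypothetical names, and it assumes that both operators and leaves count towards the 16-node limit.

```python
from dataclasses import dataclass, field

MAX_NODES = 16  # limit quoted in the text above


@dataclass
class Tx:  # hypothetical transaction shape
    sender: str
    to: str
    method_id: str
    value: int  # wei


@dataclass
class Node:
    op: str  # "and", "or", or a leaf: "from", "to", "method_id", "value"
    operand: object = None
    children: list = field(default_factory=list)


def count_nodes(node):
    """Count every operator and leaf in the tree."""
    return 1 + sum(count_nodes(child) for child in node.children)


def matches(node, tx):
    """Evaluate the tree against a single transaction."""
    if node.op == "and":
        return all(matches(child, tx) for child in node.children)
    if node.op == "or":
        return any(matches(child, tx) for child in node.children)
    if node.op == "from":
        return tx.sender.lower() == node.operand.lower()
    if node.op == "to":
        return tx.to.lower() == node.operand.lower()
    if node.op == "method_id":
        return tx.method_id == node.operand
    if node.op == "value":
        return tx.value > node.operand  # "greater than", as in the field list
    raise ValueError(f"unknown op: {node.op}")


TOKEN_A = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
TOKEN_B = "0xdAC17F958D2ee523a2206206994597C13D831ec7"

# Same shape as the "ERC20 transfers on 2 tokens" example: 5 nodes in total.
erc20_transfers = Node("and", children=[
    Node("method_id", "0xa9059cbb"),
    Node("or", children=[Node("to", TOKEN_A), Node("to", TOKEN_B)]),
])
```

Checking `count_nodes(f) <= MAX_NODES` before subscribing gives an early, local signal that a filter is too large.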
Block Payloads
Execution payloads are the traditional blocks broadcast on the execution layer (eth1). They contain the traditional block header, the full list of transactions, and the beacon chain withdrawals.
Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.
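Since streamed blocks can be reorged, a consumer usually needs to notice when a new block replaces ones it has already processed. A minimal sketch of that bookkeeping follows, assuming each block exposes a number, a hash and a parent hash; `BlockRef` and `ChainTracker` are hypothetical names and not part of any Fiber package.

```python
from dataclasses import dataclass


@dataclass
class BlockRef:  # hypothetical minimal view of a streamed block
    number: int
    hash: str
    parent_hash: str


class ChainTracker:
    """Keeps a window of recent blocks and reports blocks orphaned by a reorg."""

    def __init__(self, depth=64):
        self.depth = depth
        self.chain = []  # BlockRef items, oldest first

    def add(self, block):
        """Record a block; return the stored blocks it orphans (empty if none)."""
        for i in range(len(self.chain) - 1, -1, -1):
            if self.chain[i].hash == block.parent_hash:
                orphaned = self.chain[i + 1:]
                self.chain = self.chain[:i + 1]
                break
        else:
            # Parent unknown: first block, a gap in the stream, or a reorg
            # deeper than the window. Start over from this block.
            orphaned = []
            self.chain = []
        self.chain.append(block)
        self.chain = self.chain[-self.depth:]
        return orphaned
```

Any state derived from the blocks returned by `add` should be rolled back before the new block is processed.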
Let's see how to subscribe to new execution payloads in the different packages:
- Golang
- Rust
- JavaScript
- Python
Execution payloads are returned as *fiber.Block, which is a wrapper around go-ethereum native types such as Header, Transaction and Withdrawal.
import (
    ...
    fiber "github.com/chainbound/fiber-go"
)

func main() {
    ...
    ch := make(chan *fiber.Block)
    go func() {
        if err := client.SubscribeNewExecutionPayloads(ch); err != nil {
            log.Fatal(err)
        }
    }()

    for block := range ch {
        handleBlock(block)
    }
}
The type returned by this stream is an alloy-rpc-types::Block.
Since the blocks returned are parsed from consensus-layer payloads, they are missing the following fields, which are set to None or zero in all returned stream items:
- parent_beacon_block_root
- transactions_root
- withdrawals_root
use fiber::Client;
use tokio_util::StreamExt;

#[tokio::main]
async fn main() {
    let mut client = Client::connect(
        "beta.fiberapi.io:8080".to_string(),
        "API_KEY".to_string()
    ).await.unwrap();

    let mut sub = client.subscribe_new_execution_payloads().await;

    // Consume the stream
    while let Some(block) = sub.next().await {
        handle_block(block);
    }
}
import { Block } from "fiber-ts";
...
const sub = client.subscribeNewExecutionPayloads();

sub.on("data", (block: Block) => {
    handleBlock(block);
});
try:
    sub = client.subscribe_new_execution_payloads()
    for block in sub:
        do_something(block)
except Exception as e:
    print("error subscribing", e)
Beacon Blocks
Beacon blocks are the blocks broadcast on the beacon chain (eth2), which contain the canonical consensus info.
We currently strip the execution payload out of this stream to keep it as light and fast as possible.
If you also need the execution payload, please use the subscribeNewExecutionPayloads stream.
Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.
Let's see how to subscribe to new beacon blocks in the different packages:
- Golang
- Rust
- JavaScript
- Python
import (
    ...
    fiber "github.com/chainbound/fiber-go"
)

func main() {
    ...
    ch := make(chan *fiber.Block)
    go func() {
        if err := client.SubscribeNewBeaconBlocks(ch); err != nil {
            log.Fatal(err)
        }
    }()

    for block := range ch {
        handleBlock(block)
    }
}
use fiber::Client;
use tokio_util::StreamExt;

#[tokio::main]
async fn main() {
    let mut client = Client::connect(
        "beta.fiberapi.io:8080".to_string(),
        "API_KEY".to_string()
    ).await.unwrap();

    let mut sub = client.subscribe_new_beacon_blocks().await;

    // Consume the stream
    while let Some(block) = sub.next().await {
        handle_block(block);
    }
}
import { Block } from "fiber-ts";
...
const sub = client.subscribeNewBeaconBlocks();

sub.on("data", (block: Block) => {
    handleBlock(block);
});
try:
    sub = client.subscribe_new_beacon_blocks()
    for block in sub:
        do_something(block)
except Exception as e:
    print("error subscribing", e)