
Subscribing to Streams

Transactions

note

This method opens a subscription in the form of a gRPC stream and broadcasts the pending transactions that the node receives, either from the Fibernet or from full nodes it's connected to.

caution

The transactions on this stream have not been validated and may be invalid, so we recommend having a process in place to filter out invalid transactions.

Fiber skips validation to avoid adding latency to the message path, ensuring the fastest possible delivery.

Let's look at how users can subscribe to the pending transactions stream:

We're omitting the connection code snippet, but your client needs to be connected to Fiber first.

import (
	"context"
	"log"
	"time"

	fiber "github.com/chainbound/fiber-go"
)

func main() {
	...

	// First make a sink channel on which to receive the transactions
	ch := make(chan *fiber.TransactionWithSender)
	go func() {
		// This is a blocking call, so it needs to run in a Goroutine
		if err := client.SubscribeNewTxs(nil, ch); err != nil {
			log.Fatal(err)
		}
	}()

	// Listen for incoming transactions
	for tx := range ch {
		handleTransaction(tx)
	}
}

info

The transaction type we use here (fiber.TransactionWithSender) is a wrapper for the native go-ethereum types.Transaction type, with the addition of the Sender address field. We include the sender address so that the client does not have to recalculate it.

Filtering

The subscribe methods accept parameters that let users filter the transactions they receive. Currently, we support filtering on the following transaction fields:

  • Sender
  • Receiver
  • MethodID
  • Value (greater than)

Here are a few examples that show how to use these filters:

import (
	"context"
	"log"
	"math/big"
	"time"

	fiber "github.com/chainbound/fiber-go"
	"github.com/chainbound/fiber-go/filter" // assumed import path for the filter package
)

func main() {
	...

	// Construct filter. The examples below reassign f, so only the last
	// one takes effect; keep the one you need.
	// example 1: all transactions with either of these addresses as the receiver
	f := filter.New(filter.Or(
		filter.To("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
		filter.To("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"),
	))

	// example 2: all transactions with a value greater than 1 ETH
	// (big.Int values must be multiplied with Mul, not the * operator)
	f = filter.New(filter.Value(new(big.Int).Mul(big.NewInt(1), big.NewInt(1e18))))

	// example 3: all ERC20 transfers on the 2 tokens below
	f = filter.New(filter.And(
		filter.MethodID("0xa9059cbb"),
		filter.Or(
			filter.To("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
			filter.To("0xdAC17F958D2ee523a2206206994597C13D831ec7"),
		),
	))

	// Same sink channel type as in the unfiltered example
	ch := make(chan *fiber.TransactionWithSender)
	go func() {
		// apply filter
		if err := client.SubscribeNewTxs(f, ch); err != nil {
			log.Fatal(err)
		}
	}()

	// Listen for incoming transactions
	for tx := range ch {
		handleTransaction(tx)
	}
}

info

Evaluating filters adds anywhere from 10 to 200 microseconds of latency, depending on the complexity of the filter. There's currently a limit of 16 filter elements, or "nodes", in the filter tree.
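To make the notion of "nodes" in a filter tree concrete, here is a small self-contained sketch of such a tree, with a node count and an evaluator. It is not how fiber-go implements filters: `Tx`, `Node`, `Count`, and `Match` are hypothetical, and counting operator nodes toward the limit of 16 is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// maxNodes mirrors the documented limit of 16 filter elements.
const maxNodes = 16

// Tx is a hypothetical, minimal transaction used only for this sketch.
type Tx struct {
	To       string
	MethodID string
}

// Node is one element of a hypothetical filter tree: either an operator
// ("and"/"or") with children, or a leaf predicate on a single field.
type Node struct {
	Op       string // "and", "or", or "" for a leaf
	Field    string // "to" or "method" (leaf only)
	Want     string // expected value (leaf only)
	Children []*Node
}

// Count returns the number of nodes in the tree, operators and leaves alike.
func (n *Node) Count() int {
	total := 1
	for _, c := range n.Children {
		total += c.Count()
	}
	return total
}

// Match evaluates the tree against tx.
func (n *Node) Match(tx Tx) bool {
	switch n.Op {
	case "and":
		for _, c := range n.Children {
			if !c.Match(tx) {
				return false
			}
		}
		return true
	case "or":
		for _, c := range n.Children {
			if c.Match(tx) {
				return true
			}
		}
		return false
	}
	switch n.Field {
	case "to":
		return strings.EqualFold(tx.To, n.Want) // hex addresses ignore case
	case "method":
		return tx.MethodID == n.Want
	}
	return false
}

func main() {
	tokenA := "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
	tokenB := "0xdAC17F958D2ee523a2206206994597C13D831ec7"
	// Same shape as example 3: transfer method AND (tokenA OR tokenB).
	tree := &Node{Op: "and", Children: []*Node{
		{Field: "method", Want: "0xa9059cbb"},
		{Op: "or", Children: []*Node{
			{Field: "to", Want: tokenA},
			{Field: "to", Want: tokenB},
		}},
	}}
	tx := Tx{To: strings.ToLower(tokenB), MethodID: "0xa9059cbb"}
	fmt.Println(tree.Count(), tree.Count() <= maxNodes, tree.Match(tx)) // prints 5 true true
}
```

Under this way of counting, example 3 uses 5 of the 16 available nodes: two operators and three leaf predicates.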

Block Payloads

Execution payloads are the traditional blocks broadcast on the execution layer (eth1). They contain the block header, the full list of transactions, and the beacon chain withdrawals.

caution

Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.

Let's see how to subscribe to new execution payloads:

Execution payloads are returned as *fiber.Block which is a wrapper around go-ethereum native types such as Header, Transaction and Withdrawal.

import (
	...
	fiber "github.com/chainbound/fiber-go"
)

func main() {
	...

	ch := make(chan *fiber.Block)

	go func() {
		if err := client.SubscribeNewExecutionPayloads(ch); err != nil {
			log.Fatal(err)
		}
	}()

	for block := range ch {
		handleBlock(block)
	}
}

Beacon Blocks

Beacon blocks are the blocks broadcast on the beacon chain (eth2), which contain the canonical consensus info. We currently strip the execution payload out of this stream to keep it as light and fast as possible. If you also need the execution payload, please use the SubscribeNewExecutionPayloads stream.

caution

Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.

Let's see how to subscribe to new beacon blocks:

import (
	...
	fiber "github.com/chainbound/fiber-go"
)

func main() {
	...

	ch := make(chan *fiber.Block)

	go func() {
		if err := client.SubscribeNewBeaconBlocks(ch); err != nil {
			log.Fatal(err)
		}
	}()

	for block := range ch {
		handleBlock(block)
	}
}