Subscribing to Streams

Transactions - subscribeNewTxs

note

This method opens a subscription in the form of a gRPC stream and broadcasts the pending transactions that the node receives, either from the Fibernet or from the full nodes it's connected to.

caution

The transactions on this stream have not been validated and some of them may be invalid, so we recommend having a process for filtering out invalid transactions.

Fiber skips validation so that no latency is added to the message path, ensuring the fastest possible delivery.
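As a starting point for such a filtering process, here is a minimal sketch of a cheap pre-filter that drops duplicates, transactions whose gas limit is below the 21000 intrinsic minimum, and transactions with an already-used nonce. `PendingTx`, `Prefilter`, and the nonce map are hypothetical stand-ins and not part of fiber-go; full validation would also need signature and balance checks against chain state.

```go
package main

import "fmt"

// PendingTx is a hypothetical stand-in holding a few fields of a streamed
// transaction. It is not the fiber.Transaction type.
type PendingTx struct {
	Hash  string
	From  string
	Nonce uint64
	Gas   uint64 // gas limit
}

// minIntrinsicGas is the base gas cost of any Ethereum transaction.
const minIntrinsicGas = 21000

// Prefilter drops duplicates and transactions that fail cheap sanity checks.
type Prefilter struct {
	seen   map[string]struct{} // hashes already accepted
	nonces map[string]uint64   // next expected nonce per sender (assumed to be maintained elsewhere)
}

func NewPrefilter(nonces map[string]uint64) *Prefilter {
	return &Prefilter{seen: make(map[string]struct{}), nonces: nonces}
}

// Accept reports whether tx passes the checks and records its hash if it does.
func (p *Prefilter) Accept(tx PendingTx) bool {
	if _, dup := p.seen[tx.Hash]; dup {
		return false // already processed
	}
	if tx.Gas < minIntrinsicGas {
		return false // can never be executed
	}
	if next, ok := p.nonces[tx.From]; ok && tx.Nonce < next {
		return false // nonce already used by this sender
	}
	p.seen[tx.Hash] = struct{}{}
	return true
}

func main() {
	p := NewPrefilter(map[string]uint64{"0xabc": 5})
	txs := []PendingTx{
		{Hash: "0x01", From: "0xabc", Nonce: 5, Gas: 21000},
		{Hash: "0x01", From: "0xabc", Nonce: 5, Gas: 21000}, // duplicate
		{Hash: "0x02", From: "0xabc", Nonce: 4, Gas: 50000}, // stale nonce
		{Hash: "0x03", From: "0xdef", Nonce: 0, Gas: 20000}, // gas limit too low
	}
	for _, tx := range txs {
		fmt.Println(tx.Hash, p.Accept(tx))
	}
}
```

These checks are deliberately cheap so that they add little latency of their own; anything more expensive can run after the first pass.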

Let's look at how users can subscribe to the pending transactions stream:

We're omitting the connection code snippet here, but your client needs to be connected to Fiber first.

import (
	"context"
	"log"
	"time"

	fiber "github.com/chainbound/fiber-go"
)

func main() {
	...

	// First make a sink channel on which to receive the transactions
	ch := make(chan *fiber.Transaction)
	go func() {
		// This is a blocking call, so it needs to run in a Goroutine
		if err := client.SubscribeNewTxs(nil, ch); err != nil {
			log.Fatal(err)
		}
	}()

	// Listen for incoming transactions
	for tx := range ch {
		handleTransaction(tx)
	}
}
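The loop above handles one transaction at a time, so a slow handler delays everything queued behind it. One possible way around this is sketched below: a buffered channel drained by several workers. `Tx` and `processAll` are hypothetical stand-ins, and the producer here is simulated; whether the Fiber client closes the channel or blocks on a full channel is not stated in this document and is assumed only for the sake of the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Tx is a hypothetical stand-in for *fiber.Transaction.
type Tx struct{ Hash string }

// processAll drains src with n workers and returns how many items were handled.
// It returns once src has been closed and emptied.
func processAll(src <-chan *Tx, n int, handle func(*Tx)) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		count int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for tx := range src {
				handle(tx)
				mu.Lock()
				count++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return count
}

func main() {
	// A buffer absorbs short bursts so the sender is less likely to block.
	ch := make(chan *Tx, 1024)

	// Simulated producer standing in for the subscription goroutine.
	go func() {
		for i := 0; i < 10; i++ {
			ch <- &Tx{Hash: fmt.Sprintf("0x%02x", i)}
		}
		close(ch)
	}()

	handled := processAll(ch, 4, func(tx *Tx) { /* per-transaction work */ })
	fmt.Println("handled", handled)
}
```

With more than one worker, transactions are no longer handled in arrival order, so this only fits handlers that do not depend on ordering.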

info

The transaction type we use here (fiber.Transaction) contains all possible fields of all the different transaction types. You can differentiate them with the type field. There's also a helper method, ToNative(), which converts this transaction to a go-ethereum types.Transaction.

Example:

...

for tx := range ch {
	nativeTx := tx.ToNative()
	handleGethTransaction(nativeTx)
}
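To illustrate how the type field distinguishes transactions, here is a small sketch that maps the EIP-2718 type number to a readable name. `typeName` is a hypothetical helper, and the exact Go type of the field on fiber.Transaction is an assumption here, so a conversion may be needed when passing it in.

```go
package main

import "fmt"

// typeName maps an EIP-2718 transaction type number to a readable name.
// Hypothetical helper: the parameter type may differ from the field's
// actual type on fiber.Transaction.
func typeName(t uint8) string {
	switch t {
	case 0:
		return "legacy"
	case 1:
		return "access list (EIP-2930)"
	case 2:
		return "dynamic fee (EIP-1559)"
	case 3:
		return "blob (EIP-4844)"
	default:
		return "unknown"
	}
}

func main() {
	for _, t := range []uint8{0, 1, 2, 3, 9} {
		fmt.Printf("type %d: %s\n", t, typeName(t))
	}
}
```

Branching on the type first tells a handler which fields are meaningful, for example the fee cap fields on dynamic fee transactions versus a single gas price on legacy ones.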

Filtering

The subscribe method accepts parameters that let users filter the transactions they receive. Currently, we support filtering on the following transaction fields:

  • Sender
  • Receiver
  • MethodID
  • Value (greater than)

Here are a few examples showing how to use these filters:

import (
	"context"
	"log"
	"math/big"
	"time"

	fiber "github.com/chainbound/fiber-go"
	"github.com/chainbound/fiber-go/filter"
)

func main() {
	...

	// Construct filter. The three examples are alternatives: each assignment
	// to f replaces the previous one, so keep only the one you need.

	// example 1: all transactions with either of these addresses as the receiver
	f := filter.New(filter.Or(
		filter.To("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
		filter.To("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"),
	))

	// example 2: all transactions with a value greater than 1 ETH (1 ETH = 1e18 wei)
	f = filter.New(filter.Value(big.NewInt(1e18)))

	// example 3: all ERC20 transfers on the 2 tokens below
	f = filter.New(filter.And(
		filter.MethodID("0xa9059cbb"),
		filter.Or(
			filter.To("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
			filter.To("0xdAC17F958D2ee523a2206206994597C13D831ec7"),
		),
	))

	ch := make(chan *fiber.Transaction)
	go func() {
		// apply filter
		if err := client.SubscribeNewTxs(f, ch); err != nil {
			log.Fatal(err)
		}
	}()

	// Listen for incoming transactions
	for tx := range ch {
		handleTransaction(tx)
	}
}

info

Evaluating filters adds anywhere from 10 to 200 microseconds of latency, depending on the complexity of the filter. There's currently a limit of 16 filter elements, or "nodes", in the filter tree.
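To make the idea of a filter tree and its node count concrete, here is a self-contained sketch that models a tree of And/Or operators over leaf conditions, counts its nodes, and evaluates it against a transaction. `Node`, `countNodes`, and `matches` are hypothetical and not the fiber-go implementation; in particular, counting operators as nodes alongside leaves is an assumption about how the limit is measured.

```go
package main

import (
	"fmt"
	"strings"
)

// Node is a hypothetical stand-in for one element of a filter tree.
// It is not the fiber-go filter type.
type Node struct {
	Op       string  // "and" or "or" for operators, "" for a leaf
	Field    string  // leaf only, e.g. "to" or "method"
	Value    string  // leaf only, hex string to match
	Children []*Node // operators only
}

func leaf(field, value string) *Node { return &Node{Field: field, Value: value} }
func and(children ...*Node) *Node    { return &Node{Op: "and", Children: children} }
func or(children ...*Node) *Node     { return &Node{Op: "or", Children: children} }

// countNodes counts operators and leaves (assumption: both count toward the limit).
func countNodes(n *Node) int {
	if n == nil {
		return 0
	}
	total := 1
	for _, c := range n.Children {
		total += countNodes(c)
	}
	return total
}

// matches evaluates the tree against a transaction given as field -> hex value.
func matches(n *Node, tx map[string]string) bool {
	switch n.Op {
	case "and":
		for _, c := range n.Children {
			if !matches(c, tx) {
				return false
			}
		}
		return true
	case "or":
		for _, c := range n.Children {
			if matches(c, tx) {
				return true
			}
		}
		return false
	default:
		// Hex strings are compared case-insensitively.
		return strings.EqualFold(tx[n.Field], n.Value)
	}
}

func main() {
	// Same shape as example 3: ERC20 transfers on either of two tokens.
	f := and(
		leaf("method", "0xa9059cbb"),
		or(
			leaf("to", "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
			leaf("to", "0xdAC17F958D2ee523a2206206994597C13D831ec7"),
		),
	)
	tx := map[string]string{
		"method": "0xa9059cbb",
		"to":     "0xdac17f958d2ee523a2206206994597c13d831ec7",
	}
	fmt.Println("nodes:", countNodes(f), "match:", matches(f, tx))
}
```

Under this counting, the ERC20 example uses 5 of the 16 available nodes, which gives a rough sense of how much room a filter has to grow.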

Block Headers - subscribeNewExecutionHeaders

ExecutionHeaders are the traditional block headers of the blocks that are part of the execution layer (eth1). In contrast with the ExecutionPayloads, headers do not contain the full list of transactions.

caution

Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.

Let's see how to subscribe to new block headers:

import (
	...
	fiber "github.com/chainbound/fiber-go"
)

func main() {
	...

	ch := make(chan *fiber.Block)

	go func() {
		if err := client.SubscribeNewExecutionPayloadHeaders(ch); err != nil {
			log.Fatal(err)
		}
	}()

	for block := range ch {
		handleBlock(block)
	}
}

Block Payloads - subscribeNewExecutionPayloads

Execution Payloads are the traditional blocks broadcast on the execution layer (eth1). They contain the traditional block header and the full list of transactions.

caution

Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.

Let's see how to subscribe to new execution payloads:

import (
	...
	fiber "github.com/chainbound/fiber-go"
)

func main() {
	...

	ch := make(chan *fiber.Block)

	go func() {
		if err := client.SubscribeNewExecutionPayloads(ch); err != nil {
			log.Fatal(err)
		}
	}()

	for block := range ch {
		handleBlock(block)
	}
}
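Since payloads carry the full list of transactions, they can be combined with the pending transactions stream to track which pending transactions have been included in a block. The following is a minimal sketch of that bookkeeping. `Block` and `PendingSet` are hypothetical simplifications that only hold transaction hashes; they are not fiber-go types.

```go
package main

import "fmt"

// Block is a hypothetical, simplified payload: a number and its transaction
// hashes. It is not the fiber.Block type.
type Block struct {
	Number   uint64
	TxHashes []string
}

// PendingSet remembers hashes seen on the pending transactions stream.
type PendingSet struct {
	hashes map[string]struct{}
}

func NewPendingSet() *PendingSet {
	return &PendingSet{hashes: make(map[string]struct{})}
}

// Add records a hash seen on the pending stream.
func (s *PendingSet) Add(hash string) { s.hashes[hash] = struct{}{} }

// Len returns how many hashes are still pending.
func (s *PendingSet) Len() int { return len(s.hashes) }

// Confirm removes every hash in b from the set and returns how many of the
// block's transactions had been pending.
func (s *PendingSet) Confirm(b Block) int {
	n := 0
	for _, h := range b.TxHashes {
		if _, ok := s.hashes[h]; ok {
			delete(s.hashes, h)
			n++
		}
	}
	return n
}

func main() {
	pending := NewPendingSet()
	for _, h := range []string{"0x01", "0x02", "0x03"} {
		pending.Add(h)
	}

	b := Block{Number: 100, TxHashes: []string{"0x02", "0x03", "0x04"}}
	included := pending.Confirm(b)
	fmt.Println("included:", included, "still pending:", pending.Len())
}
```

Because the streamed blocks are not finalized, a transaction removed here can become pending again if its block is reorged out, so a real tracker would need a way to restore it.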

Beacon Blocks - subscribeNewBeaconBlocks

Beacon Blocks are the blocks broadcast on the beacon chain (eth2), which contain the canonical consensus info. We currently strip the execution payload out of this stream to keep it as light and fast as possible. If you also need the execution payload, please use the subscribeNewExecutionPayloads stream.

caution

Blocks streamed are not finalized, meaning that the data is not guaranteed to be part of the canonical chain. Recent blocks can always be reorged.

Let's see how to subscribe to new beacon blocks:

import (
	...
	fiber "github.com/chainbound/fiber-go"
)

func main() {
	...

	ch := make(chan *fiber.Block)

	go func() {
		if err := client.SubscribeNewBeaconBlocks(ch); err != nil {
			log.Fatal(err)
		}
	}()

	for block := range ch {
		handleBlock(block)
	}
}