Skip to content

Commit

Permalink
fetch(lib/xchain): support streaming event logs
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed Nov 4, 2024
1 parent b29e879 commit 6877baf
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 37 deletions.
4 changes: 4 additions & 0 deletions halo/attest/voter/voter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,10 @@ func (stubProvider) StreamAsync(context.Context, xchain.ProviderRequest, xchain.
panic("unexpected")
}

func (stubProvider) StreamEventLogs(context.Context, xchain.EventLogsReq, xchain.EventLogsCallback) error {
panic("unexpected")
}

func (stubProvider) GetBlock(context.Context, xchain.ProviderRequest) (xchain.Block, bool, error) {
panic("unexpected")
}
Expand Down
4 changes: 2 additions & 2 deletions lib/cchain/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (p Provider) stream(
ctx := log.WithCtx(in, "src_chain", srcChain, "worker", workerName)

deps := stream.Deps[xchain.Attestation]{
FetchBatch: func(ctx context.Context, _ uint64, offset uint64) ([]xchain.Attestation, error) {
FetchBatch: func(ctx context.Context, offset uint64) ([]xchain.Attestation, error) {
return p.fetch(ctx, chainVer, offset)
},
Backoff: p.backoffFunc,
Expand Down Expand Up @@ -224,7 +224,7 @@ func (p Provider) stream(
}

cb := (stream.Callback[xchain.Attestation])(callback)
err := stream.Stream(ctx, deps, chainVer.ID, attestOffset, cb)
err := stream.Stream(ctx, deps, attestOffset, cb)
if err != nil {
return errors.Wrap(err, "stream attestations", "worker", workerName, "chain", srcChain)
}
Expand Down
10 changes: 5 additions & 5 deletions lib/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type Cache[E any] interface {
type Deps[E any] struct {
// Dependency functions

// FetchBatch fetches the next batch of elements from the provided height (inclusive).
// The elements must be sequential, since the internal height cursors is incremented for each element returned.
FetchBatch func(ctx context.Context, chainID uint64, height uint64) ([]E, error)
// FetchBatch fetches the next batch of elements from the provided height (inclusive) if available.
// Returned elements must be strictly-sequential, since the internal height cursors is incremented for each element returned.
FetchBatch func(ctx context.Context, height uint64) ([]E, error)
// Backoff returns a backoff function. See expbackoff package for the implementation.
Backoff func(ctx context.Context) func()
// Verify is a sanity check function, it ensures each element is valid.
Expand Down Expand Up @@ -59,7 +59,7 @@ type Deps[E any] struct {
// It retries forever on fetch errors.
// It can either retry or return callback errors.
// It returns (nil) when the context is canceled.
func Stream[E any](ctx context.Context, deps Deps[E], srcChainID uint64, startHeight uint64, callback Callback[E]) error {
func Stream[E any](ctx context.Context, deps Deps[E], startHeight uint64, callback Callback[E]) error {
if deps.FetchWorkers == 0 {
return errors.New("invalid zero fetch worker count")
}
Expand All @@ -75,7 +75,7 @@ func Stream[E any](ctx context.Context, deps Deps[E], srcChainID uint64, startHe
}

fetchCtx, span := deps.StartTrace(ctx, height, "fetch")
elems, err := deps.FetchBatch(fetchCtx, srcChainID, height)
elems, err := deps.FetchBatch(fetchCtx, height)
span.End()

if ctx.Err() != nil {
Expand Down
27 changes: 26 additions & 1 deletion lib/xchain/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// ProviderCallback is the callback function signature that will be called with every finalized.
Expand All @@ -24,6 +25,24 @@ func (r ProviderRequest) ChainVersion() ChainVersion {
}
}

// EventLogsReq is the request to fetch EVM event logs.
type EventLogsReq struct {
ChainID uint64 // Source chain ID to query for xblocks.
Height uint64 // Height to query (from inclusive).
ConfLevel ConfLevel // Confirmation level to ensure
FilterAddress common.Address // Filter logs by optional address
FilterTopics []common.Hash // Filters zero or more topics (in the first position only).
}

func (r EventLogsReq) ChainVersion() ChainVersion {
return ChainVersion{
ID: r.ChainID,
ConfLevel: r.ConfLevel,
}
}

type EventLogsCallback func(ctx context.Context, height uint64, events []types.Log) error

// Provider abstracts fetching cross chain data from any supported chain.
// This is basically a cross-chain data client for all supported chains.
type Provider interface {
Expand All @@ -34,11 +53,17 @@ type Provider interface {
// It retries forever (with backoff) on all fetch and callback errors.
StreamAsync(ctx context.Context, req ProviderRequest, callback ProviderCallback) error

// StreamBlocks is the synchronous fail-fast version of Subscribe. It streams
// StreamBlocks is the synchronous fail-fast version of StreamBlocks. It streams
// xblocks as they become available but returns on the first callback error.
// This is useful for workers that need to reset on application errors.
StreamBlocks(ctx context.Context, req ProviderRequest, callback ProviderCallback) error

// StreamEventLogs streams EVM event logs as they become available.
//
// The callback will be called with strictly-sequential heights with logs matching the provided filter (which may be none).
// It returns any error encountered.
StreamEventLogs(ctx context.Context, req EventLogsReq, callback EventLogsCallback) error

// GetBlock returns the block for the given chain and height, or false if not available (not finalized yet),
// or an error. The AttestOffset field is populated with the provided offset (if required).
GetBlock(ctx context.Context, req ProviderRequest) (Block, bool, error)
Expand Down
16 changes: 8 additions & 8 deletions lib/xchain/provider/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,28 @@ var (
Namespace: "lib",
Subsystem: "xprovider",
Name: "callback_error_total",
Help: "Total number of callback errors per source chain version. Alert if growing.",
}, []string{"chain_version"})
Help: "Total number of callback errors per source chain version and stream type. Alert if growing.",
}, []string{"chain_version", "type"})

fetchErrTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "lib",
Subsystem: "xprovider",
Name: "fetch_error_total",
Help: "Total number of fetch errors per source chain version. Alert if growing.",
}, []string{"chain_version"})
Help: "Total number of fetch errors per source chain version and stream type. Alert if growing.",
}, []string{"chain_version", "type"})

streamHeight = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "lib",
Subsystem: "xprovider",
Name: "stream_height",
Help: "Latest streamed xblock height per source chain version. Alert if not growing.",
}, []string{"chain_version"})
Help: "Latest streamed height per source chain version and stream type. Alert if not growing.",
}, []string{"chain_version", "type"})

callbackLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "lib",
Subsystem: "xprovider",
Name: "callback_latency_seconds",
Help: "Callback latency in seconds per source chain version. Alert if growing.",
Help: "Callback latency in seconds per source chain version and type. Alert if growing.",
Buckets: []float64{.001, .002, .005, .01, .025, .05, .1, .25, .5, 1, 2.5},
}, []string{"chain_version"})
}, []string{"chain_version", "type"})
)
4 changes: 4 additions & 0 deletions lib/xchain/provider/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (m *Mock) StreamBlocks(ctx context.Context, req xchain.ProviderRequest, cal
return m.stream(ctx, req, callback, false)
}

func (*Mock) StreamEventLogs(context.Context, xchain.EventLogsReq, xchain.EventLogsCallback) error {
return errors.New("unsupported")
}

func (*Mock) ChainVersionHeight(context.Context, xchain.ChainVersion) (uint64, error) {
return 0, errors.New("unsupported")
}
Expand Down
44 changes: 27 additions & 17 deletions lib/xchain/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"go.opentelemetry.io/otel/trace"
)

const (
streamTypeXBlock = "xblock"
streamTypeEvent = "event"
)

// fetchWorkerThresholds defines the number of concurrent workers
// to fetch xblocks by chain block period.
var fetchWorkerThresholds = []struct {
Expand All @@ -34,6 +39,17 @@ var fetchWorkerThresholds = []struct {
{Workers: 4, MinPeriod: 0}, // 4 workers for fastest chains (arb_sepolia)
}

// getWorkers returns the workers from the first exceeded threshold that matches.
func getWorkers(chain netconf.Chain) (uint64, error) {
for _, threshold := range fetchWorkerThresholds {
if chain.BlockPeriod >= threshold.MinPeriod {
return threshold.Workers, nil
}
}

return 0, errors.New("no matching threshold [BUG]")
}

var _ xchain.Provider = (*Provider)(nil)

// Provider stores the source chain configuration and the global quit channel.
Expand Down Expand Up @@ -124,15 +140,9 @@ func (p *Provider) stream(

chainVersionName := p.network.ChainVersionName(xchain.ChainVersion{ID: req.ChainID, ConfLevel: req.ConfLevel})

var workers uint64 // Pick the first threshold that matches (or the last one)
for _, threshold := range fetchWorkerThresholds {
workers = threshold.Workers
if chain.BlockPeriod >= threshold.MinPeriod {
break
}
}
if workers == 0 {
return errors.New("zero workers [BUG]")
workers, err := getWorkers(chain)
if err != nil {
return err
}

// Start streaming from chain's deploy height as per config.
Expand All @@ -143,9 +153,9 @@ func (p *Provider) stream(

deps := stream.Deps[xchain.Block]{
FetchWorkers: workers,
FetchBatch: func(ctx context.Context, chainID uint64, height uint64) ([]xchain.Block, error) {
FetchBatch: func(ctx context.Context, height uint64) ([]xchain.Block, error) {
fetchReq := xchain.ProviderRequest{
ChainID: chainID,
ChainID: req.ChainID,
Height: height,
ConfLevel: req.ConfLevel,
}
Expand Down Expand Up @@ -184,19 +194,19 @@ func (p *Provider) stream(
return nil
},
IncFetchErr: func() {
fetchErrTotal.WithLabelValues(chainVersionName).Inc()
fetchErrTotal.WithLabelValues(chainVersionName, streamTypeXBlock).Inc()
},
IncCallbackErr: func() {
callbackErrTotal.WithLabelValues(chainVersionName).Inc()
callbackErrTotal.WithLabelValues(chainVersionName, streamTypeXBlock).Inc()
},
SetStreamHeight: func(h uint64) {
streamHeight.WithLabelValues(chainVersionName).Set(float64(h))
streamHeight.WithLabelValues(chainVersionName, streamTypeXBlock).Set(float64(h))
},
SetCallbackLatency: func(d time.Duration) {
callbackLatency.WithLabelValues(chainVersionName).Observe(d.Seconds())
callbackLatency.WithLabelValues(chainVersionName, streamTypeXBlock).Observe(d.Seconds())
},
StartTrace: func(ctx context.Context, height uint64, spanName string) (context.Context, trace.Span) {
return tracer.StartChainHeight(ctx, p.network.ID, chain.Name, height, path.Join("xprovider", spanName))
return tracer.StartChainHeight(ctx, p.network.ID, chain.Name, height, path.Join("xblock", spanName))
},
}

Expand All @@ -205,7 +215,7 @@ func (p *Provider) stream(
ctx = log.WithCtx(ctx, "chain", chainVersionName)
log.Info(ctx, "Streaming xprovider blocks", "from_height", fromHeight)

return stream.Stream(ctx, deps, req.ChainID, fromHeight, cb)
return stream.Stream(ctx, deps, fromHeight, cb)
}

// getEVMChain provides the configuration of the given chainID.
Expand Down
22 changes: 22 additions & 0 deletions lib/xchain/provider/provider_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/xchain"

"github.com/stretchr/testify/require"
)

// NewForT returns a new provider for testing. Note that cprovider isn't supported yet.
Expand All @@ -29,3 +31,23 @@ func NewForT(
confHeads: make(map[xchain.ChainVersion]uint64),
}
}

//nolint:paralleltest // Access global thresholds not locked
func TestThresholds(t *testing.T) {
for i, threshold := range fetchWorkerThresholds {
if i == 0 {
continue
}

// Ensure thresholds are in decreasing min period
require.Greater(t, fetchWorkerThresholds[i-1].MinPeriod, threshold.MinPeriod)

// Ensure workers are in increasing number
require.Less(t, fetchWorkerThresholds[i-1].Workers, threshold.Workers)

// Ensure last threshold is catch-all.
if i == len(fetchWorkerThresholds)-1 {
require.Empty(t, threshold.MinPeriod)
}
}
}
3 changes: 1 addition & 2 deletions lib/xchain/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
"go.uber.org/mock/gomock"
)

//nolint:paralleltest // Access global thresholds not locked
func TestProvider(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())

var mu sync.Mutex
Expand Down
Loading

0 comments on commit 6877baf

Please sign in to comment.