From 6877bafcdef19d69baebef994dd4974547a4ca53 Mon Sep 17 00:00:00 2001 From: corver Date: Mon, 4 Nov 2024 15:42:08 +0200 Subject: [PATCH] fetch(lib/xchain): support streaming event logs --- halo/attest/voter/voter_test.go | 4 + lib/cchain/provider/provider.go | 4 +- lib/stream/stream.go | 10 +- lib/xchain/provider.go | 27 ++- lib/xchain/provider/metrics.go | 16 +- lib/xchain/provider/mock.go | 4 + lib/xchain/provider/provider.go | 44 +++-- lib/xchain/provider/provider_internal_test.go | 22 +++ lib/xchain/provider/provider_test.go | 3 +- lib/xchain/provider/streamlogs.go | 155 ++++++++++++++++++ relayer/app/cursors_internal_test.go | 8 +- 11 files changed, 260 insertions(+), 37 deletions(-) create mode 100644 lib/xchain/provider/streamlogs.go diff --git a/halo/attest/voter/voter_test.go b/halo/attest/voter/voter_test.go index f1c67198d..33ab67af6 100644 --- a/halo/attest/voter/voter_test.go +++ b/halo/attest/voter/voter_test.go @@ -541,6 +541,10 @@ func (stubProvider) StreamAsync(context.Context, xchain.ProviderRequest, xchain. panic("unexpected") } +func (stubProvider) StreamEventLogs(context.Context, xchain.EventLogsReq, xchain.EventLogsCallback) error { + panic("unexpected") +} + func (stubProvider) GetBlock(context.Context, xchain.ProviderRequest) (xchain.Block, bool, error) { panic("unexpected") } diff --git a/lib/cchain/provider/provider.go b/lib/cchain/provider/provider.go index 57431e9a1..f7b2edcad 100644 --- a/lib/cchain/provider/provider.go +++ b/lib/cchain/provider/provider.go @@ -180,7 +180,7 @@ func (p Provider) stream( ctx := log.WithCtx(in, "src_chain", srcChain, "worker", workerName) deps := stream.Deps[xchain.Attestation]{ - FetchBatch: func(ctx context.Context, _ uint64, offset uint64) ([]xchain.Attestation, error) { + FetchBatch: func(ctx context.Context, offset uint64) ([]xchain.Attestation, error) { return p.fetch(ctx, chainVer, offset) }, Backoff: p.backoffFunc, @@ -224,7 +224,7 @@ func (p Provider) stream( } cb := (stream.Callback[xchain.Attestation])(callback) - err := stream.Stream(ctx, deps, chainVer.ID, attestOffset, cb) + err := stream.Stream(ctx, deps, attestOffset, cb) if err != nil { return errors.Wrap(err, "stream attestations", "worker", workerName, "chain", srcChain) } diff --git a/lib/stream/stream.go b/lib/stream/stream.go index 6cd95610b..cc66ffb83 100644 --- a/lib/stream/stream.go +++ b/lib/stream/stream.go @@ -27,9 +27,9 @@ type Cache[E any] interface { type Deps[E any] struct { // Dependency functions - // FetchBatch fetches the next batch of elements from the provided height (inclusive). - // The elements must be sequential, since the internal height cursors is incremented for each element returned. - FetchBatch func(ctx context.Context, chainID uint64, height uint64) ([]E, error) + // FetchBatch fetches the next batch of elements from the provided height (inclusive) if available. + // Returned elements must be strictly-sequential, since the internal height cursors is incremented for each element returned. + FetchBatch func(ctx context.Context, height uint64) ([]E, error) // Backoff returns a backoff function. See expbackoff package for the implementation. Backoff func(ctx context.Context) func() // Verify is a sanity check function, it ensures each element is valid. @@ -59,7 +59,7 @@ type Deps[E any] struct { // It retries forever on fetch errors. // It can either retry or return callback errors. // It returns (nil) when the context is canceled. -func Stream[E any](ctx context.Context, deps Deps[E], srcChainID uint64, startHeight uint64, callback Callback[E]) error { +func Stream[E any](ctx context.Context, deps Deps[E], startHeight uint64, callback Callback[E]) error { if deps.FetchWorkers == 0 { return errors.New("invalid zero fetch worker count") } @@ -75,7 +75,7 @@ func Stream[E any](ctx context.Context, deps Deps[E], srcChainID uint64, startHe } fetchCtx, span := deps.StartTrace(ctx, height, "fetch") - elems, err := deps.FetchBatch(fetchCtx, srcChainID, height) + elems, err := deps.FetchBatch(fetchCtx, height) span.End() if ctx.Err() != nil { diff --git a/lib/xchain/provider.go b/lib/xchain/provider.go index 5b9a98fba..2f6966b69 100644 --- a/lib/xchain/provider.go +++ b/lib/xchain/provider.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" ) // ProviderCallback is the callback function signature that will be called with every finalized. @@ -24,6 +25,24 @@ func (r ProviderRequest) ChainVersion() ChainVersion { } } +// EventLogsReq is the request to fetch EVM event logs. +type EventLogsReq struct { + ChainID uint64 // Source chain ID to query for xblocks. + Height uint64 // Height to query (from inclusive). + ConfLevel ConfLevel // Confirmation level to ensure + FilterAddress common.Address // Filter logs by optional address + FilterTopics []common.Hash // Filters zero or more topics (in the first position only). +} + +func (r EventLogsReq) ChainVersion() ChainVersion { + return ChainVersion{ + ID: r.ChainID, + ConfLevel: r.ConfLevel, + } +} + +type EventLogsCallback func(ctx context.Context, height uint64, events []types.Log) error + // Provider abstracts fetching cross chain data from any supported chain. // This is basically a cross-chain data client for all supported chains. type Provider interface { @@ -34,11 +53,17 @@ type Provider interface { // It retries forever (with backoff) on all fetch and callback errors. StreamAsync(ctx context.Context, req ProviderRequest, callback ProviderCallback) error - // StreamBlocks is the synchronous fail-fast version of Subscribe. It streams + // StreamBlocks is the synchronous fail-fast version of StreamBlocks. It streams // xblocks as they become available but returns on the first callback error. // This is useful for workers that need to reset on application errors. StreamBlocks(ctx context.Context, req ProviderRequest, callback ProviderCallback) error + // StreamEventLogs streams EVM event logs as they become available. + // + // The callback will be called with strictly-sequential heights with logs matching the provided filter (which may be none). + // It returns any error encountered. + StreamEventLogs(ctx context.Context, req EventLogsReq, callback EventLogsCallback) error + // GetBlock returns the block for the given chain and height, or false if not available (not finalized yet), // or an error. The AttestOffset field is populated with the provided offset (if required). GetBlock(ctx context.Context, req ProviderRequest) (Block, bool, error) diff --git a/lib/xchain/provider/metrics.go b/lib/xchain/provider/metrics.go index b1bcf0f86..f69bb1c18 100644 --- a/lib/xchain/provider/metrics.go +++ b/lib/xchain/provider/metrics.go @@ -10,28 +10,28 @@ var ( Namespace: "lib", Subsystem: "xprovider", Name: "callback_error_total", - Help: "Total number of callback errors per source chain version. Alert if growing.", - }, []string{"chain_version"}) + Help: "Total number of callback errors per source chain version and stream type. Alert if growing.", + }, []string{"chain_version", "type"}) fetchErrTotal = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "lib", Subsystem: "xprovider", Name: "fetch_error_total", - Help: "Total number of fetch errors per source chain version. Alert if growing.", - }, []string{"chain_version"}) + Help: "Total number of fetch errors per source chain version and stream type. Alert if growing.", + }, []string{"chain_version", "type"}) streamHeight = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "lib", Subsystem: "xprovider", Name: "stream_height", - Help: "Latest streamed xblock height per source chain version. Alert if not growing.", - }, []string{"chain_version"}) + Help: "Latest streamed height per source chain version and stream type. Alert if not growing.", + }, []string{"chain_version", "type"}) callbackLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "lib", Subsystem: "xprovider", Name: "callback_latency_seconds", - Help: "Callback latency in seconds per source chain version. Alert if growing.", + Help: "Callback latency in seconds per source chain version and type. Alert if growing.", Buckets: []float64{.001, .002, .005, .01, .025, .05, .1, .25, .5, 1, 2.5}, - }, []string{"chain_version"}) + }, []string{"chain_version", "type"}) ) diff --git a/lib/xchain/provider/mock.go b/lib/xchain/provider/mock.go index 526360538..1510583f0 100644 --- a/lib/xchain/provider/mock.go +++ b/lib/xchain/provider/mock.go @@ -70,6 +70,10 @@ func (m *Mock) StreamBlocks(ctx context.Context, req xchain.ProviderRequest, cal return m.stream(ctx, req, callback, false) } +func (*Mock) StreamEventLogs(context.Context, xchain.EventLogsReq, xchain.EventLogsCallback) error { + return errors.New("unsupported") +} + func (*Mock) ChainVersionHeight(context.Context, xchain.ChainVersion) (uint64, error) { return 0, errors.New("unsupported") } diff --git a/lib/xchain/provider/provider.go b/lib/xchain/provider/provider.go index 45bef098e..a91db6187 100644 --- a/lib/xchain/provider/provider.go +++ b/lib/xchain/provider/provider.go @@ -23,6 +23,11 @@ import ( "go.opentelemetry.io/otel/trace" ) +const ( + streamTypeXBlock = "xblock" + streamTypeEvent = "event" +) + // fetchWorkerThresholds defines the number of concurrent workers // to fetch xblocks by chain block period. var fetchWorkerThresholds = []struct { @@ -34,6 +39,17 @@ var fetchWorkerThresholds = []struct { {Workers: 4, MinPeriod: 0}, // 4 workers for fastest chains (arb_sepolia) } +// getWorkers returns the workers from the first exceeded threshold that matches. +func getWorkers(chain netconf.Chain) (uint64, error) { + for _, threshold := range fetchWorkerThresholds { + if chain.BlockPeriod >= threshold.MinPeriod { + return threshold.Workers, nil + } + } + + return 0, errors.New("no matching threshold [BUG]") +} + var _ xchain.Provider = (*Provider)(nil) // Provider stores the source chain configuration and the global quit channel. @@ -124,15 +140,9 @@ func (p *Provider) stream( chainVersionName := p.network.ChainVersionName(xchain.ChainVersion{ID: req.ChainID, ConfLevel: req.ConfLevel}) - var workers uint64 // Pick the first threshold that matches (or the last one) - for _, threshold := range fetchWorkerThresholds { - workers = threshold.Workers - if chain.BlockPeriod >= threshold.MinPeriod { - break - } - } - if workers == 0 { - return errors.New("zero workers [BUG]") + workers, err := getWorkers(chain) + if err != nil { + return err } // Start streaming from chain's deploy height as per config. @@ -143,9 +153,9 @@ func (p *Provider) stream( deps := stream.Deps[xchain.Block]{ FetchWorkers: workers, - FetchBatch: func(ctx context.Context, chainID uint64, height uint64) ([]xchain.Block, error) { + FetchBatch: func(ctx context.Context, height uint64) ([]xchain.Block, error) { fetchReq := xchain.ProviderRequest{ - ChainID: chainID, + ChainID: req.ChainID, Height: height, ConfLevel: req.ConfLevel, } @@ -184,19 +194,19 @@ func (p *Provider) stream( return nil }, IncFetchErr: func() { - fetchErrTotal.WithLabelValues(chainVersionName).Inc() + fetchErrTotal.WithLabelValues(chainVersionName, streamTypeXBlock).Inc() }, IncCallbackErr: func() { - callbackErrTotal.WithLabelValues(chainVersionName).Inc() + callbackErrTotal.WithLabelValues(chainVersionName, streamTypeXBlock).Inc() }, SetStreamHeight: func(h uint64) { - streamHeight.WithLabelValues(chainVersionName).Set(float64(h)) + streamHeight.WithLabelValues(chainVersionName, streamTypeXBlock).Set(float64(h)) }, SetCallbackLatency: func(d time.Duration) { - callbackLatency.WithLabelValues(chainVersionName).Observe(d.Seconds()) + callbackLatency.WithLabelValues(chainVersionName, streamTypeXBlock).Observe(d.Seconds()) }, StartTrace: func(ctx context.Context, height uint64, spanName string) (context.Context, trace.Span) { - return tracer.StartChainHeight(ctx, p.network.ID, chain.Name, height, path.Join("xprovider", spanName)) + return tracer.StartChainHeight(ctx, p.network.ID, chain.Name, height, path.Join("xblock", spanName)) }, } @@ -205,7 +215,7 @@ func (p *Provider) stream( ctx = log.WithCtx(ctx, "chain", chainVersionName) log.Info(ctx, "Streaming xprovider blocks", "from_height", fromHeight) - return stream.Stream(ctx, deps, req.ChainID, fromHeight, cb) + return stream.Stream(ctx, deps, fromHeight, cb) } // getEVMChain provides the configuration of the given chainID. diff --git a/lib/xchain/provider/provider_internal_test.go b/lib/xchain/provider/provider_internal_test.go index fbde51228..cebf75f0c 100644 --- a/lib/xchain/provider/provider_internal_test.go +++ b/lib/xchain/provider/provider_internal_test.go @@ -7,6 +7,8 @@ import ( "github.com/omni-network/omni/lib/ethclient" "github.com/omni-network/omni/lib/netconf" "github.com/omni-network/omni/lib/xchain" + + "github.com/stretchr/testify/require" ) // NewForT returns a new provider for testing. Note that cprovider isn't supported yet. @@ -29,3 +31,23 @@ func NewForT( confHeads: make(map[xchain.ChainVersion]uint64), } } + +//nolint:paralleltest // Access global thresholds not locked +func TestThresholds(t *testing.T) { + for i, threshold := range fetchWorkerThresholds { + if i == 0 { + continue + } + + // Ensure thresholds are in decreasing min period + require.Greater(t, fetchWorkerThresholds[i-1].MinPeriod, threshold.MinPeriod) + + // Ensure workers are in increasing number + require.Less(t, fetchWorkerThresholds[i-1].Workers, threshold.Workers) + + // Ensure last threshold is catch-all. + if i == len(fetchWorkerThresholds)-1 { + require.Empty(t, threshold.MinPeriod) + } + } +} diff --git a/lib/xchain/provider/provider_test.go b/lib/xchain/provider/provider_test.go index b1c359eb1..932ffb9fe 100644 --- a/lib/xchain/provider/provider_test.go +++ b/lib/xchain/provider/provider_test.go @@ -20,9 +20,8 @@ import ( "go.uber.org/mock/gomock" ) +//nolint:paralleltest // Access global thresholds not locked func TestProvider(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) var mu sync.Mutex diff --git a/lib/xchain/provider/streamlogs.go b/lib/xchain/provider/streamlogs.go new file mode 100644 index 000000000..251e560f9 --- /dev/null +++ b/lib/xchain/provider/streamlogs.go @@ -0,0 +1,155 @@ +package provider + +import ( + "context" + "path" + "time" + + "github.com/omni-network/omni/lib/errors" + "github.com/omni-network/omni/lib/expbackoff" + "github.com/omni-network/omni/lib/stream" + "github.com/omni-network/omni/lib/tracer" + "github.com/omni-network/omni/lib/umath" + "github.com/omni-network/omni/lib/xchain" + + "github.com/ethereum/go-ethereum/core/types" + + "go.opentelemetry.io/otel/trace" +) + +// events extends zero or more event logs with an explicit height. +type events struct { + Height uint64 + Events []types.Log +} + +func (p *Provider) StreamEventLogs(ctx context.Context, req xchain.EventLogsReq, callback xchain.EventLogsCallback) error { + if req.Height == 0 { + return errors.New("invalid zero height") + } + + chain, _, err := p.getEVMChain(req.ChainID) + if err != nil { + return err + } + + workers, err := getWorkers(chain) + if err != nil { + return err + } + + chainVersionName := p.network.ChainVersionName(req.ChainVersion()) + + deps := stream.Deps[events]{ + FetchWorkers: workers, + FetchBatch: func(ctx context.Context, height uint64) ([]events, error) { + fetchReq := xchain.EventLogsReq{ + ChainID: req.ChainID, + Height: height, + ConfLevel: req.ConfLevel, + FilterAddress: req.FilterAddress, + FilterTopics: req.FilterTopics, + } + + var lastErr error + const retryCount = 5 + backoff := expbackoff.New(ctx, expbackoff.WithPeriodicConfig(time.Millisecond*100)) + for i := 0; i < retryCount; i++ { + logs, exists, err := p.GetEventLogs(ctx, fetchReq) + if err != nil { + lastErr = err + backoff() + } else if !exists { + return nil, nil + } else { + return []events{{ + Height: height, + Events: logs, + }}, nil + } + } + + return nil, lastErr + }, + Backoff: p.backoffFunc, + ElemLabel: "events", + HeightLabel: "height", + RetryCallback: false, + Height: func(logs events) uint64 { + return logs.Height + }, + Verify: func(_ context.Context, events events, h uint64) error { + if events.Height != h { + return errors.New("invalid block height") + } + + return nil + }, + IncFetchErr: func() { + fetchErrTotal.WithLabelValues(chainVersionName, streamTypeEvent).Inc() + }, + IncCallbackErr: func() { + callbackErrTotal.WithLabelValues(chainVersionName, streamTypeEvent).Inc() + }, + SetStreamHeight: func(h uint64) { + streamHeight.WithLabelValues(chainVersionName, streamTypeEvent).Set(float64(h)) + }, + SetCallbackLatency: func(d time.Duration) { + callbackLatency.WithLabelValues(chainVersionName, streamTypeEvent).Observe(d.Seconds()) + }, + StartTrace: func(ctx context.Context, height uint64, spanName string) (context.Context, trace.Span) { + return tracer.StartChainHeight(ctx, p.network.ID, chain.Name, height, path.Join("events", spanName)) + }, + } + + return stream.Stream[events](ctx, deps, req.Height, func(ctx context.Context, events events) error { + return callback(ctx, events.Height, events.Events) + }) +} + +// GetEventLogs returns the evn event logs for the provided request, or false if not available yet (not finalized), +// or an error. +func (p *Provider) GetEventLogs(ctx context.Context, req xchain.EventLogsReq) ([]types.Log, bool, error) { + ctx, span := tracer.Start(ctx, spanName("get_events")) + defer span.End() + + _, ethCl, err := p.getEVMChain(req.ChainID) + if err != nil { + return nil, false, err + } + + // First check if height is confirmed. + var header *types.Header + if !p.confirmedCache(req.ChainVersion(), req.Height) { + // No higher cached header available, so fetch the latest head + latest, err := p.headerByChainVersion(ctx, req.ChainVersion()) + if err != nil { + return nil, false, errors.Wrap(err, "header by strategy") + } + + // If still lower, we reached the head of the chain, return false + if latest.Number.Uint64() < req.Height { + return nil, false, nil + } + + // Use this header if it matches height + if latest.Number.Uint64() == req.Height { + header = latest + } + } + + // Fetch the header if we didn't find it in the cache + if header == nil { + header, err = ethCl.HeaderByNumber(ctx, umath.NewBigInt(req.Height)) + if err != nil { + return nil, false, errors.Wrap(err, "header by number") + } + } + + events, err := getEventLogs(ctx, ethCl, req.FilterAddress, header.Hash(), req.FilterTopics) + if err != nil { + return nil, false, err + } + + return events, true, nil +} diff --git a/relayer/app/cursors_internal_test.go b/relayer/app/cursors_internal_test.go index 77c00a377..4f532767f 100644 --- a/relayer/app/cursors_internal_test.go +++ b/relayer/app/cursors_internal_test.go @@ -104,11 +104,15 @@ func (*mockXChainClient) GetSubmission(context.Context, uint64, common.Hash) (xc panic("unexpected") } -func (m *mockXChainClient) StreamAsync(context.Context, xchain.ProviderRequest, xchain.ProviderCallback) error { +func (*mockXChainClient) StreamAsync(context.Context, xchain.ProviderRequest, xchain.ProviderCallback) error { panic("unexpected") } -func (m *mockXChainClient) StreamBlocks(context.Context, xchain.ProviderRequest, xchain.ProviderCallback) error { +func (*mockXChainClient) StreamBlocks(context.Context, xchain.ProviderRequest, xchain.ProviderCallback) error { + panic("unexpected") +} + +func (*mockXChainClient) StreamEventLogs(context.Context, xchain.EventLogsReq, xchain.EventLogsCallback) error { panic("unexpected") }