diff --git a/eth/executionclient/execution_client.go b/eth/executionclient/execution_client.go index f9571ecb6e..82242921af 100644 --- a/eth/executionclient/execution_client.go +++ b/eth/executionclient/execution_client.go @@ -33,8 +33,10 @@ type ExecutionClient struct { contractAddress ethcommon.Address // optional - logger *zap.Logger - metrics metrics + logger *zap.Logger + metrics metrics + // followDistance defines an offset into the past from the head block such that the block + // at this offset will be considered as very likely finalized. followDistance uint64 // TODO: consider reading the finalized checkpoint from consensus layer connectionTimeout time.Duration reconnectionInitialInterval time.Duration diff --git a/eth/executionclient/execution_client_test.go b/eth/executionclient/execution_client_test.go index 0bcc24a583..7949e7df88 100644 --- a/eth/executionclient/execution_client_test.go +++ b/eth/executionclient/execution_client_test.go @@ -5,7 +5,6 @@ import ( "math/big" "net/http/httptest" "strings" - "sync" "sync/atomic" "testing" "time" @@ -164,14 +163,10 @@ func TestStreamLogs(t *testing.T) { require.NoError(t, err) logs := client.StreamLogs(ctx, 0) - var wg sync.WaitGroup var streamedLogs []ethtypes.Log - - // Receive emitted events - wg.Add(1) var streamedLogsCount atomic.Int64 go func() { - defer wg.Done() + // Receive emitted events, this func will exit when test exits. for block := range logs { streamedLogs = append(streamedLogs, block.Logs...) streamedLogsCount.Add(int64(len(block.Logs))) @@ -189,29 +184,40 @@ func TestStreamLogs(t *testing.T) { } // Wait for blocksWithLogsLength-followDistance blocks to be streamed. -Unfollowed: +Wait1: for { select { case <-ctx.Done(): - require.Equal(t, int64(blocksWithLogsLength-followDistance), streamedLogsCount.Load()) + require.Failf(t, "timed out", "err: %v, streamedLogsCount: %d", ctx.Err(), streamedLogsCount.Load()) case <-time.After(time.Millisecond * 5): if streamedLogsCount.Load() == int64(blocksWithLogsLength-followDistance) { - break Unfollowed + break Wait1 } } } // Create empty blocks with no transactions to advance the chain - // to followDistance blocks behind the head. + // followDistance blocks ahead. for i := 0; i < followDistance; i++ { sim.Commit() time.Sleep(delay) } - - require.NoError(t, client.Close()) - wg.Wait() + // Wait for streamed logs to advance accordingly. +Wait2: + for { + select { + case <-ctx.Done(): + require.Failf(t, "timed out", "err: %v, streamedLogsCount: %d", ctx.Err(), streamedLogsCount.Load()) + case <-time.After(time.Millisecond * 5): + if streamedLogsCount.Load() == int64(blocksWithLogsLength) { + break Wait2 + } + } + } require.NotEmpty(t, streamedLogs) require.Equal(t, blocksWithLogsLength, len(streamedLogs)) + + require.NoError(t, client.Close()) require.NoError(t, sim.Close()) } diff --git a/eth/executionclient/options.go b/eth/executionclient/options.go index ccac7ddada..31c849695e 100644 --- a/eth/executionclient/options.go +++ b/eth/executionclient/options.go @@ -23,8 +23,8 @@ func WithMetrics(metrics metrics) Option { } } -// WithFollowDistance sets finalization offset. -// It defines how many blocks in the past the latest block we want to process is. +// WithFollowDistance sets finalization offset (a block at this offset into the past +// from the head block will be considered as very likely finalized). func WithFollowDistance(offset uint64) Option { return func(s *ExecutionClient) { s.followDistance = offset