Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: executionclient fix & simplify #1781

Open
wants to merge 1 commit into
base: stage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions eth/executionclient/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ type ExecutionClient struct {
contractAddress ethcommon.Address

// optional
logger *zap.Logger
metrics metrics
logger *zap.Logger
metrics metrics
// followDistance defines an offset into the past from the head block such that the block
// at this offset will be considered as very likely finalized.
followDistance uint64 // TODO: consider reading the finalized checkpoint from consensus layer
connectionTimeout time.Duration
reconnectionInitialInterval time.Duration
Expand Down
32 changes: 19 additions & 13 deletions eth/executionclient/execution_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"math/big"
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -164,14 +163,10 @@ func TestStreamLogs(t *testing.T) {
require.NoError(t, err)

logs := client.StreamLogs(ctx, 0)
var wg sync.WaitGroup
var streamedLogs []ethtypes.Log

// Receive emitted events
wg.Add(1)
var streamedLogsCount atomic.Int64
go func() {
defer wg.Done()
// Receive emitted events, this func will exit when test exits.
for block := range logs {
streamedLogs = append(streamedLogs, block.Logs...)
streamedLogsCount.Add(int64(len(block.Logs)))
Expand All @@ -189,29 +184,40 @@ func TestStreamLogs(t *testing.T) {
}

// Wait for blocksWithLogsLength-followDistance blocks to be streamed.
Unfollowed:
Wait1:
for {
select {
case <-ctx.Done():
require.Equal(t, int64(blocksWithLogsLength-followDistance), streamedLogsCount.Load())
require.Failf(t, "timed out", "err: %v, streamedLogsCount: %d", ctx.Err(), streamedLogsCount.Load())
case <-time.After(time.Millisecond * 5):
if streamedLogsCount.Load() == int64(blocksWithLogsLength-followDistance) {
break Unfollowed
break Wait1
}
}
}
Comment on lines -192 to 197
Copy link
Contributor Author

@iurii-ssv iurii-ssv Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, (on stage version) if we drop into case <-ctx.Done() branch here this for loop will very likely spin perpetually (allocating new Timer on every iteration through time.After).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch


// Create empty blocks with no transactions to advance the chain
// to followDistance blocks behind the head.
// followDistance blocks ahead.
for i := 0; i < followDistance; i++ {
sim.Commit()
time.Sleep(delay)
}

require.NoError(t, client.Close())
wg.Wait()
// Wait for streamed logs to advance accordingly.
Wait2:
for {
select {
case <-ctx.Done():
require.Failf(t, "timed out", "err: %v, streamedLogsCount: %d", ctx.Err(), streamedLogsCount.Load())
case <-time.After(time.Millisecond * 5):
if streamedLogsCount.Load() == int64(blocksWithLogsLength) {
break Wait2
}
}
}
require.NotEmpty(t, streamedLogs)
require.Equal(t, blocksWithLogsLength, len(streamedLogs))

require.NoError(t, client.Close())
require.NoError(t, sim.Close())
}

Expand Down
4 changes: 2 additions & 2 deletions eth/executionclient/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func WithMetrics(metrics metrics) Option {
}
}

// WithFollowDistance sets finalization offset.
// It defines how many blocks in the past the latest block we want to process is.
// WithFollowDistance sets finalization offset (a block at this offset into the past
// from the head block will be considered as very likely finalized).
func WithFollowDistance(offset uint64) Option {
return func(s *ExecutionClient) {
s.followDistance = offset
Expand Down