Skip to content

Commit

Permalink
op-node: event handling on block attributes
Browse files Browse the repository at this point in the history
todo
  • Loading branch information
protolambda committed Jun 22, 2024
1 parent ec9f39b commit 8ec52c7
Show file tree
Hide file tree
Showing 11 changed files with 438 additions and 415 deletions.
77 changes: 52 additions & 25 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -38,17 +39,18 @@ type L2Verifier struct {
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
}

synchronousEvents *rollup.SynchronousEvents

syncDeriver *driver.SyncDeriver

// L2 rollup
engine *engine.EngineController
derivation *derive.DerivationPipeline
clSync *clsync.CLSync

attributesHandler driver.AttributesHandler
safeHeadListener rollup.SafeHeadListener
finalizer driver.Finalizer
syncCfg *sync.Config
safeHeadListener rollup.SafeHeadListener
finalizer driver.Finalizer
syncCfg *sync.Config

l1 derive.L1Fetcher
l1State *driver.L1State
Expand Down Expand Up @@ -101,26 +103,25 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
finalizer = finality.NewFinalizer(log, cfg, l1, ec)
}

attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, eng)
attributesHandler := attributes.NewAttributesHandler(log, cfg, ctx, eng, synchronousEvents)

pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)
pipelineDeriver := derive.NewPipelineDeriver(ctx, pipeline, synchronousEvents)

syncDeriver := &driver.SyncDeriver{
Derivation: pipeline,
Finalizer: finalizer,
AttributesHandler: attributesHandler,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: ec,
SyncCfg: syncCfg,
Config: cfg,
L1: l1,
L2: eng,
Emitter: synchronousEvents,
Log: log,
Ctx: ctx,
Drain: synchronousEvents.Drain,
Derivation: pipeline,
Finalizer: finalizer,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: ec,
SyncCfg: syncCfg,
Config: cfg,
L1: l1,
L2: eng,
Emitter: synchronousEvents,
Log: log,
Ctx: ctx,
Drain: synchronousEvents.Drain,
}

engDeriv := engine.NewEngDeriver(log, ctx, cfg, ec, synchronousEvents)
Expand All @@ -132,7 +133,6 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
clSync: clSync,
derivation: pipeline,
finalizer: finalizer,
attributesHandler: attributesHandler,
safeHeadListener: safeHeadListener,
syncCfg: syncCfg,
syncDeriver: syncDeriver,
Expand All @@ -142,6 +142,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
l2Building: false,
rollupCfg: cfg,
rpc: rpc.NewServer(),
synchronousEvents: synchronousEvents,
}

*rootDeriver = rollup.SynchronousDerivers{
Expand All @@ -151,6 +152,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
rollupNode,
clSync,
pipelineDeriver,
attributesHandler,
}

t.Cleanup(rollupNode.rpc.Stop)
Expand Down Expand Up @@ -305,14 +307,34 @@ func (s *L2Verifier) OnEvent(ev rollup.Event) {
}
}

// ActL2PipelineStep runs one iteration of the L2 derivation pipeline
func (s *L2Verifier) ActL2PipelineStep(t Testing) {
t.Helper()
t.Fatal("ActL2PipelineStep is not supported anymore")
}

func (s *L2Verifier) ActL2EventsUntilPending(t Testing, num uint64) {
s.ActL2EventsUntil(t, func(ev rollup.Event) bool {
x, ok := ev.(engine.PendingSafeUpdateEvent)
return ok && x.PendingSafe.Number == num
}, 1000, false)
}

func (s *L2Verifier) ActL2EventsUntil(t Testing, fn func(ev rollup.Event) bool, max int, excl bool) {
t.Helper()
if s.l2Building {
t.InvalidAction("cannot derive new data while building L2 block")
return
}
s.syncDeriver.Emitter.Emit(driver.StepEvent{})
require.NoError(t, s.syncDeriver.Drain(), "complete all event processing triggered by deriver step")
for i := 0; i < max; i++ {
err := s.synchronousEvents.DrainUntil(fn, excl)
if err == nil {
return
}
if err == io.EOF {
s.synchronousEvents.Emit(driver.StepEvent{})
}
}
t.Fatalf("event condition did not hit, ran maximum number of steps: %d", max)
}

func (s *L2Verifier) ActL2PipelineFull(t Testing) {
Expand All @@ -326,14 +348,19 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) {
if i > 10_000 {
t.Fatalf("ActL2PipelineFull running for too long. Is a deriver looping?")
}
s.ActL2PipelineStep(t)
if s.l2Building {
t.InvalidAction("cannot derive new data while building L2 block")
return
}
s.syncDeriver.Emitter.Emit(driver.StepEvent{})
require.NoError(t, s.syncDeriver.Drain(), "complete all event processing triggered by deriver step")
}
}

// ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub
func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
s.syncDeriver.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload})
s.synchronousEvents.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload})
}
}

Expand Down
83 changes: 46 additions & 37 deletions op-e2e/actions/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,7 @@ import (
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/beacon/engine"
Expand All @@ -22,7 +16,16 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
engine2 "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)

func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut {
Expand Down Expand Up @@ -262,19 +265,17 @@ func TestBackupUnsafe(gt *testing.T) {
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())
// pendingSafe must not be advanced as well
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0))
// Preheat engine queue and consume A1 from batch
for i := 0; i < 4; i++ {
sequencer.ActL2PipelineStep(t)
}
// Run until we consume A1 from batch
sequencer.ActL2EventsUntilPending(t, 1)
// A1 is valid original block so pendingSafe is advanced
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1))
require.Equal(t, sequencer.L2Unsafe().Number, uint64(5))
// backupUnsafe is still empty
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())

// Process B2
sequencer.ActL2PipelineStep(t)
sequencer.ActL2PipelineStep(t)
// Run until we consume B2 from batch
sequencer.ActL2EventsUntilPending(t, 2)
// B2 is valid different block, triggering unsafe chain reorg
require.Equal(t, sequencer.L2Unsafe().Number, uint64(2))
// B2 is valid different block, triggering unsafe block backup
Expand Down Expand Up @@ -425,19 +426,16 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())
// pendingSafe must not be advanced as well
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0))
// Preheat engine queue and consume A1 from batch
for i := 0; i < 4; i++ {
sequencer.ActL2PipelineStep(t)
}
// Run till we consumed A1 from batch
sequencer.ActL2EventsUntilPending(t, 1)
// A1 is valid original block so pendingSafe is advanced
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1))
require.Equal(t, sequencer.L2Unsafe().Number, uint64(5))
// backupUnsafe is still empty
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())

// Process B2
sequencer.ActL2PipelineStep(t)
sequencer.ActL2PipelineStep(t)
sequencer.ActL2EventsUntilPending(t, 2)
// B2 is valid different block, triggering unsafe chain reorg
require.Equal(t, sequencer.L2Unsafe().Number, uint64(2))
// B2 is valid different block, triggering unsafe block backup
Expand All @@ -447,14 +445,14 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {

// B3 is invalid block
// NextAttributes is called
sequencer.ActL2PipelineStep(t)
// forceNextSafeAttributes is called
sequencer.ActL2PipelineStep(t)
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, ok := ev.(engine2.ProcessAttributesEvent)
return ok
}, 100, true)
// mock forkChoiceUpdate error while restoring previous unsafe chain using backupUnsafe.
seqEng.ActL2RPCFail(t, eth.InputError{Inner: errors.New("mock L2 RPC error"), Code: eth.InvalidForkchoiceState})

// TryBackupUnsafeReorg is called
sequencer.ActL2PipelineStep(t)
// The backup-unsafe rewind is applied

// try to process invalid leftovers: B4, B5
sequencer.ActL2PipelineFull(t)
Expand Down Expand Up @@ -565,18 +563,15 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
// pendingSafe must not be advanced as well
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0))
// Preheat engine queue and consume A1 from batch
for i := 0; i < 4; i++ {
sequencer.ActL2PipelineStep(t)
}
sequencer.ActL2EventsUntilPending(t, 1)
// A1 is valid original block so pendingSafe is advanced
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1))
require.Equal(t, sequencer.L2Unsafe().Number, uint64(5))
// backupUnsafe is still empty
require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe())

// Process B2
sequencer.ActL2PipelineStep(t)
sequencer.ActL2PipelineStep(t)
sequencer.ActL2EventsUntilPending(t, 2)
// B2 is valid different block, triggering unsafe chain reorg
require.Equal(t, sequencer.L2Unsafe().Number, uint64(2))
// B2 is valid different block, triggering unsafe block backup
Expand All @@ -585,17 +580,21 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
require.Equal(t, sequencer.L2PendingSafe().Number, uint64(2))

// B3 is invalid block
// NextAttributes is called
sequencer.ActL2PipelineStep(t)
// forceNextSafeAttributes is called
sequencer.ActL2PipelineStep(t)
// wait till attributes processing (excl.) before mocking errors
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, ok := ev.(engine2.ProcessAttributesEvent)
return ok
}, 100, true)

serverErrCnt := 2
for i := 0; i < serverErrCnt; i++ {
// mock forkChoiceUpdate failure while restoring previous unsafe chain using backupUnsafe.
seqEng.ActL2RPCFail(t, engine.GenericServerError)
// TryBackupUnsafeReorg is called - forkChoiceUpdate returns GenericServerError so retry
sequencer.ActL2PipelineStep(t)
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, ok := ev.(rollup.EngineTemporaryErrorEvent)
return ok
}, 100, false)
// backupUnsafeHead not emptied yet
require.Equal(t, targetUnsafeHeadHash, sequencer.L2BackupUnsafe().Hash)
}
Expand Down Expand Up @@ -980,7 +979,12 @@ func TestSpanBatchAtomicity_Consolidation(gt *testing.T) {
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
// wait for next pending block
verifier.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, pending := ev.(engine2.PendingSafeUpdateEvent)
_, idle := ev.(derive.DeriverIdleEvent)
return pending || idle
}, 1000, false)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
Expand Down Expand Up @@ -1027,7 +1031,12 @@ func TestSpanBatchAtomicity_ForceAdvance(gt *testing.T) {
verifier.ActL1HeadSignal(t)
verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle {
verifier.ActL2PipelineStep(t)
// wait for next pending block
verifier.ActL2EventsUntil(t, func(ev rollup.Event) bool {
_, pending := ev.(engine2.PendingSafeUpdateEvent)
_, idle := ev.(derive.DeriverIdleEvent)
return pending || idle
}, 1000, false)
if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0))
Expand Down
Loading

0 comments on commit 8ec52c7

Please sign in to comment.