From a457c5b3b9b7d256db1ec3364b115645ddb34bec Mon Sep 17 00:00:00 2001 From: protolambda Date: Sun, 23 Jun 2024 16:24:52 -0600 Subject: [PATCH] op-node: attributes-handler with events (#10947) * op-node: event handling on block attributes todo * op-node: update plasma step to no longer hardcode pipeline stepping --- op-e2e/actions/l2_verifier.go | 74 ++-- op-e2e/actions/plasma_test.go | 23 +- op-e2e/actions/sync_test.go | 83 +++-- op-node/rollup/attributes/attributes.go | 222 +++++------ op-node/rollup/attributes/attributes_test.go | 364 +++++++++---------- op-node/rollup/derive/deriver.go | 18 + op-node/rollup/derive/plasma_data_source.go | 4 +- op-node/rollup/driver/driver.go | 30 +- op-node/rollup/driver/state.go | 22 +- op-node/rollup/synchronous.go | 27 ++ op-program/client/driver/program.go | 5 + op-program/client/driver/program_test.go | 1 + op-service/testutils/mock_emitter.go | 4 + 13 files changed, 451 insertions(+), 426 deletions(-) diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index 682a89db68e5..e1363394b878 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "github.com/stretchr/testify/require" @@ -38,6 +39,8 @@ type L2Verifier struct { L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error) } + synchronousEvents *rollup.SynchronousEvents + syncDeriver *driver.SyncDeriver // L2 rollup @@ -45,10 +48,9 @@ type L2Verifier struct { derivation *derive.DerivationPipeline clSync *clsync.CLSync - attributesHandler driver.AttributesHandler - safeHeadListener rollup.SafeHeadListener - finalizer driver.Finalizer - syncCfg *sync.Config + safeHeadListener rollup.SafeHeadListener + finalizer driver.Finalizer + syncCfg *sync.Config l1 derive.L1Fetcher l1State *driver.L1State @@ -101,26 +103,25 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri finalizer = finality.NewFinalizer(log, cfg, l1, ec) } - attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, eng) + attributesHandler := attributes.NewAttributesHandler(log, cfg, ctx, eng, synchronousEvents) pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics) pipelineDeriver := derive.NewPipelineDeriver(ctx, pipeline, synchronousEvents) syncDeriver := &driver.SyncDeriver{ - Derivation: pipeline, - Finalizer: finalizer, - AttributesHandler: attributesHandler, - SafeHeadNotifs: safeHeadListener, - CLSync: clSync, - Engine: ec, - SyncCfg: syncCfg, - Config: cfg, - L1: l1, - L2: eng, - Emitter: synchronousEvents, - Log: log, - Ctx: ctx, - Drain: synchronousEvents.Drain, + Derivation: pipeline, + Finalizer: finalizer, + SafeHeadNotifs: safeHeadListener, + CLSync: clSync, + Engine: ec, + SyncCfg: syncCfg, + Config: cfg, + L1: l1, + L2: eng, + Emitter: synchronousEvents, + Log: log, + Ctx: ctx, + Drain: synchronousEvents.Drain, } engDeriv := engine.NewEngDeriver(log, ctx, cfg, ec, synchronousEvents) @@ -132,7 +133,6 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri clSync: clSync, derivation: pipeline, finalizer: finalizer, - attributesHandler: attributesHandler, safeHeadListener: safeHeadListener, syncCfg: syncCfg, syncDeriver: syncDeriver, @@ -142,6 +142,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri l2Building: false, rollupCfg: cfg, rpc: rpc.NewServer(), + synchronousEvents: synchronousEvents, } *rootDeriver = rollup.SynchronousDerivers{ @@ -151,6 +152,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri rollupNode, clSync, pipelineDeriver, + attributesHandler, } t.Cleanup(rollupNode.rpc.Stop) @@ -305,14 +307,29 @@ func (s *L2Verifier) OnEvent(ev rollup.Event) { } } -// ActL2PipelineStep runs one iteration of the L2 derivation pipeline -func (s *L2Verifier) ActL2PipelineStep(t Testing) { +func (s *L2Verifier) ActL2EventsUntilPending(t Testing, num uint64) { + s.ActL2EventsUntil(t, func(ev rollup.Event) bool { + x, ok := ev.(engine.PendingSafeUpdateEvent) + return ok && x.PendingSafe.Number == num + }, 1000, false) +} + +func (s *L2Verifier) ActL2EventsUntil(t Testing, fn func(ev rollup.Event) bool, max int, excl bool) { + t.Helper() if s.l2Building { t.InvalidAction("cannot derive new data while building L2 block") return } - s.syncDeriver.Emitter.Emit(driver.StepEvent{}) - require.NoError(t, s.syncDeriver.Drain(), "complete all event processing triggered by deriver step") + for i := 0; i < max; i++ { + err := s.synchronousEvents.DrainUntil(fn, excl) + if err == nil { + return + } + if err == io.EOF { + s.synchronousEvents.Emit(driver.StepEvent{}) + } + } + t.Fatalf("event condition did not hit, ran maximum number of steps: %d", max) } func (s *L2Verifier) ActL2PipelineFull(t Testing) { @@ -326,14 +343,19 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) { if i > 10_000 { t.Fatalf("ActL2PipelineFull running for too long. Is a deriver looping?") } - s.ActL2PipelineStep(t) + if s.l2Building { + t.InvalidAction("cannot derive new data while building L2 block") + return + } + s.syncDeriver.Emitter.Emit(driver.StepEvent{}) + require.NoError(t, s.syncDeriver.Drain(), "complete all event processing triggered by deriver step") } } // ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action { return func(t Testing) { - s.syncDeriver.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload}) + s.synchronousEvents.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload}) } } diff --git a/op-e2e/actions/plasma_test.go b/op-e2e/actions/plasma_test.go index bd574f00053d..bc67ba4ac4f7 100644 --- a/op-e2e/actions/plasma_test.go +++ b/op-e2e/actions/plasma_test.go @@ -5,18 +5,21 @@ import ( "math/rand" "testing" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/node/safedb" + "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-plasma/bindings" "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/testlog" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" - "github.com/stretchr/testify/require" ) // Devnet allocs should have alt-da mode enabled for these tests to pass @@ -497,9 +500,13 @@ func TestPlasma_SequencerStalledMultiChallenges(gt *testing.T) { // advance the pipeline until it errors out as it is still stuck // on deriving the first commitment - for i := 0; i < 3; i++ { - a.sequencer.ActL2PipelineStep(t) - } + a.sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool { + x, ok := ev.(rollup.EngineTemporaryErrorEvent) + if ok { + require.ErrorContains(t, x.Err, "failed to fetch input data") + } + return ok + }, 100, false) // keep track of the second commitment comm2 := a.lastComm diff --git a/op-e2e/actions/sync_test.go b/op-e2e/actions/sync_test.go index e7521bdd8c94..c094351553e8 100644 --- a/op-e2e/actions/sync_test.go +++ b/op-e2e/actions/sync_test.go @@ -7,13 +7,7 @@ import ( "testing" "time" - "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" - "github.com/ethereum-optimism/optimism/op-node/rollup/sync" - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum-optimism/optimism/op-service/sources" - "github.com/ethereum-optimism/optimism/op-service/testlog" - "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/beacon/engine" @@ -22,7 +16,16 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + engine2 "github.com/ethereum-optimism/optimism/op-node/rollup/engine" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/sources" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum-optimism/optimism/op-service/testutils" ) func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut { @@ -262,10 +265,8 @@ func TestBackupUnsafe(gt *testing.T) { require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe()) // pendingSafe must not be advanced as well require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0)) - // Preheat engine queue and consume A1 from batch - for i := 0; i < 4; i++ { - sequencer.ActL2PipelineStep(t) - } + // Run until we consume A1 from batch + sequencer.ActL2EventsUntilPending(t, 1) // A1 is valid original block so pendingSafe is advanced require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1)) require.Equal(t, sequencer.L2Unsafe().Number, uint64(5)) @@ -273,8 +274,8 @@ func TestBackupUnsafe(gt *testing.T) { require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe()) // Process B2 - sequencer.ActL2PipelineStep(t) - sequencer.ActL2PipelineStep(t) + // Run until we consume B2 from batch + sequencer.ActL2EventsUntilPending(t, 2) // B2 is valid different block, triggering unsafe chain reorg require.Equal(t, sequencer.L2Unsafe().Number, uint64(2)) // B2 is valid different block, triggering unsafe block backup @@ -425,10 +426,8 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) { require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe()) // pendingSafe must not be advanced as well require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0)) - // Preheat engine queue and consume A1 from batch - for i := 0; i < 4; i++ { - sequencer.ActL2PipelineStep(t) - } + // Run till we consumed A1 from batch + sequencer.ActL2EventsUntilPending(t, 1) // A1 is valid original block so pendingSafe is advanced require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1)) require.Equal(t, sequencer.L2Unsafe().Number, uint64(5)) @@ -436,8 +435,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) { require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe()) // Process B2 - sequencer.ActL2PipelineStep(t) - sequencer.ActL2PipelineStep(t) + sequencer.ActL2EventsUntilPending(t, 2) // B2 is valid different block, triggering unsafe chain reorg require.Equal(t, sequencer.L2Unsafe().Number, uint64(2)) // B2 is valid different block, triggering unsafe block backup @@ -447,14 +445,14 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) { // B3 is invalid block // NextAttributes is called - sequencer.ActL2PipelineStep(t) - // forceNextSafeAttributes is called - sequencer.ActL2PipelineStep(t) + sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool { + _, ok := ev.(engine2.ProcessAttributesEvent) + return ok + }, 100, true) // mock forkChoiceUpdate error while restoring previous unsafe chain using backupUnsafe. seqEng.ActL2RPCFail(t, eth.InputError{Inner: errors.New("mock L2 RPC error"), Code: eth.InvalidForkchoiceState}) - // TryBackupUnsafeReorg is called - sequencer.ActL2PipelineStep(t) + // The backup-unsafe rewind is applied // try to process invalid leftovers: B4, B5 sequencer.ActL2PipelineFull(t) @@ -565,9 +563,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) { // pendingSafe must not be advanced as well require.Equal(t, sequencer.L2PendingSafe().Number, uint64(0)) // Preheat engine queue and consume A1 from batch - for i := 0; i < 4; i++ { - sequencer.ActL2PipelineStep(t) - } + sequencer.ActL2EventsUntilPending(t, 1) // A1 is valid original block so pendingSafe is advanced require.Equal(t, sequencer.L2PendingSafe().Number, uint64(1)) require.Equal(t, sequencer.L2Unsafe().Number, uint64(5)) @@ -575,8 +571,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) { require.Equal(t, eth.L2BlockRef{}, sequencer.L2BackupUnsafe()) // Process B2 - sequencer.ActL2PipelineStep(t) - sequencer.ActL2PipelineStep(t) + sequencer.ActL2EventsUntilPending(t, 2) // B2 is valid different block, triggering unsafe chain reorg require.Equal(t, sequencer.L2Unsafe().Number, uint64(2)) // B2 is valid different block, triggering unsafe block backup @@ -585,17 +580,21 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) { require.Equal(t, sequencer.L2PendingSafe().Number, uint64(2)) // B3 is invalid block - // NextAttributes is called - sequencer.ActL2PipelineStep(t) - // forceNextSafeAttributes is called - sequencer.ActL2PipelineStep(t) + // wait till attributes processing (excl.) before mocking errors + sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool { + _, ok := ev.(engine2.ProcessAttributesEvent) + return ok + }, 100, true) serverErrCnt := 2 for i := 0; i < serverErrCnt; i++ { // mock forkChoiceUpdate failure while restoring previous unsafe chain using backupUnsafe. seqEng.ActL2RPCFail(t, engine.GenericServerError) // TryBackupUnsafeReorg is called - forkChoiceUpdate returns GenericServerError so retry - sequencer.ActL2PipelineStep(t) + sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool { + _, ok := ev.(rollup.EngineTemporaryErrorEvent) + return ok + }, 100, false) // backupUnsafeHead not emptied yet require.Equal(t, targetUnsafeHeadHash, sequencer.L2BackupUnsafe().Hash) } @@ -980,7 +979,12 @@ func TestSpanBatchAtomicity_Consolidation(gt *testing.T) { verifier.ActL1HeadSignal(t) verifier.l2PipelineIdle = false for !verifier.l2PipelineIdle { - verifier.ActL2PipelineStep(t) + // wait for next pending block + verifier.ActL2EventsUntil(t, func(ev rollup.Event) bool { + _, pending := ev.(engine2.PendingSafeUpdateEvent) + _, idle := ev.(derive.DeriverIdleEvent) + return pending || idle + }, 1000, false) if verifier.L2PendingSafe().Number < targetHeadNumber { // If the span batch is not fully processed, the safe head must not advance. require.Equal(t, verifier.L2Safe().Number, uint64(0)) @@ -1027,7 +1031,12 @@ func TestSpanBatchAtomicity_ForceAdvance(gt *testing.T) { verifier.ActL1HeadSignal(t) verifier.l2PipelineIdle = false for !verifier.l2PipelineIdle { - verifier.ActL2PipelineStep(t) + // wait for next pending block + verifier.ActL2EventsUntil(t, func(ev rollup.Event) bool { + _, pending := ev.(engine2.PendingSafeUpdateEvent) + _, idle := ev.(derive.DeriverIdleEvent) + return pending || idle + }, 1000, false) if verifier.L2PendingSafe().Number < targetHeadNumber { // If the span batch is not fully processed, the safe head must not advance. require.Equal(t, verifier.L2Safe().Number, uint64(0)) diff --git a/op-node/rollup/attributes/attributes.go b/op-node/rollup/attributes/attributes.go index 517c91f70863..43d6000c1225 100644 --- a/op-node/rollup/attributes/attributes.go +++ b/op-node/rollup/attributes/attributes.go @@ -4,33 +4,18 @@ import ( "context" "errors" "fmt" - "io" + "sync" "time" "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-node/rollup" - "github.com/ethereum-optimism/optimism/op-node/rollup/async" - "github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-service/eth" ) -type Engine interface { - engine.EngineControl - - SetUnsafeHead(eth.L2BlockRef) - SetSafeHead(eth.L2BlockRef) - SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool) - SetPendingSafeL2Head(eth.L2BlockRef) - - PendingSafeL2Head() eth.L2BlockRef - BackupUnsafeL2Head() eth.L2BlockRef -} - type L2 interface { PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error) } @@ -39,150 +24,133 @@ type AttributesHandler struct { log log.Logger cfg *rollup.Config - ec Engine + // when the rollup node shuts down, stop any in-flight sub-processes of the attributes-handler + ctx context.Context + l2 L2 + mu sync.Mutex + + emitter rollup.EventEmitter + attributes *derive.AttributesWithParent } -func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ec Engine, l2 L2) *AttributesHandler { +func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Context, l2 L2, emitter rollup.EventEmitter) *AttributesHandler { return &AttributesHandler{ log: log, cfg: cfg, - ec: ec, + ctx: ctx, l2: l2, + emitter: emitter, attributes: nil, } } -func (eq *AttributesHandler) HasAttributes() bool { - return eq.attributes != nil +func (eq *AttributesHandler) OnEvent(ev rollup.Event) { + // Events may be concurrent in the future. Prevent unsafe concurrent modifications to the attributes. + eq.mu.Lock() + defer eq.mu.Unlock() + + switch x := ev.(type) { + case engine.PendingSafeUpdateEvent: + eq.onPendingSafeUpdate(x) + case derive.DerivedAttributesEvent: + eq.attributes = x.Attributes + eq.emitter.Emit(derive.ConfirmReceivedAttributesEvent{}) + // to make sure we have a pre-state signal to process the attributes from + eq.emitter.Emit(engine.PendingSafeRequestEvent{}) + case engine.InvalidPayloadAttributesEvent: + // If the engine signals that attributes are invalid, + // that should match our last applied attributes, which we should thus drop. + eq.attributes = nil + // Time to re-evaluate without attributes. + // (the pending-safe state will then be forwarded to our source of attributes). + eq.emitter.Emit(engine.PendingSafeRequestEvent{}) + } } -func (eq *AttributesHandler) SetAttributes(attributes *derive.AttributesWithParent) { - eq.attributes = attributes -} +// onPendingSafeUpdate applies the queued-up block attributes, if any, on top of the signaled pending state. +// The event is also used to clear the queued-up attributes, when successfully processed. +// On processing failure this may emit a temporary, reset, or critical error like other derivers. +func (eq *AttributesHandler) onPendingSafeUpdate(x engine.PendingSafeUpdateEvent) { + if x.Unsafe.Number < x.PendingSafe.Number { + // invalid chain state, reset to try and fix it + eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("pending-safe label (%d) may not be ahead of unsafe head label (%d)", x.PendingSafe.Number, x.Unsafe.Number)}) + return + } -// Proceed processes block attributes, if any. -// Proceed returns io.EOF if there are no attributes to process. -// Proceed returns a temporary, reset, or critical error like other derivers. -// Proceed returns no error if the safe-head may have changed. -func (eq *AttributesHandler) Proceed(ctx context.Context) error { if eq.attributes == nil { - return io.EOF - } - // validate the safe attributes before processing them. The engine may have completed processing them through other means. - if eq.ec.PendingSafeL2Head() != eq.attributes.Parent { - // Previously the attribute's parent was the pending safe head. If the pending safe head advances so pending safe head's parent is the same as the - // attribute's parent then we need to cancel the attributes. - if eq.ec.PendingSafeL2Head().ParentHash == eq.attributes.Parent.Hash { - eq.log.Warn("queued safe attributes are stale, safehead progressed", - "pending_safe_head", eq.ec.PendingSafeL2Head(), "pending_safe_head_parent", eq.ec.PendingSafeL2Head().ParentID(), - "attributes_parent", eq.attributes.Parent) - eq.attributes = nil - return nil - } - // If something other than a simple advance occurred, perform a full reset - return derive.NewResetError(fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s", - eq.ec.PendingSafeL2Head(), eq.ec.PendingSafeL2Head().ParentID(), eq.attributes.Parent)) + // Request new attributes to be generated, only if we don't currently have attributes that have yet to be processed. + // It is safe to request the pipeline, the attributes-handler is the only user of it, + // and the pipeline will not generate another set of attributes until the last set is recognized. + eq.emitter.Emit(derive.PipelineStepEvent{PendingSafe: x.PendingSafe}) + return } - if eq.ec.PendingSafeL2Head().Number < eq.ec.UnsafeL2Head().Number { - if err := eq.consolidateNextSafeAttributes(ctx, eq.attributes); err != nil { - return err - } - eq.attributes = nil - return nil - } else if eq.ec.PendingSafeL2Head().Number == eq.ec.UnsafeL2Head().Number { - if err := eq.forceNextSafeAttributes(ctx, eq.attributes); err != nil { - return err - } + + // Drop attributes if they don't apply on top of the pending safe head + if eq.attributes.Parent.Number != x.PendingSafe.Number { + eq.log.Warn("dropping stale attributes", + "pending", x.PendingSafe, "attributes_parent", eq.attributes.Parent) eq.attributes = nil - return nil + return + } + + if eq.attributes.Parent != x.PendingSafe { + // If the attributes are supposed to follow the pending safe head, but don't build on the exact block, + // then there's some reorg inconsistency. Either bad attributes, or bad pending safe head. + // Trigger a reset, and the system can derive attributes on top of the pending safe head. + // Until the reset is complete we don't clear the attributes state, + // so we can re-emit the ResetEvent until the reset actually happens. + + eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s", + x.PendingSafe, x.PendingSafe.ParentID(), eq.attributes.Parent)}) } else { - // For some reason the unsafe head is behind the pending safe head. Log it, and correct it. - eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head()) - eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head()) - return nil + // if there already exists a block we can just consolidate it + if x.PendingSafe.Number < x.Unsafe.Number { + eq.consolidateNextSafeAttributes(eq.attributes, x.PendingSafe) + } else { + // append to tip otherwise + eq.emitter.Emit(engine.ProcessAttributesEvent{Attributes: eq.attributes}) + } } } // consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain, // to avoid extra processing or unnecessary unwinding of the chain. -// However, if the attributes do not match, they will be forced with forceNextSafeAttributes. -func (eq *AttributesHandler) consolidateNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent) error { - ctx, cancel := context.WithTimeout(ctx, time.Second*10) +// However, if the attributes do not match, they will be forced to process the attributes. +func (eq *AttributesHandler) consolidateNextSafeAttributes(attributes *derive.AttributesWithParent, onto eth.L2BlockRef) { + ctx, cancel := context.WithTimeout(eq.ctx, time.Second*10) defer cancel() - envelope, err := eq.l2.PayloadByNumber(ctx, eq.ec.PendingSafeL2Head().Number+1) + envelope, err := eq.l2.PayloadByNumber(ctx, attributes.Parent.Number+1) if err != nil { if errors.Is(err, ethereum.NotFound) { // engine may have restarted, or inconsistent safe head. We need to reset - return derive.NewResetError(fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err)) + eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err)}) + return } - return derive.NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err)) - } - if err := AttributesMatchBlock(eq.cfg, attributes.Attributes, eq.ec.PendingSafeL2Head().Hash, envelope, eq.log); err != nil { - eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head(), "safe", eq.ec.SafeL2Head()) - // geth cannot wind back a chain without reorging to a new, previously non-canonical, block - return eq.forceNextSafeAttributes(ctx, attributes) - } - ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload) - if err != nil { - return derive.NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err)) - } - eq.ec.SetPendingSafeL2Head(ref) - if attributes.IsLastInSpan { - eq.ec.SetSafeHead(ref) + eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err)}) + return } - // unsafe head stays the same, we did not reorg the chain. - return nil -} + if err := AttributesMatchBlock(eq.cfg, attributes.Attributes, onto.Hash, envelope, eq.log); err != nil { + eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", + "err", err, "unsafe", envelope.ExecutionPayload.ID(), "pending_safe", onto) -// forceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain. -func (eq *AttributesHandler) forceNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent) error { - attrs := attributes.Attributes - errType, err := eq.ec.StartPayload(ctx, eq.ec.PendingSafeL2Head(), attributes, true) - if err == nil { - _, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{}) - } - if err != nil { - switch errType { - case engine.BlockInsertTemporaryErr: - // RPC errors are recoverable, we can retry the buffered payload attributes later. - return derive.NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err)) - case engine.BlockInsertPrestateErr: - _ = eq.ec.CancelPayload(ctx, true) - return derive.NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err)) - case engine.BlockInsertPayloadErr: - _ = eq.ec.CancelPayload(ctx, true) - eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err) - // Count the number of deposits to see if the tx list is deposit only. - depositCount := 0 - for _, tx := range attrs.Transactions { - if len(tx) > 0 && tx[0] == types.DepositTxType { - depositCount += 1 - } - } - // Deposit transaction execution errors are suppressed in the execution engine, but if the - // block is somehow invalid, there is nothing we can do to recover & we should exit. - if len(attrs.Transactions) == depositCount { - eq.log.Error("deposit only block was invalid", "parent", attributes.Parent, "err", err) - return derive.NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err)) - } - // Revert the pending safe head to the safe head. - eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head()) - // suppress the error b/c we want to retry with the next batch from the batch queue - // If there is no valid batch the node will eventually force a deposit only block. If - // the deposit only block fails, this will return the critical error above. - - // Try to restore to previous known unsafe chain. - eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true) - - // drop the payload (by returning no error) without inserting it into the engine - return nil - default: - return derive.NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err)) + // geth cannot wind back a chain without reorging to a new, previously non-canonical, block + eq.emitter.Emit(engine.ProcessAttributesEvent{Attributes: attributes}) + return + } else { + ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload) + if err != nil { + eq.log.Error("Failed to compute block-ref from execution payload") + return } + eq.emitter.Emit(engine.PromotePendingSafeEvent{ + Ref: ref, + Safe: attributes.IsLastInSpan, + }) } - return nil + + // unsafe head stays the same, we did not reorg the chain. } diff --git a/op-node/rollup/attributes/attributes_test.go b/op-node/rollup/attributes/attributes_test.go index 49e6247e1417..5a9359c1fcfd 100644 --- a/op-node/rollup/attributes/attributes_test.go +++ b/op-node/rollup/attributes/attributes_test.go @@ -2,7 +2,6 @@ package attributes import ( "context" - "io" "math/big" "math/rand" // nosemgrep "testing" @@ -14,11 +13,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/engine" - "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testutils" @@ -153,161 +150,147 @@ func TestAttributesHandler(t *testing.T) { refA1Alt, err := derive.PayloadToBlockRef(cfg, payloadA1Alt.ExecutionPayload) require.NoError(t, err) - refA2 := eth.L2BlockRef{ - Hash: testutils.RandomHash(rng), - Number: refA1.Number + 1, - ParentHash: refA1.Hash, - Time: refA1.Time + cfg.BlockTime, - L1Origin: refA.ID(), - SequenceNumber: 1, - } - - a2L1Info, err := derive.L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, refA2.SequenceNumber, aL1Info, refA2.Time) - require.NoError(t, err) - attrA2 := &derive.AttributesWithParent{ - Attributes: ð.PayloadAttributes{ - Timestamp: eth.Uint64Quantity(refA2.Time), - PrevRandao: eth.Bytes32{}, - SuggestedFeeRecipient: common.Address{}, - Withdrawals: nil, - ParentBeaconBlockRoot: &common.Hash{}, - Transactions: []eth.Data{a2L1Info}, - NoTxPool: false, - GasLimit: &gasLimit, - }, - Parent: refA1, - IsLastInSpan: true, - } + t.Run("drop invalid attributes", func(t *testing.T) { + logger := testlog.Logger(t, log.LevelInfo) + l2 := &testutils.MockL2Client{} + emitter := &testutils.MockEmitter{} + ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter) + + emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{}) + emitter.ExpectOnce(engine.PendingSafeRequestEvent{}) + ah.OnEvent(derive.DerivedAttributesEvent{ + Attributes: attrA1, + }) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes, "queue the invalid attributes") + emitter.ExpectOnce(engine.PendingSafeRequestEvent{}) + ah.OnEvent(engine.InvalidPayloadAttributesEvent{ + Attributes: attrA1, + }) + emitter.AssertExpectations(t) + require.Nil(t, ah.attributes, "drop the invalid attributes") + }) t.Run("drop stale attributes", func(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) - eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) - ah := NewAttributesHandler(logger, cfg, ec, eng) - defer eng.AssertExpectations(t) - - ec.SetPendingSafeL2Head(refA1Alt) - ah.SetAttributes(attrA1) - require.True(t, ah.HasAttributes()) - require.NoError(t, ah.Proceed(context.Background()), "drop stale attributes") - require.False(t, ah.HasAttributes()) + l2 := &testutils.MockL2Client{} + emitter := &testutils.MockEmitter{} + ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter) + + emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{}) + emitter.ExpectOnce(engine.PendingSafeRequestEvent{}) + ah.OnEvent(derive.DerivedAttributesEvent{ + Attributes: attrA1, + }) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes) + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA1Alt, + Unsafe: refA1Alt, + }) + l2.AssertExpectations(t) + emitter.AssertExpectations(t) + require.Nil(t, ah.attributes, "drop stale attributes") }) t.Run("pending gets reorged", func(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) - eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) - ah := NewAttributesHandler(logger, cfg, ec, eng) - defer eng.AssertExpectations(t) - - ec.SetPendingSafeL2Head(refA0Alt) - ah.SetAttributes(attrA1) - require.True(t, ah.HasAttributes()) - require.ErrorIs(t, ah.Proceed(context.Background()), derive.ErrReset, "A1 does not fit on A0Alt") - require.True(t, ah.HasAttributes(), "detected reorg does not clear state, reset is required") + l2 := &testutils.MockL2Client{} + emitter := &testutils.MockEmitter{} + ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter) + + emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{}) + emitter.ExpectOnce(engine.PendingSafeRequestEvent{}) + ah.OnEvent(derive.DerivedAttributesEvent{ + Attributes: attrA1, + }) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes) + + emitter.ExpectOnceType("ResetEvent") + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA0Alt, + Unsafe: refA0Alt, + }) + l2.AssertExpectations(t) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes, "detected reorg does not clear state, reset is required") }) t.Run("pending older than unsafe", func(t *testing.T) { t.Run("consolidation fails", func(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) - eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) - ah := NewAttributesHandler(logger, cfg, ec, eng) - - ec.SetUnsafeHead(refA1) - ec.SetSafeHead(refA0) - ec.SetFinalizedHead(refA0) - ec.SetPendingSafeL2Head(refA0) + l2 := &testutils.MockL2Client{} + emitter := &testutils.MockEmitter{} + ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter) - defer eng.AssertExpectations(t) + // attrA1Alt does not match block A1, so will cause force-reorg. + emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{}) + emitter.ExpectOnce(engine.PendingSafeRequestEvent{}) + ah.OnEvent(derive.DerivedAttributesEvent{Attributes: attrA1Alt}) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes, "queued up derived attributes") // Call during consolidation. // The payloadA1 is going to get reorged out in favor of attrA1Alt (turns into payloadA1Alt) - eng.ExpectPayloadByNumber(refA1.Number, payloadA1, nil) - - // attrA1Alt does not match block A1, so will cause force-reorg. - { - eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ - HeadBlockHash: payloadA1Alt.ExecutionPayload.ParentHash, // reorg - SafeBlockHash: refA0.Hash, - FinalizedBlockHash: refA0.Hash, - }, attrA1Alt.Attributes, ð.ForkchoiceUpdatedResult{ - PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, - PayloadID: ð.PayloadID{1, 2, 3}, - }, nil) // to build the block - eng.ExpectGetPayload(eth.PayloadID{1, 2, 3}, payloadA1Alt, nil) - eng.ExpectNewPayload(payloadA1Alt.ExecutionPayload, payloadA1Alt.ParentBeaconBlockRoot, - ð.PayloadStatusV1{Status: eth.ExecutionValid}, nil) // to persist the block - eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ - HeadBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, - SafeBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, - FinalizedBlockHash: refA0.Hash, - }, nil, ð.ForkchoiceUpdatedResult{ - PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, - PayloadID: nil, - }, nil) // to make it canonical - } - - ah.SetAttributes(attrA1Alt) - - require.True(t, ah.HasAttributes()) - require.NoError(t, ah.Proceed(context.Background()), "fail consolidation, perform force reorg") - require.False(t, ah.HasAttributes()) - - require.Equal(t, refA1Alt.Hash, payloadA1Alt.ExecutionPayload.BlockHash, "hash") - t.Log("ref A1: ", refA1.Hash) - t.Log("ref A0: ", refA0.Hash) - t.Log("ref alt: ", refA1Alt.Hash) - require.Equal(t, refA1Alt, ec.UnsafeL2Head(), "unsafe head reorg complete") - require.Equal(t, refA1Alt, ec.SafeL2Head(), "safe head reorg complete and updated") + l2.ExpectPayloadByNumber(refA1.Number, payloadA1, nil) + // fail consolidation, perform force reorg + emitter.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrA1Alt}) + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA0, + Unsafe: refA1, + }) + l2.AssertExpectations(t) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes, "still have attributes, processing still unconfirmed") + + // recognize reorg as complete + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA1Alt, + Unsafe: refA1Alt, + }) + emitter.AssertExpectations(t) + require.Nil(t, ah.attributes, "drop when attributes are successful") }) t.Run("consolidation passes", func(t *testing.T) { fn := func(t *testing.T, lastInSpan bool) { logger := testlog.Logger(t, log.LevelInfo) - eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) - ah := NewAttributesHandler(logger, cfg, ec, eng) - - ec.SetUnsafeHead(refA1) - ec.SetSafeHead(refA0) - ec.SetFinalizedHead(refA0) - ec.SetPendingSafeL2Head(refA0) - - defer eng.AssertExpectations(t) - - // Call during consolidation. - eng.ExpectPayloadByNumber(refA1.Number, payloadA1, nil) - - expectedSafeHash := refA0.Hash - if lastInSpan { // if last in span, then it becomes safe - expectedSafeHash = refA1.Hash - } - eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ - HeadBlockHash: refA1.Hash, - SafeBlockHash: expectedSafeHash, - FinalizedBlockHash: refA0.Hash, - }, nil, ð.ForkchoiceUpdatedResult{ - PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, - PayloadID: nil, - }, nil) + l2 := &testutils.MockL2Client{} + emitter := &testutils.MockEmitter{} + ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter) attr := &derive.AttributesWithParent{ Attributes: attrA1.Attributes, // attributes will match, passing consolidation Parent: attrA1.Parent, IsLastInSpan: lastInSpan, } - ah.SetAttributes(attr) - - require.True(t, ah.HasAttributes()) - require.NoError(t, ah.Proceed(context.Background()), "consolidate") - require.False(t, ah.HasAttributes()) - require.NoError(t, ec.TryUpdateEngine(context.Background()), "update to handle safe bump (lastinspan case)") - if lastInSpan { - require.Equal(t, refA1, ec.SafeL2Head(), "last in span becomes safe instantaneously") - } else { - require.Equal(t, refA1, ec.PendingSafeL2Head(), "pending as safe") - require.Equal(t, refA0, ec.SafeL2Head(), "A1 not yet safe") - } + emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{}) + emitter.ExpectOnce(engine.PendingSafeRequestEvent{}) + ah.OnEvent(derive.DerivedAttributesEvent{Attributes: attr}) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes, "queued up derived attributes") + + // Call during consolidation. + l2.ExpectPayloadByNumber(refA1.Number, payloadA1, nil) + + emitter.ExpectOnce(engine.PromotePendingSafeEvent{ + Ref: refA1, + Safe: lastInSpan, // last in span becomes safe instantaneously + }) + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA0, + Unsafe: refA1, + }) + l2.AssertExpectations(t) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes, "still have attributes, processing still unconfirmed") + + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA1, + Unsafe: refA1, + }) + emitter.AssertExpectations(t) + require.Nil(t, ah.attributes, "drop when attributes are successful") } t.Run("is last span", func(t *testing.T) { fn(t, true) @@ -321,89 +304,70 @@ func TestAttributesHandler(t *testing.T) { t.Run("pending equals unsafe", func(t *testing.T) { // no consolidation to do, just force next attributes on tip of chain - logger := testlog.Logger(t, log.LevelInfo) - eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) - ah := NewAttributesHandler(logger, cfg, ec, eng) + l2 := &testutils.MockL2Client{} + emitter := &testutils.MockEmitter{} + ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter) - ec.SetUnsafeHead(refA0) - ec.SetSafeHead(refA0) - ec.SetFinalizedHead(refA0) - ec.SetPendingSafeL2Head(refA0) - - defer eng.AssertExpectations(t) + emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{}) + emitter.ExpectOnce(engine.PendingSafeRequestEvent{}) + ah.OnEvent(derive.DerivedAttributesEvent{Attributes: attrA1Alt}) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes, "queued up derived attributes") // sanity check test setup require.True(t, attrA1Alt.IsLastInSpan, "must be last in span for attributes to become safe") - // process attrA1Alt on top - { - eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ - HeadBlockHash: payloadA1Alt.ExecutionPayload.ParentHash, // reorg - SafeBlockHash: refA0.Hash, - FinalizedBlockHash: refA0.Hash, - }, attrA1Alt.Attributes, ð.ForkchoiceUpdatedResult{ - PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, - PayloadID: ð.PayloadID{1, 2, 3}, - }, nil) // to build the block - eng.ExpectGetPayload(eth.PayloadID{1, 2, 3}, payloadA1Alt, nil) - eng.ExpectNewPayload(payloadA1Alt.ExecutionPayload, payloadA1Alt.ParentBeaconBlockRoot, - ð.PayloadStatusV1{Status: eth.ExecutionValid}, nil) // to persist the block - eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ - HeadBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, - SafeBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, // it becomes safe - FinalizedBlockHash: refA0.Hash, - }, nil, ð.ForkchoiceUpdatedResult{ - PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, - PayloadID: nil, - }, nil) // to make it canonical - } - - ah.SetAttributes(attrA1Alt) - - require.True(t, ah.HasAttributes()) - require.NoError(t, ah.Proceed(context.Background()), "insert new block") - require.False(t, ah.HasAttributes()) - - require.Equal(t, refA1Alt, ec.SafeL2Head(), "processing complete") + // attrA1Alt will fit right on top of A0 + emitter.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrA1Alt}) + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA0, + Unsafe: refA0, + }) + l2.AssertExpectations(t) + emitter.AssertExpectations(t) + require.NotNil(t, ah.attributes) + + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA1Alt, + Unsafe: refA1Alt, + }) + emitter.AssertExpectations(t) + require.Nil(t, ah.attributes, "clear attributes after successful processing") }) t.Run("pending ahead of unsafe", func(t *testing.T) { // Legacy test case: if attributes fit on top of the pending safe block as expected, - // but if the unsafe block is older, then we can recover by updating the unsafe head. - + // but if the unsafe block is older, then we can recover by resetting. logger := testlog.Logger(t, log.LevelInfo) - eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) - ah := NewAttributesHandler(logger, cfg, ec, eng) - - ec.SetUnsafeHead(refA0) - ec.SetSafeHead(refA0) - ec.SetFinalizedHead(refA0) - ec.SetPendingSafeL2Head(refA1) - - defer eng.AssertExpectations(t) - - ah.SetAttributes(attrA2) - - require.True(t, ah.HasAttributes()) - require.NoError(t, ah.Proceed(context.Background()), "detect unsafe - pending safe inconsistency") - require.True(t, ah.HasAttributes(), "still need the attributes, after unsafe head is corrected") - - require.Equal(t, refA0, ec.SafeL2Head(), "still same safe head") - require.Equal(t, refA1, ec.PendingSafeL2Head(), "still same pending safe head") - require.Equal(t, refA1, ec.UnsafeL2Head(), "updated unsafe head") + l2 := &testutils.MockL2Client{} + emitter := &testutils.MockEmitter{} + ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter) + + emitter.ExpectOnceType("ResetEvent") + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA1, + Unsafe: refA0, + }) + emitter.AssertExpectations(t) + l2.AssertExpectations(t) }) t.Run("no attributes", func(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) - eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) - ah := NewAttributesHandler(logger, cfg, ec, eng) - defer eng.AssertExpectations(t) - - require.Equal(t, ah.Proceed(context.Background()), io.EOF, "no attributes to process") + l2 := &testutils.MockL2Client{} + emitter := &testutils.MockEmitter{} + ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter) + + // If there are no attributes, we expect the pipeline to be requested to generate attributes. + emitter.ExpectOnce(derive.PipelineStepEvent{PendingSafe: refA1}) + ah.OnEvent(engine.PendingSafeUpdateEvent{ + PendingSafe: refA1, + Unsafe: refA1, + }) + // no calls to L2 or emitter when there is nothing to process + l2.AssertExpectations(t) + emitter.AssertExpectations(t) }) } diff --git a/op-node/rollup/derive/deriver.go b/op-node/rollup/derive/deriver.go index 9d65599a5382..69ef4023fbf2 100644 --- a/op-node/rollup/derive/deriver.go +++ b/op-node/rollup/derive/deriver.go @@ -21,6 +21,14 @@ func (d DeriverMoreEvent) String() string { return "deriver-more" } +// ConfirmReceivedAttributesEvent signals that the derivation pipeline may generate new attributes. +// After emitting DerivedAttributesEvent, no new attributes will be generated until a confirmation of reception. +type ConfirmReceivedAttributesEvent struct{} + +func (d ConfirmReceivedAttributesEvent) String() string { + return "confirm-received-attributes" +} + type ConfirmPipelineResetEvent struct{} func (d ConfirmPipelineResetEvent) String() string { @@ -50,6 +58,8 @@ type PipelineDeriver struct { ctx context.Context emitter rollup.EventEmitter + + needAttributesConfirmation bool } func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline, emitter rollup.EventEmitter) *PipelineDeriver { @@ -65,6 +75,11 @@ func (d *PipelineDeriver) OnEvent(ev rollup.Event) { case rollup.ResetEvent: d.pipeline.Reset() case PipelineStepEvent: + // Don't generate attributes if there are already attributes in-flight + if d.needAttributesConfirmation { + d.pipeline.log.Debug("Previously sent attributes are unconfirmed to be received") + return + } d.pipeline.log.Trace("Derivation pipeline step", "onto_origin", d.pipeline.Origin()) attrib, err := d.pipeline.Step(d.ctx, x.PendingSafe) if err == io.EOF { @@ -87,6 +102,7 @@ func (d *PipelineDeriver) OnEvent(ev rollup.Event) { d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err}) } else { if attrib != nil { + d.needAttributesConfirmation = true d.emitter.Emit(DerivedAttributesEvent{Attributes: attrib}) } else { d.emitter.Emit(DeriverMoreEvent{}) // continue with the next step if we can @@ -94,5 +110,7 @@ func (d *PipelineDeriver) OnEvent(ev rollup.Event) { } case ConfirmPipelineResetEvent: d.pipeline.ConfirmEngineReset() + case ConfirmReceivedAttributesEvent: + d.needAttributesConfirmation = false } } diff --git a/op-node/rollup/derive/plasma_data_source.go b/op-node/rollup/derive/plasma_data_source.go index e0b94d412b29..19beb145999b 100644 --- a/op-node/rollup/derive/plasma_data_source.go +++ b/op-node/rollup/derive/plasma_data_source.go @@ -83,13 +83,13 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { // skip the input return s.Next(ctx) } else if errors.Is(err, plasma.ErrMissingPastWindow) { - return nil, NewCriticalError(fmt.Errorf("data for comm %x not available: %w", s.comm, err)) + return nil, NewCriticalError(fmt.Errorf("data for comm %s not available: %w", s.comm, err)) } else if errors.Is(err, plasma.ErrPendingChallenge) { // continue stepping without slowing down. return nil, NotEnoughData } else if err != nil { // return temporary error so we can keep retrying. - return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err)) + return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %s from da service: %w", s.comm, err)) } // inputs are limited to a max size to ensure they can be challenged in the DA contract. if s.comm.CommitmentType() == plasma.Keccak256CommitmentType && len(data) > plasma.MaxInputSize { diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index d7d0fbc7a53d..936ce0ca5833 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -191,7 +191,7 @@ func NewDriver( finalizer = finality.NewFinalizer(log, cfg, l1, ec) } - attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, l2) + attributesHandler := attributes.NewAttributesHandler(log, cfg, driverCtx, l2, synchronousEvents) derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, metrics) pipelineDeriver := derive.NewPipelineDeriver(driverCtx, derivationPipeline, synchronousEvents) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) @@ -200,20 +200,19 @@ func NewDriver( asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics) syncDeriver := &SyncDeriver{ - Derivation: derivationPipeline, - Finalizer: finalizer, - AttributesHandler: attributesHandler, - SafeHeadNotifs: safeHeadListener, - CLSync: clSync, - Engine: ec, - SyncCfg: syncCfg, - Config: cfg, - L1: l1, - L2: l2, - Emitter: synchronousEvents, - Log: log, - Ctx: driverCtx, - Drain: synchronousEvents.Drain, + Derivation: derivationPipeline, + Finalizer: finalizer, + SafeHeadNotifs: safeHeadListener, + CLSync: clSync, + Engine: ec, + SyncCfg: syncCfg, + Config: cfg, + L1: l1, + L2: l2, + Emitter: synchronousEvents, + Log: log, + Ctx: driverCtx, + Drain: synchronousEvents.Drain, } engDeriv := engine.NewEngDeriver(log, driverCtx, cfg, ec, synchronousEvents) schedDeriv := NewStepSchedulingDeriver(log, synchronousEvents) @@ -254,6 +253,7 @@ func NewDriver( driver, clSync, pipelineDeriver, + attributesHandler, } return driver diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 0195c46d5c61..2d85708b23ff 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "io" gosync "sync" "time" @@ -397,8 +396,6 @@ type SyncDeriver struct { Finalizer Finalizer - AttributesHandler AttributesHandler - SafeHeadNotifs rollup.SafeHeadListener // notified when safe head is updated lastNotifiedSafeHead eth.L2BlockRef @@ -433,6 +430,10 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) { s.onResetEvent(x) case rollup.EngineTemporaryErrorEvent: s.Log.Warn("Derivation process temporary error", "err", x.Err) + + // Make sure that for any temporarily failed attributes we retry processing. + s.Emitter.Emit(engine.PendingSafeRequestEvent{}) + s.Emitter.Emit(StepReqEvent{}) case engine.EngineResetConfirmedEvent: s.onEngineConfirmedReset(x) @@ -444,8 +445,6 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) { // If there is more data to process, // continue derivation quickly s.Emitter.Emit(StepReqEvent{ResetBackoff: true}) - case derive.DerivedAttributesEvent: - s.AttributesHandler.SetAttributes(x.Attributes) } } @@ -534,11 +533,6 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error { // Any now processed forkchoice updates will trigger CL-sync payload processing, if any payload is queued up. - // Try safe attributes now. - if err := s.AttributesHandler.Proceed(ctx); err != io.EOF { - // EOF error means we can't process the next attributes. Then we should derive the next attributes. - return err - } derivationOrigin := s.Derivation.Origin() if s.SafeHeadNotifs != nil && s.SafeHeadNotifs.Enabled() && s.Derivation.DerivationReady() && s.lastNotifiedSafeHead != s.Engine.SafeL2Head() { @@ -558,7 +552,13 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error { return fmt.Errorf("finalizer OnDerivationL1End error: %w", err) } - s.Emitter.Emit(derive.PipelineStepEvent{PendingSafe: s.Engine.PendingSafeL2Head()}) + // Since we don't force attributes to be processed at this point, + // we cannot safely directly trigger the derivation, as that may generate new attributes that + // conflict with what attributes have not been applied yet. + // Instead, we request the engine to repeat where its pending-safe head is at. + // Upon the pending-safe signal the attributes deriver can then ask the pipeline + // to generate new attributes, if no attributes are known already. + s.Emitter.Emit(engine.PendingSafeRequestEvent{}) return nil } diff --git a/op-node/rollup/synchronous.go b/op-node/rollup/synchronous.go index 2fd85a417863..68943381e24e 100644 --- a/op-node/rollup/synchronous.go +++ b/op-node/rollup/synchronous.go @@ -2,6 +2,7 @@ package rollup import ( "context" + "io" "sync" "github.com/ethereum/go-ethereum/log" @@ -73,4 +74,30 @@ func (s *SynchronousEvents) Drain() error { } } +func (s *SynchronousEvents) DrainUntil(fn func(ev Event) bool, excl bool) error { + for { + if s.ctx.Err() != nil { + return s.ctx.Err() + } + if len(s.events) == 0 { + return io.EOF + } + + s.evLock.Lock() + first := s.events[0] + stop := fn(first) + if excl && stop { + s.evLock.Unlock() + return nil + } + s.events = s.events[1:] + s.evLock.Unlock() + + s.root.OnEvent(first) + if stop { + return nil + } + } +} + var _ EventEmitter = (*SynchronousEvents)(nil) diff --git a/op-program/client/driver/program.go b/op-program/client/driver/program.go index dccb4c0a3753..84d44298608d 100644 --- a/op-program/client/driver/program.go +++ b/op-program/client/driver/program.go @@ -44,6 +44,11 @@ func (d *ProgramDeriver) OnEvent(ev rollup.Event) { case derive.DeriverMoreEvent: d.Emitter.Emit(engine.PendingSafeRequestEvent{}) case derive.DerivedAttributesEvent: + // Allow new attributes to be generated. + // We will process the current attributes synchronously, + // triggering a single PendingSafeUpdateEvent or InvalidPayloadAttributesEvent, + // to continue derivation from. + d.Emitter.Emit(derive.ConfirmReceivedAttributesEvent{}) // No need to queue the attributes, since there is no unsafe chain to consolidate against, // and no temporary-error retry to perform on block processing. d.Emitter.Emit(engine.ProcessAttributesEvent{Attributes: x.Attributes}) diff --git a/op-program/client/driver/program_test.go b/op-program/client/driver/program_test.go index d231193d386d..186832286f62 100644 --- a/op-program/client/driver/program_test.go +++ b/op-program/client/driver/program_test.go @@ -63,6 +63,7 @@ func TestProgramDeriver(t *testing.T) { t.Run("derived attributes", func(t *testing.T) { p, m := newProgram(t, 1000) attrib := &derive.AttributesWithParent{Parent: eth.L2BlockRef{Number: 123}} + m.ExpectOnce(derive.ConfirmReceivedAttributesEvent{}) m.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrib}) p.OnEvent(derive.DerivedAttributesEvent{Attributes: attrib}) m.AssertExpectations(t) diff --git a/op-service/testutils/mock_emitter.go b/op-service/testutils/mock_emitter.go index e808c651053e..ae329d2064e1 100644 --- a/op-service/testutils/mock_emitter.go +++ b/op-service/testutils/mock_emitter.go @@ -18,6 +18,10 @@ func (m *MockEmitter) ExpectOnce(expected rollup.Event) { m.Mock.On("Emit", expected).Once() } +func (m *MockEmitter) ExpectOnceType(typ string) { + m.Mock.On("Emit", mock.AnythingOfType(typ)).Once() +} + func (m *MockEmitter) AssertExpectations(t mock.TestingT) { m.Mock.AssertExpectations(t) }