From 9059f6d96bbe4f23afb34c7933fad00671f21b1a Mon Sep 17 00:00:00 2001 From: protolambda Date: Fri, 24 May 2024 12:21:06 +0200 Subject: [PATCH] op-node: separate attributes processing from engine queue --- op-e2e/actions/l2_verifier.go | 6 +- op-node/rollup/attributes/attributes.go | 187 ++++++++ op-node/rollup/attributes/attributes_test.go | 408 ++++++++++++++++++ .../engine_consolidate.go | 10 +- .../engine_consolidate_test.go | 47 +- op-node/rollup/clsync/clsync.go | 7 - op-node/rollup/derive/attributes_queue.go | 6 + op-node/rollup/derive/engine_controller.go | 56 ++- op-node/rollup/derive/engine_queue.go | 276 ++---------- op-node/rollup/derive/engine_queue_test.go | 116 ++--- op-node/rollup/derive/pipeline.go | 5 +- op-node/rollup/driver/driver.go | 4 +- op-node/rollup/driver/sequencer.go | 2 +- op-node/rollup/driver/sequencer_test.go | 2 +- op-node/rollup/driver/state.go | 13 - op-program/client/driver/driver.go | 7 +- op-service/testutils/mock_engine.go | 17 +- 17 files changed, 807 insertions(+), 362 deletions(-) create mode 100644 op-node/rollup/attributes/attributes.go create mode 100644 op-node/rollup/attributes/attributes_test.go rename op-node/rollup/{derive => attributes}/engine_consolidate.go (94%) rename op-node/rollup/{derive => attributes}/engine_consolidate_test.go (92%) diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index bbdb5f0bf3ac..3ad098f9599c 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/node" "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/attributes" "github.com/ethereum-optimism/optimism/op-node/rollup/clsync" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/driver" @@ -81,7 +82,10 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri finalizer = finality.NewFinalizer(log, cfg, l1, engine) } - pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, syncCfg, safeHeadListener, finalizer) + attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, eng) + + pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, + syncCfg, safeHeadListener, finalizer, attributesHandler) pipeline.Reset() rollupNode := &L2Verifier{ diff --git a/op-node/rollup/attributes/attributes.go b/op-node/rollup/attributes/attributes.go new file mode 100644 index 000000000000..52f04170c4d9 --- /dev/null +++ b/op-node/rollup/attributes/attributes.go @@ -0,0 +1,187 @@ +package attributes + +import ( + "context" + "errors" + "fmt" + "io" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/async" + "github.com/ethereum-optimism/optimism/op-node/rollup/conductor" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-service/eth" +) + +type Engine interface { + derive.EngineControl + + SetUnsafeHead(eth.L2BlockRef) + SetSafeHead(eth.L2BlockRef) + SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool) + SetPendingSafeL2Head(eth.L2BlockRef) + + PendingSafeL2Head() eth.L2BlockRef + BackupUnsafeL2Head() eth.L2BlockRef +} + +type L2 interface { + 
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error) +} + +type AttributesHandler struct { + log log.Logger + cfg *rollup.Config + + ec Engine + l2 L2 + + attributes *derive.AttributesWithParent +} + +func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ec Engine, l2 L2) *AttributesHandler { + return &AttributesHandler{ + log: log, + cfg: cfg, + ec: ec, + l2: l2, + attributes: nil, + } +} + +func (eq *AttributesHandler) HasAttributes() bool { + return eq.attributes != nil +} + +func (eq *AttributesHandler) SetAttributes(attributes *derive.AttributesWithParent) { + eq.attributes = attributes +} + +// Proceed processes block attributes, if any. +// Proceed returns io.EOF if there are no attributes to process. +// Proceed returns a temporary, reset, or critical error like other derivers. +// Proceed returns no error if the safe-head may have changed. +func (eq *AttributesHandler) Proceed(ctx context.Context) error { + if eq.attributes == nil { + return io.EOF + } + // validate the safe attributes before processing them. The engine may have completed processing them through other means. + if eq.ec.PendingSafeL2Head() != eq.attributes.Parent { + // Previously the attribute's parent was the pending safe head. If the pending safe head advances so pending safe head's parent is the same as the + // attribute's parent then we need to cancel the attributes. + if eq.ec.PendingSafeL2Head().ParentHash == eq.attributes.Parent.Hash { + eq.log.Warn("queued safe attributes are stale, safehead progressed", + "pending_safe_head", eq.ec.PendingSafeL2Head(), "pending_safe_head_parent", eq.ec.PendingSafeL2Head().ParentID(), + "attributes_parent", eq.attributes.Parent) + eq.attributes = nil + return nil + } + // If something other than a simple advance occurred, perform a full reset + return derive.NewResetError(fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s", + eq.ec.PendingSafeL2Head(), eq.ec.PendingSafeL2Head().ParentID(), eq.attributes.Parent)) + } + if eq.ec.PendingSafeL2Head().Number < eq.ec.UnsafeL2Head().Number { + if err := eq.consolidateNextSafeAttributes(ctx, eq.attributes); err != nil { + return err + } + eq.attributes = nil + return nil + } else if eq.ec.PendingSafeL2Head().Number == eq.ec.UnsafeL2Head().Number { + if err := eq.forceNextSafeAttributes(ctx, eq.attributes); err != nil { + return err + } + eq.attributes = nil + return nil + } else { + // For some reason the unsafe head is behind the pending safe head. Log it, and correct it. + eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head()) + eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head()) + return nil + } +} + +// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain, +// to avoid extra processing or unnecessary unwinding of the chain. +// However, if the attributes do not match, they will be forced with forceNextSafeAttributes. +func (eq *AttributesHandler) consolidateNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent) error { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + envelope, err := eq.l2.PayloadByNumber(ctx, eq.ec.PendingSafeL2Head().Number+1) + if err != nil { + if errors.Is(err, ethereum.NotFound) { + // engine may have restarted, or inconsistent safe head. 
We need to reset + return derive.NewResetError(fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err)) + } + return derive.NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err)) + } + if err := AttributesMatchBlock(eq.cfg, attributes.Attributes, eq.ec.PendingSafeL2Head().Hash, envelope, eq.log); err != nil { + eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head(), "safe", eq.ec.SafeL2Head()) + // geth cannot wind back a chain without reorging to a new, previously non-canonical, block + return eq.forceNextSafeAttributes(ctx, attributes) + } + ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload) + if err != nil { + return derive.NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err)) + } + eq.ec.SetPendingSafeL2Head(ref) + if attributes.IsLastInSpan { + eq.ec.SetSafeHead(ref) + } + // unsafe head stays the same, we did not reorg the chain. + return nil +} + +// forceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain. +func (eq *AttributesHandler) forceNextSafeAttributes(ctx context.Context, attributes *derive.AttributesWithParent) error { + attrs := attributes.Attributes + errType, err := eq.ec.StartPayload(ctx, eq.ec.PendingSafeL2Head(), attributes, true) + if err == nil { + _, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{}) + } + if err != nil { + switch errType { + case derive.BlockInsertTemporaryErr: + // RPC errors are recoverable, we can retry the buffered payload attributes later. + return derive.NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err)) + case derive.BlockInsertPrestateErr: + _ = eq.ec.CancelPayload(ctx, true) + return derive.NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err)) + case derive.BlockInsertPayloadErr: + _ = eq.ec.CancelPayload(ctx, true) + eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err) + // Count the number of deposits to see if the tx list is deposit only. + depositCount := 0 + for _, tx := range attrs.Transactions { + if len(tx) > 0 && tx[0] == types.DepositTxType { + depositCount += 1 + } + } + // Deposit transaction execution errors are suppressed in the execution engine, but if the + // block is somehow invalid, there is nothing we can do to recover & we should exit. + if len(attrs.Transactions) == depositCount { + eq.log.Error("deposit only block was invalid", "parent", attributes.Parent, "err", err) + return derive.NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err)) + } + // Revert the pending safe head to the safe head. + eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head()) + // suppress the error b/c we want to retry with the next batch from the batch queue + // If there is no valid batch the node will eventually force a deposit only block. If + // the deposit only block fails, this will return the critical error above. + + // Try to restore to previous known unsafe chain. 
+ eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true) + + // drop the payload (by returning no error) without inserting it into the engine + return nil + default: + return derive.NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err)) + } + } + return nil +} diff --git a/op-node/rollup/attributes/attributes_test.go b/op-node/rollup/attributes/attributes_test.go new file mode 100644 index 000000000000..2b27dcf39697 --- /dev/null +++ b/op-node/rollup/attributes/attributes_test.go @@ -0,0 +1,408 @@ +package attributes + +import ( + "context" + "io" + "math/big" + "math/rand" // nosemgrep + "testing" + + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-node/metrics" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum-optimism/optimism/op-service/testutils" +) + +func TestAttributesHandler(t *testing.T) { + rng := rand.New(rand.NewSource(1234)) + refA := testutils.RandomBlockRef(rng) + + aL1Info := &testutils.MockBlockInfo{ + InfoParentHash: refA.ParentHash, + InfoNum: refA.Number, + InfoTime: refA.Time, + InfoHash: refA.Hash, + InfoBaseFee: big.NewInt(1), + InfoBlobBaseFee: big.NewInt(1), + InfoReceiptRoot: types.EmptyRootHash, + InfoRoot: testutils.RandomHash(rng), + InfoGasUsed: rng.Uint64(), + } + + refA0 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: 0, + ParentHash: common.Hash{}, + Time: refA.Time, + L1Origin: refA.ID(), + SequenceNumber: 0, + } + refA0Alt := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: 0, + ParentHash: common.Hash{}, + Time: refA.Time, + L1Origin: refA.ID(), + SequenceNumber: 0, + } + + gasLimit := eth.Uint64Quantity(20_000_000) + cfg := &rollup.Config{ + Genesis: rollup.Genesis{ + L1: refA.ID(), + L2: refA0.ID(), + L2Time: refA0.Time, + SystemConfig: eth.SystemConfig{ + BatcherAddr: common.Address{42}, + Overhead: [32]byte{31: 123}, + Scalar: [32]byte{0: 0, 31: 42}, + GasLimit: 20_000_000, + }, + }, + BlockTime: 1, + SeqWindowSize: 2, + RegolithTime: new(uint64), + CanyonTime: new(uint64), + EcotoneTime: new(uint64), + } + + a1L1Info, err := derive.L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, 1, aL1Info, refA0.Time+cfg.BlockTime) + require.NoError(t, err) + parentBeaconBlockRoot := testutils.RandomHash(rng) + payloadA1 := ð.ExecutionPayloadEnvelope{ExecutionPayload: ð.ExecutionPayload{ + ParentHash: refA0.Hash, + FeeRecipient: common.Address{}, + StateRoot: eth.Bytes32{}, + ReceiptsRoot: eth.Bytes32{}, + LogsBloom: eth.Bytes256{}, + PrevRandao: eth.Bytes32{}, + BlockNumber: eth.Uint64Quantity(refA0.Number + 1), + GasLimit: gasLimit, + GasUsed: 0, + Timestamp: eth.Uint64Quantity(refA0.Time + cfg.BlockTime), + ExtraData: nil, + BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)), + BlockHash: common.Hash{}, + Transactions: []eth.Data{a1L1Info}, + }, ParentBeaconBlockRoot: &parentBeaconBlockRoot} + // fix up the block-hash + payloadA1.ExecutionPayload.BlockHash, _ = payloadA1.CheckBlockHash() + + attrA1 := &derive.AttributesWithParent{ + Attributes: ð.PayloadAttributes{ + Timestamp: payloadA1.ExecutionPayload.Timestamp, + 
PrevRandao: payloadA1.ExecutionPayload.PrevRandao, + SuggestedFeeRecipient: payloadA1.ExecutionPayload.FeeRecipient, + Withdrawals: payloadA1.ExecutionPayload.Withdrawals, + ParentBeaconBlockRoot: payloadA1.ParentBeaconBlockRoot, + Transactions: []eth.Data{a1L1Info}, + NoTxPool: false, + GasLimit: &payloadA1.ExecutionPayload.GasLimit, + }, + Parent: refA0, + IsLastInSpan: true, + } + refA1, err := derive.PayloadToBlockRef(cfg, payloadA1.ExecutionPayload) + require.NoError(t, err) + + payloadA1Alt := ð.ExecutionPayloadEnvelope{ExecutionPayload: ð.ExecutionPayload{ + ParentHash: refA0.Hash, + FeeRecipient: common.Address{0xde, 0xea}, // change of the alternative payload + StateRoot: eth.Bytes32{}, + ReceiptsRoot: eth.Bytes32{}, + LogsBloom: eth.Bytes256{}, + PrevRandao: eth.Bytes32{}, + BlockNumber: eth.Uint64Quantity(refA0.Number + 1), + GasLimit: gasLimit, + GasUsed: 0, + Timestamp: eth.Uint64Quantity(refA0.Time + cfg.BlockTime), + ExtraData: nil, + BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)), + BlockHash: common.Hash{}, + Transactions: []eth.Data{a1L1Info}, + }, ParentBeaconBlockRoot: &parentBeaconBlockRoot} + // fix up the block-hash + payloadA1Alt.ExecutionPayload.BlockHash, _ = payloadA1Alt.CheckBlockHash() + + attrA1Alt := &derive.AttributesWithParent{ + Attributes: ð.PayloadAttributes{ + Timestamp: payloadA1Alt.ExecutionPayload.Timestamp, + PrevRandao: payloadA1Alt.ExecutionPayload.PrevRandao, + SuggestedFeeRecipient: payloadA1Alt.ExecutionPayload.FeeRecipient, + Withdrawals: payloadA1Alt.ExecutionPayload.Withdrawals, + ParentBeaconBlockRoot: payloadA1Alt.ParentBeaconBlockRoot, + Transactions: []eth.Data{a1L1Info}, + NoTxPool: false, + GasLimit: &payloadA1Alt.ExecutionPayload.GasLimit, + }, + Parent: refA0, + IsLastInSpan: true, + } + + refA1Alt, err := derive.PayloadToBlockRef(cfg, payloadA1Alt.ExecutionPayload) + require.NoError(t, err) + + refA2 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: refA1.Number + 1, + ParentHash: refA1.Hash, + Time: refA1.Time + cfg.BlockTime, + L1Origin: refA.ID(), + SequenceNumber: 1, + } + + a2L1Info, err := derive.L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, refA2.SequenceNumber, aL1Info, refA2.Time) + require.NoError(t, err) + attrA2 := &derive.AttributesWithParent{ + Attributes: ð.PayloadAttributes{ + Timestamp: eth.Uint64Quantity(refA2.Time), + PrevRandao: eth.Bytes32{}, + SuggestedFeeRecipient: common.Address{}, + Withdrawals: nil, + ParentBeaconBlockRoot: &common.Hash{}, + Transactions: []eth.Data{a2L1Info}, + NoTxPool: false, + GasLimit: &gasLimit, + }, + Parent: refA1, + IsLastInSpan: true, + } + + t.Run("drop stale attributes", func(t *testing.T) { + logger := testlog.Logger(t, log.LevelInfo) + eng := &testutils.MockEngine{} + ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ah := NewAttributesHandler(logger, cfg, ec, eng) + defer eng.AssertExpectations(t) + + ec.SetPendingSafeL2Head(refA1Alt) + ah.SetAttributes(attrA1) + require.True(t, ah.HasAttributes()) + require.NoError(t, ah.Proceed(context.Background()), "drop stale attributes") + require.False(t, ah.HasAttributes()) + }) + + t.Run("pending gets reorged", func(t *testing.T) { + logger := testlog.Logger(t, log.LevelInfo) + eng := &testutils.MockEngine{} + ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ah := NewAttributesHandler(logger, cfg, ec, eng) + defer eng.AssertExpectations(t) + + ec.SetPendingSafeL2Head(refA0Alt) + ah.SetAttributes(attrA1) + require.True(t, 
ah.HasAttributes()) + require.ErrorIs(t, ah.Proceed(context.Background()), derive.ErrReset, "A1 does not fit on A0Alt") + require.True(t, ah.HasAttributes(), "detected reorg does not clear state, reset is required") + }) + + t.Run("pending older than unsafe", func(t *testing.T) { + t.Run("consolidation fails", func(t *testing.T) { + logger := testlog.Logger(t, log.LevelInfo) + eng := &testutils.MockEngine{} + ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ah := NewAttributesHandler(logger, cfg, ec, eng) + + ec.SetUnsafeHead(refA1) + ec.SetSafeHead(refA0) + ec.SetFinalizedHead(refA0) + ec.SetPendingSafeL2Head(refA0) + + defer eng.AssertExpectations(t) + + // Call during consolidation. + // The payloadA1 is going to get reorged out in favor of attrA1Alt (turns into payloadA1Alt) + eng.ExpectPayloadByNumber(refA1.Number, payloadA1, nil) + + // attrA1Alt does not match block A1, so will cause force-reorg. + { + eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ + HeadBlockHash: payloadA1Alt.ExecutionPayload.ParentHash, // reorg + SafeBlockHash: refA0.Hash, + FinalizedBlockHash: refA0.Hash, + }, attrA1Alt.Attributes, ð.ForkchoiceUpdatedResult{ + PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, + PayloadID: ð.PayloadID{1, 2, 3}, + }, nil) // to build the block + eng.ExpectGetPayload(eth.PayloadID{1, 2, 3}, payloadA1Alt, nil) + eng.ExpectNewPayload(payloadA1Alt.ExecutionPayload, payloadA1Alt.ParentBeaconBlockRoot, + ð.PayloadStatusV1{Status: eth.ExecutionValid}, nil) // to persist the block + eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ + HeadBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, + SafeBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, + FinalizedBlockHash: refA0.Hash, + }, nil, ð.ForkchoiceUpdatedResult{ + PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, + PayloadID: nil, + }, nil) // to make it canonical + } + + ah.SetAttributes(attrA1Alt) + + require.True(t, ah.HasAttributes()) + require.NoError(t, ah.Proceed(context.Background()), "fail consolidation, perform force reorg") + require.False(t, ah.HasAttributes()) + + require.Equal(t, refA1Alt.Hash, payloadA1Alt.ExecutionPayload.BlockHash, "hash") + t.Log("ref A1: ", refA1.Hash) + t.Log("ref A0: ", refA0.Hash) + t.Log("ref alt: ", refA1Alt.Hash) + require.Equal(t, refA1Alt, ec.UnsafeL2Head(), "unsafe head reorg complete") + require.Equal(t, refA1Alt, ec.SafeL2Head(), "safe head reorg complete and updated") + }) + t.Run("consolidation passes", func(t *testing.T) { + fn := func(t *testing.T, lastInSpan bool) { + logger := testlog.Logger(t, log.LevelInfo) + eng := &testutils.MockEngine{} + ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ah := NewAttributesHandler(logger, cfg, ec, eng) + + ec.SetUnsafeHead(refA1) + ec.SetSafeHead(refA0) + ec.SetFinalizedHead(refA0) + ec.SetPendingSafeL2Head(refA0) + + defer eng.AssertExpectations(t) + + // Call during consolidation. 
+ eng.ExpectPayloadByNumber(refA1.Number, payloadA1, nil) + + expectedSafeHash := refA0.Hash + if lastInSpan { // if last in span, then it becomes safe + expectedSafeHash = refA1.Hash + } + eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ + HeadBlockHash: refA1.Hash, + SafeBlockHash: expectedSafeHash, + FinalizedBlockHash: refA0.Hash, + }, nil, ð.ForkchoiceUpdatedResult{ + PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, + PayloadID: nil, + }, nil) + + attr := &derive.AttributesWithParent{ + Attributes: attrA1.Attributes, // attributes will match, passing consolidation + Parent: attrA1.Parent, + IsLastInSpan: lastInSpan, + } + ah.SetAttributes(attr) + + require.True(t, ah.HasAttributes()) + require.NoError(t, ah.Proceed(context.Background()), "consolidate") + require.False(t, ah.HasAttributes()) + require.NoError(t, ec.TryUpdateEngine(context.Background()), "update to handle safe bump (lastinspan case)") + if lastInSpan { + require.Equal(t, refA1, ec.SafeL2Head(), "last in span becomes safe instantaneously") + } else { + require.Equal(t, refA1, ec.PendingSafeL2Head(), "pending as safe") + require.Equal(t, refA0, ec.SafeL2Head(), "A1 not yet safe") + } + } + t.Run("is last span", func(t *testing.T) { + fn(t, true) + }) + + t.Run("is not last span", func(t *testing.T) { + fn(t, false) + }) + }) + }) + + t.Run("pending equals unsafe", func(t *testing.T) { + // no consolidation to do, just force next attributes on tip of chain + + logger := testlog.Logger(t, log.LevelInfo) + eng := &testutils.MockEngine{} + ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ah := NewAttributesHandler(logger, cfg, ec, eng) + + ec.SetUnsafeHead(refA0) + ec.SetSafeHead(refA0) + ec.SetFinalizedHead(refA0) + ec.SetPendingSafeL2Head(refA0) + + defer eng.AssertExpectations(t) + + // sanity check test setup + require.True(t, attrA1Alt.IsLastInSpan, "must be last in span for attributes to become safe") + + // process attrA1Alt on top + { + eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ + HeadBlockHash: payloadA1Alt.ExecutionPayload.ParentHash, // reorg + SafeBlockHash: refA0.Hash, + FinalizedBlockHash: refA0.Hash, + }, attrA1Alt.Attributes, ð.ForkchoiceUpdatedResult{ + PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, + PayloadID: ð.PayloadID{1, 2, 3}, + }, nil) // to build the block + eng.ExpectGetPayload(eth.PayloadID{1, 2, 3}, payloadA1Alt, nil) + eng.ExpectNewPayload(payloadA1Alt.ExecutionPayload, payloadA1Alt.ParentBeaconBlockRoot, + ð.PayloadStatusV1{Status: eth.ExecutionValid}, nil) // to persist the block + eng.ExpectForkchoiceUpdate(ð.ForkchoiceState{ + HeadBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, + SafeBlockHash: payloadA1Alt.ExecutionPayload.BlockHash, // it becomes safe + FinalizedBlockHash: refA0.Hash, + }, nil, ð.ForkchoiceUpdatedResult{ + PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}, + PayloadID: nil, + }, nil) // to make it canonical + } + + ah.SetAttributes(attrA1Alt) + + require.True(t, ah.HasAttributes()) + require.NoError(t, ah.Proceed(context.Background()), "insert new block") + require.False(t, ah.HasAttributes()) + + require.Equal(t, refA1Alt, ec.SafeL2Head(), "processing complete") + }) + + t.Run("pending ahead of unsafe", func(t *testing.T) { + // Legacy test case: if attributes fit on top of the pending safe block as expected, + // but if the unsafe block is older, then we can recover by updating the unsafe head. 
+ + logger := testlog.Logger(t, log.LevelInfo) + eng := &testutils.MockEngine{} + ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ah := NewAttributesHandler(logger, cfg, ec, eng) + + ec.SetUnsafeHead(refA0) + ec.SetSafeHead(refA0) + ec.SetFinalizedHead(refA0) + ec.SetPendingSafeL2Head(refA1) + + defer eng.AssertExpectations(t) + + ah.SetAttributes(attrA2) + + require.True(t, ah.HasAttributes()) + require.NoError(t, ah.Proceed(context.Background()), "detect unsafe - pending safe inconsistency") + require.True(t, ah.HasAttributes(), "still need the attributes, after unsafe head is corrected") + + require.Equal(t, refA0, ec.SafeL2Head(), "still same safe head") + require.Equal(t, refA1, ec.PendingSafeL2Head(), "still same pending safe head") + require.Equal(t, refA1, ec.UnsafeL2Head(), "updated unsafe head") + }) + + t.Run("no attributes", func(t *testing.T) { + logger := testlog.Logger(t, log.LevelInfo) + eng := &testutils.MockEngine{} + ec := derive.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ah := NewAttributesHandler(logger, cfg, ec, eng) + defer eng.AssertExpectations(t) + + require.Equal(t, ah.Proceed(context.Background()), io.EOF, "no attributes to process") + }) + +} diff --git a/op-node/rollup/derive/engine_consolidate.go b/op-node/rollup/attributes/engine_consolidate.go similarity index 94% rename from op-node/rollup/derive/engine_consolidate.go rename to op-node/rollup/attributes/engine_consolidate.go index 121a7403257a..3cd9e8931210 100644 --- a/op-node/rollup/derive/engine_consolidate.go +++ b/op-node/rollup/attributes/engine_consolidate.go @@ -1,4 +1,4 @@ -package derive +package attributes import ( "bytes" @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -60,6 +61,9 @@ func AttributesMatchBlock(rollupCfg *rollup.Config, attrs *eth.PayloadAttributes if err := checkParentBeaconBlockRootMatch(attrs.ParentBeaconBlockRoot, envelope.ParentBeaconBlockRoot); err != nil { return err } + if attrs.SuggestedFeeRecipient != block.FeeRecipient { + return fmt.Errorf("fee recipient data does not match, expected %s but got %s", block.FeeRecipient, attrs.SuggestedFeeRecipient) + } return nil } @@ -119,8 +123,8 @@ func logL1InfoTxns(rollupCfg *rollup.Config, l log.Logger, l2Number, l2Timestamp } // Then decode the ABI encoded parameters - safeInfo, errSafe := L1BlockInfoFromBytes(rollupCfg, l2Timestamp, safeTxValue.Data()) - unsafeInfo, errUnsafe := L1BlockInfoFromBytes(rollupCfg, l2Timestamp, unsafeTxValue.Data()) + safeInfo, errSafe := derive.L1BlockInfoFromBytes(rollupCfg, l2Timestamp, safeTxValue.Data()) + unsafeInfo, errUnsafe := derive.L1BlockInfoFromBytes(rollupCfg, l2Timestamp, unsafeTxValue.Data()) if errSafe != nil || errUnsafe != nil { l.Error("failed to umarshal l1 info", "errSafe", errSafe, "errUnsafe", errUnsafe) return diff --git a/op-node/rollup/derive/engine_consolidate_test.go b/op-node/rollup/attributes/engine_consolidate_test.go similarity index 92% rename from op-node/rollup/derive/engine_consolidate_test.go rename to op-node/rollup/attributes/engine_consolidate_test.go index 57b7970d3204..015e34f5d3d4 100644 --- a/op-node/rollup/derive/engine_consolidate_test.go +++ b/op-node/rollup/attributes/engine_consolidate_test.go @@ -1,19 +1,21 @@ -package derive +package attributes import ( - "math/rand" + "math/rand" // 
nosemgrep "testing" - "github.com/ethereum-optimism/optimism/op-node/rollup" - "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" - "github.com/ethereum-optimism/optimism/op-service/testlog" - "github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" - "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/predeploys" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum-optimism/optimism/op-service/testutils" ) var ( @@ -23,6 +25,7 @@ var ( validPrevRandao = eth.Bytes32(common.HexToHash("0x789")) validGasLimit = eth.Uint64Quantity(1000) validWithdrawals = types.Withdrawals{} + validFeeRecipient = predeploys.SequencerFeeVaultAddr ) type args struct { @@ -36,11 +39,12 @@ func ecotoneArgs() args { envelope: ð.ExecutionPayloadEnvelope{ ParentBeaconBlockRoot: &validParentBeaconRoot, ExecutionPayload: ð.ExecutionPayload{ - ParentHash: validParentHash, - Timestamp: validTimestamp, - PrevRandao: validPrevRandao, - GasLimit: validGasLimit, - Withdrawals: &validWithdrawals, + ParentHash: validParentHash, + Timestamp: validTimestamp, + PrevRandao: validPrevRandao, + GasLimit: validGasLimit, + Withdrawals: &validWithdrawals, + FeeRecipient: validFeeRecipient, }, }, attrs: ð.PayloadAttributes{ @@ -49,6 +53,7 @@ func ecotoneArgs() args { GasLimit: &validGasLimit, ParentBeaconBlockRoot: &validParentBeaconRoot, Withdrawals: &validWithdrawals, + SuggestedFeeRecipient: validFeeRecipient, }, parentHash: validParentHash, } @@ -133,6 +138,12 @@ func createMismatchedTimestamp() args { return args } +func createMismatchedFeeRecipient() args { + args := ecotoneArgs() + args.attrs.SuggestedFeeRecipient = common.Address{0xde, 0xad} + return args +} + func TestAttributesMatch(t *testing.T) { rollupCfg := &rollup.Config{} @@ -192,14 +203,18 @@ func TestAttributesMatch(t *testing.T) { shouldMatch: false, args: createMismatchedTimestamp(), }, + { + shouldMatch: false, + args: createMismatchedFeeRecipient(), + }, } - for _, test := range tests { + for i, test := range tests { err := AttributesMatchBlock(rollupCfg, test.args.attrs, test.args.parentHash, test.args.envelope, testlog.Logger(t, log.LevelInfo)) if test.shouldMatch { - require.NoError(t, err) + require.NoError(t, err, "fail %d", i) } else { - require.Error(t, err) + require.Error(t, err, "fail %d", i) } } } diff --git a/op-node/rollup/clsync/clsync.go b/op-node/rollup/clsync/clsync.go index 722af6c21133..fbd7b8a91d8d 100644 --- a/op-node/rollup/clsync/clsync.go +++ b/op-node/rollup/clsync/clsync.go @@ -119,12 +119,5 @@ func (eq *CLSync) Proceed(ctx context.Context) error { } eq.unsafePayloads.Pop() eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) - eq.log.Info("Sync progress", - "reason", "unsafe payload from sequencer", - "l2_finalized", eq.ec.Finalized(), - "l2_safe", eq.ec.SafeL2Head(), - "l2_unsafe", eq.ec.UnsafeL2Head(), - "l2_time", eq.ec.UnsafeL2Head().Time, - ) return nil } diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index e923695a6cdf..c6115cdae2f0 100644 --- 
a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -27,6 +27,12 @@ type AttributesBuilder interface { PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) } +type AttributesWithParent struct { + Attributes *eth.PayloadAttributes + Parent eth.L2BlockRef + IsLastInSpan bool +} + type AttributesQueue struct { log log.Logger config *rollup.Config diff --git a/op-node/rollup/derive/engine_controller.go b/op-node/rollup/derive/engine_controller.go index e7de21391256..6de097e42d01 100644 --- a/op-node/rollup/derive/engine_controller.go +++ b/op-node/rollup/derive/engine_controller.go @@ -161,6 +161,50 @@ func (e *EngineController) SetBackupUnsafeL2Head(r eth.L2BlockRef, triggerReorg e.needFCUCallForBackupUnsafeReorg = triggerReorg } +// logSyncProgressMaybe helps log forkchoice state-changes when applicable. +// First, the pre-state is registered. +// A callback is returned to then log the changes to the pre-state, if any. +func (e *EngineController) logSyncProgressMaybe() func() { + prevFinalized := e.finalizedHead + prevSafe := e.safeHead + prevPendingSafe := e.pendingSafeHead + prevUnsafe := e.unsafeHead + prevBackupUnsafe := e.backupUnsafeHead + return func() { + // if forkchoice still needs to be updated, then the last change was unsuccessful, thus no progress to log. + if e.needFCUCall || e.needFCUCallForBackupUnsafeReorg { + return + } + var reason string + if prevFinalized != e.finalizedHead { + reason = "finalized block" + } else if prevSafe != e.safeHead { + if prevSafe == prevUnsafe { + reason = "derived safe block from L1" + } else { + reason = "consolidated block with L1" + } + } else if prevUnsafe != e.unsafeHead { + reason = "new chain head block" + } else if prevPendingSafe != e.pendingSafeHead { + reason = "pending new safe block" + } else if prevBackupUnsafe != e.backupUnsafeHead { + reason = "new backup unsafe block" + } + if reason != "" { + e.log.Info("Sync progress", + "reason", reason, + "l2_finalized", e.finalizedHead, + "l2_safe", e.safeHead, + "l2_pending_safe", e.pendingSafeHead, + "l2_unsafe", e.unsafeHead, + "l2_backup_unsafe", e.backupUnsafeHead, + "l2_time", e.UnsafeL2Head().Time, + ) + } + } +} + // Engine Methods func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) { @@ -177,12 +221,12 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR FinalizedBlockHash: e.finalizedHead.Hash, } - id, errTyp, err := startPayload(ctx, e.engine, fc, attrs.attributes) + id, errTyp, err := startPayload(ctx, e.engine, fc, attrs.Attributes) if err != nil { return errTyp, err } - e.buildingInfo = eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.attributes.Timestamp)} + e.buildingInfo = eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.Attributes.Timestamp)} e.buildingSafe = updateSafe e.buildingOnto = parent if updateSafe { @@ -211,7 +255,7 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy FinalizedBlockHash: e.finalizedHead.Hash, } // Update the safe head if the payload is built with the last attributes in the batch. 
- updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.isLastInSpan + updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.IsLastInSpan envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor) if err != nil { return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err) @@ -308,6 +352,8 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error { SafeBlockHash: e.safeHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash, } + logFn := e.logSyncProgressMaybe() + defer logFn() _, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) if err != nil { var inputErr eth.InputError @@ -366,6 +412,8 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et e.SetSafeHead(ref) e.SetFinalizedHead(ref) } + logFn := e.logSyncProgressMaybe() + defer logFn() fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) if err != nil { var inputErr eth.InputError @@ -433,6 +481,8 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro SafeBlockHash: e.safeHead.Hash, FinalizedBlockHash: e.finalizedHead.Hash, } + logFn := e.logSyncProgressMaybe() + defer logFn() fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) if err != nil { var inputErr eth.InputError diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 5cc924f94e67..a703c39c1696 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -5,11 +5,8 @@ import ( "errors" "fmt" "io" - "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-node/rollup" @@ -19,20 +16,6 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" ) -type AttributesWithParent struct { - attributes *eth.PayloadAttributes - parent eth.L2BlockRef - isLastInSpan bool -} - -func NewAttributesWithParent(attributes *eth.PayloadAttributes, parent eth.L2BlockRef, isLastInSpan bool) *AttributesWithParent { - return &AttributesWithParent{attributes, parent, isLastInSpan} -} - -func (a *AttributesWithParent) Attributes() *eth.PayloadAttributes { - return a.attributes -} - type NextAttributesProvider interface { Origin() eth.L1BlockRef NextAttributes(context.Context, eth.L2BlockRef) (*AttributesWithParent, error) @@ -123,6 +106,17 @@ type FinalizerHooks interface { Reset() } +type AttributesHandler interface { + // HasAttributes returns if there are any block attributes to process. + // HasAttributes is for EngineQueue testing only, and can be removed when attribute processing is fully independent. + HasAttributes() bool + // SetAttributes overwrites the set of attributes. This may be nil, to clear what may be processed next. + SetAttributes(attributes *AttributesWithParent) + // Proceed runs one attempt of processing attributes, if any. + // Proceed returns io.EOF if there are no attributes to process. 
+ Proceed(ctx context.Context) error +} + // EngineQueue queues up payload attributes to consolidate or process with the provided Engine type EngineQueue struct { log log.Logger @@ -130,8 +124,7 @@ type EngineQueue struct { ec LocalEngineControl - // The queued-up attributes - safeAttributes *AttributesWithParent + attributesHandler AttributesHandler engine L2Source prev NextAttributesProvider @@ -153,18 +146,19 @@ type EngineQueue struct { // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config, safeHeadNotifs SafeHeadListener, - finalizer FinalizerHooks) *EngineQueue { + finalizer FinalizerHooks, attributesHandler AttributesHandler) *EngineQueue { return &EngineQueue{ - log: log, - cfg: cfg, - ec: engine, - engine: l2Source, - metrics: metrics, - prev: prev, - l1Fetcher: l1Fetcher, - syncCfg: syncCfg, - safeHeadNotifs: safeHeadNotifs, - finalizer: finalizer, + log: log, + cfg: cfg, + ec: engine, + engine: l2Source, + metrics: metrics, + prev: prev, + l1Fetcher: l1Fetcher, + syncCfg: syncCfg, + safeHeadNotifs: safeHeadNotifs, + finalizer: finalizer, + attributesHandler: attributesHandler, } } @@ -202,8 +196,24 @@ func (eq *EngineQueue) Step(ctx context.Context) error { // The pipeline cannot move forwards if doing EL sync. return EngineELSyncing } - if eq.safeAttributes != nil { - return eq.tryNextSafeAttributes(ctx) + if err := eq.attributesHandler.Proceed(ctx); err != io.EOF { + return err // if nil, or not EOF, then the attribute processing has to be revisited later. + } + if eq.lastNotifiedSafeHead != eq.ec.SafeL2Head() { + eq.lastNotifiedSafeHead = eq.ec.SafeL2Head() + // make sure we track the last L2 safe head for every new L1 block + if err := eq.safeHeadNotifs.SafeHeadUpdated(eq.lastNotifiedSafeHead, eq.origin.ID()); err != nil { + // At this point our state is in a potentially inconsistent state as we've updated the safe head + // in the execution client but failed to post process it. 
Reset the pipeline so the safe head rolls back + // a little (it always rolls back at least 1 block) and then it will retry storing the entry + return NewResetError(fmt.Errorf("safe head notifications failed: %w", err)) + } + } + eq.finalizer.PostProcessSafeL2(eq.ec.SafeL2Head(), eq.origin) + + // try to finalize the L2 blocks we have synced so far (no-op if L1 finality is behind) + if err := eq.finalizer.OnDerivationL1End(ctx, eq.origin); err != nil { + return fmt.Errorf("finalizer OnDerivationL1End error: %w", err) } newOrigin := eq.prev.Origin() @@ -212,20 +222,13 @@ func (eq *EngineQueue) Step(ctx context.Context) error { return err } eq.origin = newOrigin - // make sure we track the last L2 safe head for every new L1 block - if err := eq.postProcessSafeL2(); err != nil { - return err - } - // try to finalize the L2 blocks we have synced so far (no-op if L1 finality is behind) - if err := eq.finalizer.OnDerivationL1End(ctx, eq.origin); err != nil { - return fmt.Errorf("finalizer OnDerivationL1End error: %w", err) - } + if next, err := eq.prev.NextAttributes(ctx, eq.ec.PendingSafeL2Head()); err == io.EOF { return io.EOF } else if err != nil { return err } else { - eq.safeAttributes = next + eq.attributesHandler.SetAttributes(next) eq.log.Debug("Adding next safe attributes", "safe_head", eq.ec.SafeL2Head(), "pending_safe_head", eq.ec.PendingSafeL2Head(), "next", next) return NotEnoughData @@ -264,194 +267,6 @@ func (eq *EngineQueue) verifyNewL1Origin(ctx context.Context, newOrigin eth.L1Bl return nil } -// postProcessSafeL2 buffers the L1 block the safe head was fully derived from, -// to finalize it once the L1 block, or later, finalizes. -func (eq *EngineQueue) postProcessSafeL2() error { - if err := eq.notifyNewSafeHead(eq.ec.SafeL2Head()); err != nil { - return err - } - eq.finalizer.PostProcessSafeL2(eq.ec.SafeL2Head(), eq.origin) - return nil -} - -// notifyNewSafeHead calls the safe head listener with the current safe head and l1 origin information. -func (eq *EngineQueue) notifyNewSafeHead(safeHead eth.L2BlockRef) error { - if eq.lastNotifiedSafeHead == safeHead { - // No change, no need to notify - return nil - } - if err := eq.safeHeadNotifs.SafeHeadUpdated(safeHead, eq.origin.ID()); err != nil { - // At this point our state is in a potentially inconsistent state as we've updated the safe head - // in the execution client but failed to post process it. Reset the pipeline so the safe head rolls back - // a little (it always rolls back at least 1 block) and then it will retry storing the entry - return NewResetError(fmt.Errorf("safe head notifications failed: %w", err)) - } - eq.lastNotifiedSafeHead = safeHead - return nil -} - -func (eq *EngineQueue) logSyncProgress(reason string) { - eq.log.Info("Sync progress", - "reason", reason, - "l2_finalized", eq.ec.Finalized(), - "l2_safe", eq.ec.SafeL2Head(), - "l2_pending_safe", eq.ec.PendingSafeL2Head(), - "l2_unsafe", eq.ec.UnsafeL2Head(), - "l2_backup_unsafe", eq.ec.BackupUnsafeL2Head(), - "l2_time", eq.ec.UnsafeL2Head().Time, - "l1_derived", eq.origin, - ) -} - -func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error { - if eq.safeAttributes == nil { // sanity check the attributes are there - return nil - } - // validate the safe attributes before processing them. The engine may have completed processing them through other means. - if eq.ec.PendingSafeL2Head() != eq.safeAttributes.parent { - // Previously the attribute's parent was the pending safe head. 
If the pending safe head advances so pending safe head's parent is the same as the - // attribute's parent then we need to cancel the attributes. - if eq.ec.PendingSafeL2Head().ParentHash == eq.safeAttributes.parent.Hash { - eq.log.Warn("queued safe attributes are stale, safehead progressed", - "pending_safe_head", eq.ec.PendingSafeL2Head(), "pending_safe_head_parent", eq.ec.PendingSafeL2Head().ParentID(), - "attributes_parent", eq.safeAttributes.parent) - eq.safeAttributes = nil - return nil - } - // If something other than a simple advance occurred, perform a full reset - return NewResetError(fmt.Errorf("pending safe head changed to %s with parent %s, conflicting with queued safe attributes on top of %s", - eq.ec.PendingSafeL2Head(), eq.ec.PendingSafeL2Head().ParentID(), eq.safeAttributes.parent)) - - } - if eq.ec.PendingSafeL2Head().Number < eq.ec.UnsafeL2Head().Number { - return eq.consolidateNextSafeAttributes(ctx) - } else if eq.ec.PendingSafeL2Head().Number == eq.ec.UnsafeL2Head().Number { - return eq.forceNextSafeAttributes(ctx) - } else { - // For some reason the unsafe head is behind the pending safe head. Log it, and correct it. - eq.log.Error("invalid sync state, unsafe head is behind pending safe head", "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head()) - eq.ec.SetUnsafeHead(eq.ec.PendingSafeL2Head()) - return nil - } -} - -// consolidateNextSafeAttributes tries to match the next safe attributes against the existing unsafe chain, -// to avoid extra processing or unnecessary unwinding of the chain. -// However, if the attributes do not match, they will be forced with forceNextSafeAttributes. -func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - envelope, err := eq.engine.PayloadByNumber(ctx, eq.ec.PendingSafeL2Head().Number+1) - if err != nil { - if errors.Is(err, ethereum.NotFound) { - // engine may have restarted, or inconsistent safe head. We need to reset - return NewResetError(fmt.Errorf("expected engine was synced and had unsafe block to reconcile, but cannot find the block: %w", err)) - } - return NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %w", err)) - } - if err := AttributesMatchBlock(eq.cfg, eq.safeAttributes.attributes, eq.ec.PendingSafeL2Head().Hash, envelope, eq.log); err != nil { - eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err, "unsafe", eq.ec.UnsafeL2Head(), "pending_safe", eq.ec.PendingSafeL2Head(), "safe", eq.ec.SafeL2Head()) - // geth cannot wind back a chain without reorging to a new, previously non-canonical, block - return eq.forceNextSafeAttributes(ctx) - } - ref, err := PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload) - if err != nil { - return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err)) - } - eq.ec.SetPendingSafeL2Head(ref) - if eq.safeAttributes.isLastInSpan { - eq.ec.SetSafeHead(ref) - if err := eq.postProcessSafeL2(); err != nil { - return err - } - } - // unsafe head stays the same, we did not reorg the chain. - eq.safeAttributes = nil - eq.logSyncProgress("reconciled with L1") - - return nil -} - -// forceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain. 
-func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { - if eq.safeAttributes == nil { - return nil - } - attrs := eq.safeAttributes.attributes - lastInSpan := eq.safeAttributes.isLastInSpan - errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true) - if err == nil { - _, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{}) - } - if err != nil { - switch errType { - case BlockInsertTemporaryErr: - // RPC errors are recoverable, we can retry the buffered payload attributes later. - return NewTemporaryError(fmt.Errorf("temporarily cannot insert new safe block: %w", err)) - case BlockInsertPrestateErr: - _ = eq.CancelPayload(ctx, true) - return NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err)) - case BlockInsertPayloadErr: - _ = eq.CancelPayload(ctx, true) - eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err) - // Count the number of deposits to see if the tx list is deposit only. - depositCount := 0 - for _, tx := range attrs.Transactions { - if len(tx) > 0 && tx[0] == types.DepositTxType { - depositCount += 1 - } - } - // Deposit transaction execution errors are suppressed in the execution engine, but if the - // block is somehow invalid, there is nothing we can do to recover & we should exit. - // TODO: Can this be triggered by an empty batch with invalid data (like parent hash or gas limit?) - if len(attrs.Transactions) == depositCount { - eq.log.Error("deposit only block was invalid", "parent", eq.safeAttributes.parent, "err", err) - return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err)) - } - // drop the payload without inserting it - eq.safeAttributes = nil - // Revert the pending safe head to the safe head. - eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head()) - // suppress the error b/c we want to retry with the next batch from the batch queue - // If there is no valid batch the node will eventually force a deposit only block. If - // the deposit only block fails, this will return the critical error above. - - // Try to restore to previous known unsafe chain. - eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true) - - return nil - default: - return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err)) - } - } - eq.safeAttributes = nil - eq.logSyncProgress("processed safe block derived from L1") - if lastInSpan { - if err := eq.postProcessSafeL2(); err != nil { - return err - } - } - - return nil -} - -func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) { - return eq.ec.StartPayload(ctx, parent, attrs, updateSafe) -} - -func (eq *EngineQueue) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) { - return eq.ec.ConfirmPayload(ctx, agossip, sequencerConductor) -} - -func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error { - return eq.ec.CancelPayload(ctx, force) -} - -func (eq *EngineQueue) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) { - return eq.ec.BuildingPayload() -} - // Reset walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. 
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error { @@ -499,7 +314,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System eq.ec.SetPendingSafeL2Head(safe) eq.ec.SetFinalizedHead(finalized) eq.ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false) - eq.safeAttributes = nil + eq.attributesHandler.SetAttributes(nil) eq.ec.ResetBuildingState() eq.finalizer.Reset() // note: finalizedL1 and triedFinalizeAt do not reset, since these do not change between reorgs. @@ -524,6 +339,5 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System return err } } - eq.logSyncProgress("reset derivation work") return io.EOF } diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 7de117ed67d5..306e39a5281a 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -4,12 +4,10 @@ import ( "context" "fmt" "io" - "math/big" "math/rand" "testing" "github.com/ethereum-optimism/optimism/op-node/node/safedb" - "github.com/holiman/uint256" "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/common" @@ -17,8 +15,6 @@ import ( "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" - "github.com/ethereum-optimism/optimism/op-node/rollup/async" - "github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/testlog" @@ -59,6 +55,29 @@ func (n noopFinality) Reset() { var _ FinalizerHooks = (*noopFinality)(nil) +type fakeAttributesHandler struct { + attributes *AttributesWithParent + err error +} + +func (f *fakeAttributesHandler) HasAttributes() bool { + return f.attributes != nil +} + +func (f *fakeAttributesHandler) SetAttributes(attributes *AttributesWithParent) { + f.attributes = attributes +} + +func (f *fakeAttributesHandler) Proceed(ctx context.Context) error { + if f.err != nil { + return f.err + } + f.attributes = nil + return io.EOF +} + +var _ AttributesHandler = (*fakeAttributesHandler)(nil) + func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) @@ -267,7 +286,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { prev := &fakeAttributesQueue{origin: refE} ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) - eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}) + eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}, &fakeAttributesHandler{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") @@ -597,7 +616,7 @@ func TestVerifyNewL1Origin(t *testing.T) { prev := &fakeAttributesQueue{origin: refE} ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) - eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}) + eq := NewEngineQueue(logger, cfg, 
eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}, &fakeAttributesHandler{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") @@ -694,7 +713,8 @@ func TestBlockBuildingRace(t *testing.T) { prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true} ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync) - eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}) + attribHandler := &fakeAttributesHandler{} + eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}, attribHandler) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) id := eth.PayloadID{0xff} @@ -717,79 +737,21 @@ func TestBlockBuildingRace(t *testing.T) { eng.ExpectForkchoiceUpdate(preFc, nil, preFcRes, nil) require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset") - // Expect initial building update, to process the attributes we queued up - eng.ExpectForkchoiceUpdate(preFc, attrs, preFcRes, nil) + // Expect initial building update, to process the attributes we queued up. Attributes get in + require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "queue up attributes") + require.True(t, eq.attributesHandler.HasAttributes()) + // Don't let the payload be confirmed straight away - mockErr := fmt.Errorf("mock error") - eng.ExpectGetPayload(id, nil, mockErr) // The job will be not be cancelled, the untyped error is a temporary error - - require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "queue up attributes") + mockErr := fmt.Errorf("mock error") + attribHandler.err = mockErr require.ErrorIs(t, eq.Step(context.Background()), mockErr, "expecting to fail to process attributes") - require.NotNil(t, eq.safeAttributes, "still have attributes") + require.True(t, eq.attributesHandler.HasAttributes(), "still have attributes") // Now allow the building to complete - a1InfoTx, err := L1InfoDepositBytes(cfg, cfg.Genesis.SystemConfig, refA1.SequenceNumber, &testutils.MockBlockInfo{ - InfoHash: refA.Hash, - InfoParentHash: refA.ParentHash, - InfoCoinbase: common.Address{}, - InfoRoot: common.Hash{}, - InfoNum: refA.Number, - InfoTime: refA.Time, - InfoMixDigest: [32]byte{}, - InfoBaseFee: big.NewInt(7), - InfoReceiptRoot: common.Hash{}, - InfoGasUsed: 0, - }, 0) - - require.NoError(t, err) - payloadA1 := ð.ExecutionPayload{ - ParentHash: refA1.ParentHash, - FeeRecipient: attrs.SuggestedFeeRecipient, - StateRoot: eth.Bytes32{}, - ReceiptsRoot: eth.Bytes32{}, - LogsBloom: eth.Bytes256{}, - PrevRandao: eth.Bytes32{}, - BlockNumber: eth.Uint64Quantity(refA1.Number), - GasLimit: gasLimit, - GasUsed: 0, - Timestamp: eth.Uint64Quantity(refA1.Time), - ExtraData: nil, - BaseFeePerGas: eth.Uint256Quantity(*uint256.NewInt(7)), - BlockHash: refA1.Hash, - Transactions: []eth.Data{ - a1InfoTx, - }, - } - envelope := ð.ExecutionPayloadEnvelope{ExecutionPayload: payloadA1} - eng.ExpectGetPayload(id, envelope, nil) - eng.ExpectNewPayload(payloadA1, nil, ð.PayloadStatusV1{ - Status: eth.ExecutionValid, - LatestValidHash: &refA1.Hash, - ValidationError: nil, - }, nil) - postFc := ð.ForkchoiceState{ - HeadBlockHash: refA1.Hash, - SafeBlockHash: refA1.Hash, 
- FinalizedBlockHash: refA0.Hash,
- }
- postFcRes := &eth.ForkchoiceUpdatedResult{
- PayloadStatus: eth.PayloadStatusV1{
- Status: eth.ExecutionValid,
- LatestValidHash: &refA1.Hash,
- ValidationError: nil,
- },
- PayloadID: &id,
- }
- eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil)
-
- // Now complete the job, as external user of the engine
- _, _, err = eq.ConfirmPayload(context.Background(), async.NoOpGossiper{}, &conductor.NoOpConductor{})
- require.NoError(t, err)
- require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed")
+ attribHandler.err = nil
- require.NoError(t, eq.Step(context.Background()))
- require.Nil(t, eq.safeAttributes, "attributes should now be invalidated")
+ require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "next attributes")
l1F.AssertExpectations(t)
eng.AssertExpectations(t)
@@ -866,7 +828,7 @@ func TestResetLoop(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
- eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{})
+ eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled, noopFinality{}, &fakeAttributesHandler{})
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA1)
eq.ec.SetFinalizedHead(refA0)
@@ -879,10 +841,10 @@ func TestResetLoop(t *testing.T) {
FinalizedBlockHash: refA0.Hash,
}
eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil)
- require.Nil(t, eq.safeAttributes)
+ require.False(t, eq.attributesHandler.HasAttributes())
require.ErrorIs(t, eq.Step(context.Background()), nil)
require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData)
- require.NotNil(t, eq.safeAttributes)
+ require.True(t, eq.attributesHandler.HasAttributes())
// Perform the reset
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go
index 3a4390e6e297..76e2ba732e8e 100644
--- a/op-node/rollup/derive/pipeline.go
+++ b/op-node/rollup/derive/pipeline.go
@@ -65,7 +65,7 @@ type DerivationPipeline struct {
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, plasma PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics,
- syncCfg *sync.Config, safeHeadListener SafeHeadListener, finalizer FinalizerHooks) *DerivationPipeline {
+ syncCfg *sync.Config, safeHeadListener SafeHeadListener, finalizer FinalizerHooks, attributesHandler AttributesHandler) *DerivationPipeline {
// Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
@@ -79,7 +79,8 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue)
// Step stages
- eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue, l1Fetcher, syncCfg, safeHeadListener, finalizer)
+ eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue,
+ l1Fetcher, syncCfg, safeHeadListener, finalizer, attributesHandler)
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other.
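Note: the engine-queue tests above construct a fakeAttributesHandler with an err field and assert on eq.attributesHandler.HasAttributes(). The stub itself is not part of this hunk; the following is only a minimal sketch of what such a test double could look like, assuming the handler type consumed by the engine queue exposes HasAttributes, SetAttributes and Proceed as the tests and the new attributes package use them. It is illustrative, not the actual helper from engine_queue_test.go.

// Hypothetical sketch only: names and behavior are assumptions based on how the tests use the stub.
package derive

import (
	"context"
	"io"
)

type fakeAttributesHandler struct {
	attributes *AttributesWithParent // queued attributes, if any
	err        error                 // injected failure returned by Proceed
}

func (f *fakeAttributesHandler) HasAttributes() bool { return f.attributes != nil }

func (f *fakeAttributesHandler) SetAttributes(attrs *AttributesWithParent) {
	f.attributes = attrs
}

func (f *fakeAttributesHandler) Proceed(ctx context.Context) error {
	if f.err != nil {
		return f.err // simulate a processing failure; the attributes stay queued
	}
	if f.attributes == nil {
		return io.EOF // nothing queued to process
	}
	f.attributes = nil // pretend the queued attributes were consumed successfully
	return nil
}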
diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go
index dfc2fc070fd1..138ff76b3113 100644
--- a/op-node/rollup/driver/driver.go
+++ b/op-node/rollup/driver/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
+ "github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -166,8 +167,9 @@ func NewDriver(
finalizer = finality.NewFinalizer(log, cfg, l1, engine)
}
+ attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, l2)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, engine,
- metrics, syncCfg, safeHeadListener, finalizer)
+ metrics, syncCfg, safeHeadListener, finalizer, attributesHandler)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
diff --git a/op-node/rollup/driver/sequencer.go b/op-node/rollup/driver/sequencer.go
index b46db64b1a0e..80a2bb7df040 100644
--- a/op-node/rollup/driver/sequencer.go
+++ b/op-node/rollup/driver/sequencer.go
@@ -112,7 +112,7 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error {
"origin", l1Origin, "origin_time", l1Origin.Time, "noTxPool", attrs.NoTxPool)
// Start a payload building process.
- withParent := derive.NewAttributesWithParent(attrs, l2Head, false)
+ withParent := &derive.AttributesWithParent{Attributes: attrs, Parent: l2Head, IsLastInSpan: false}
errTyp, err := d.engine.StartPayload(ctx, l2Head, withParent, false)
if err != nil {
return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err)
diff --git a/op-node/rollup/driver/sequencer_test.go b/op-node/rollup/driver/sequencer_test.go
index 6edf0960f08a..31bcc6b5595b 100644
--- a/op-node/rollup/driver/sequencer_test.go
+++ b/op-node/rollup/driver/sequencer_test.go
@@ -70,7 +70,7 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block
_, _ = crand.Read(m.buildingID[:])
m.buildingOnto = parent
m.buildingSafe = updateSafe
- m.buildingAttrs = attrs.Attributes()
+ m.buildingAttrs = attrs.Attributes
m.buildingStart = m.timeNow()
return derive.BlockInsertOK, nil
}
diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go
index 7caff92eb253..0d61c25e82da 100644
--- a/op-node/rollup/driver/state.go
+++ b/op-node/rollup/driver/state.go
@@ -194,18 +194,6 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, envelope *eth.ExecutionP
}
}
-func (s *Driver) logSyncProgress(reason string) {
- s.log.Info("Sync progress",
- "reason", reason,
- "l2_finalized", s.engineController.Finalized(),
- "l2_safe", s.engineController.SafeL2Head(),
- "l2_pending_safe", s.engineController.PendingSafeL2Head(),
- "l2_unsafe", s.engineController.UnsafeL2Head(),
- "l2_time", s.engineController.UnsafeL2Head().Time,
- "l1_derived", s.derivation.Origin(),
- )
-}
-
// the eventLoop responds to L1 changes and internal timers to produce L2 blocks.
func (s *Driver) eventLoop() {
defer s.wg.Done()
@@ -352,7 +340,6 @@ func (s *Driver) eventLoop() {
if err := s.engineController.InsertUnsafePayload(s.driverCtx, envelope, ref); err != nil {
s.log.Warn("Failed to insert unsafe payload for EL sync", "id", envelope.ExecutionPayload.ID(), "err", err)
}
- s.logSyncProgress("unsafe payload from sequencer while in EL sync")
}
case newL1Head := <-s.l1HeadSig:
s.l1State.HandleNewL1HeadBlock(newL1Head)
diff --git a/op-program/client/driver/driver.go b/op-program/client/driver/driver.go
index 70525d632f27..10b3a3ca5f27 100644
--- a/op-program/client/driver/driver.go
+++ b/op-program/client/driver/driver.go
@@ -6,14 +6,16 @@ import (
"fmt"
"io"
+ "github.com/ethereum/go-ethereum/log"
+
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup"
+ "github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
- "github.com/ethereum/go-ethereum/log"
)
var ErrClaimNotValid = errors.New("invalid claim")
@@ -53,7 +55,8 @@ type Driver struct {
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
- pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled, NoopFinalizer{})
+ attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source)
+ pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled, NoopFinalizer{}, attributesHandler)
pipeline.Reset()
return &Driver{
logger: logger,
diff --git a/op-service/testutils/mock_engine.go b/op-service/testutils/mock_engine.go
index 99dadf534ec4..6fc6a078acdc 100644
--- a/op-service/testutils/mock_engine.go
+++ b/op-service/testutils/mock_engine.go
@@ -2,6 +2,7 @@ package testutils
import (
"context"
+ "encoding/json"
"github.com/ethereum/go-ethereum/common"
@@ -22,19 +23,27 @@ func (m *MockEngine) ExpectGetPayload(payloadId eth.PayloadID, payload *eth.Exec
}
func (m *MockEngine) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) {
- out := m.Mock.Called(state, attr)
+ out := m.Mock.Called(mustJson(state), mustJson(attr))
return out.Get(0).(*eth.ForkchoiceUpdatedResult), out.Error(1)
}
func (m *MockEngine) ExpectForkchoiceUpdate(state *eth.ForkchoiceState, attr *eth.PayloadAttributes, result *eth.ForkchoiceUpdatedResult, err error) {
- m.Mock.On("ForkchoiceUpdate", state, attr).Once().Return(result, err)
+ m.Mock.On("ForkchoiceUpdate", mustJson(state), mustJson(attr)).Once().Return(result, err)
}
func (m *MockEngine) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) {
- out := m.Mock.Called(payload, parentBeaconBlockRoot)
+ out := m.Mock.Called(mustJson(payload), mustJson(parentBeaconBlockRoot))
return out.Get(0).(*eth.PayloadStatusV1), out.Error(1)
}
func (m *MockEngine) ExpectNewPayload(payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash, result *eth.PayloadStatusV1, err error) {
- m.Mock.On("NewPayload", payload, parentBeaconBlockRoot).Once().Return(result, err)
+ m.Mock.On("NewPayload", mustJson(payload), mustJson(parentBeaconBlockRoot)).Once().Return(result, err)
+}
+
+func mustJson[E any](elem E) string {
+ data, err := json.MarshalIndent(elem, " ", " ")
+ if err != nil {
+ panic(err)
+ }
+ return string(data)
+}
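The MockEngine change above switches testify argument matching from comparing the structs and pointers themselves to comparing their JSON encodings, so an expectation set up with one payload value still matches a semantically identical payload constructed elsewhere. Below is a rough, self-contained illustration of that pattern on a made-up mock; fooMock, DoThing and the payload struct are hypothetical names for the example only, not part of this patch.

// Hypothetical illustration of JSON-based argument matching with testify, mirroring the
// mustJson helper added to mock_engine.go above.
package testutils_example

import (
	"encoding/json"
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

// mustJson turns an argument into a canonical string so testify compares contents
// rather than pointer identity.
func mustJson[E any](elem E) string {
	data, err := json.MarshalIndent(elem, "  ", "  ")
	if err != nil {
		panic(err)
	}
	return string(data)
}

type payload struct {
	Hash   string
	Number uint64
}

type fooMock struct {
	mock.Mock
}

func (m *fooMock) DoThing(p *payload) error {
	// Record the call keyed by the JSON encoding of the argument.
	out := m.Mock.Called(mustJson(p))
	return out.Error(0)
}

func (m *fooMock) ExpectDoThing(p *payload, err error) {
	m.Mock.On("DoThing", mustJson(p)).Once().Return(err)
}

func TestJSONMatching(t *testing.T) {
	m := new(fooMock)
	m.ExpectDoThing(&payload{Hash: "0xabc", Number: 1}, nil)
	// A different pointer with equal contents still satisfies the expectation.
	require.NoError(t, m.DoThing(&payload{Hash: "0xabc", Number: 1}))
	m.AssertExpectations(t)
}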