Skip to content

Commit

Permalink
op-node: extract unsafe-block processing from derivation code-path
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda committed May 30, 2024
1 parent 975def6 commit 113cde7
Show file tree
Hide file tree
Showing 11 changed files with 505 additions and 224 deletions.
26 changes: 23 additions & 3 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
Expand All @@ -36,6 +37,7 @@ type L2Verifier struct {
// L2 rollup
engine *derive.EngineController
derivation *derive.DerivationPipeline
clSync *clsync.CLSync

finalizer driver.Finalizer

Expand Down Expand Up @@ -70,6 +72,8 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
metrics := &testutils.TestDerivationMetrics{}
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)

clSync := clsync.NewCLSync(log, cfg, metrics, engine)

var finalizer driver.Finalizer
if cfg.PlasmaEnabled() {
finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, engine, plasmaSrc)
Expand All @@ -84,6 +88,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
log: log,
eng: eng,
engine: engine,
clSync: clSync,
derivation: pipeline,
finalizer: finalizer,
l1: l1,
Expand Down Expand Up @@ -229,15 +234,30 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
s.finalizer.Finalize(t.Ctx(), finalized)
}

// syncStep represents the Driver.syncStep
func (s *L2Verifier) syncStep(ctx context.Context) error {
if fcuCalled, err := s.engine.TryBackupUnsafeReorg(ctx); fcuCalled {
return err
}
if err := s.engine.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
return err
}
if err := s.clSync.Proceed(ctx); err != io.EOF {
return err
}

s.l2PipelineIdle = false
return s.derivation.Step(ctx)
}

// ActL2PipelineStep runs one iteration of the L2 derivation pipeline
func (s *L2Verifier) ActL2PipelineStep(t Testing) {
if s.l2Building {
t.InvalidAction("cannot derive new data while building L2 block")
return
}

s.l2PipelineIdle = false
err := s.derivation.Step(t.Ctx())
err := s.syncStep(t.Ctx())
if err == io.EOF || (err != nil && errors.Is(err, derive.EngineELSyncing)) {
s.l2PipelineIdle = true
return
Expand Down Expand Up @@ -272,7 +292,7 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) {
// ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub
func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
s.derivation.AddUnsafePayload(payload)
s.clSync.AddUnsafePayload(payload)
}
}

Expand Down
130 changes: 130 additions & 0 deletions op-node/rollup/clsync/clsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package clsync

import (
"context"
"errors"
"io"

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024

type Metrics interface {
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
}

type Engine interface {
derive.EngineState
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
}

// CLSync holds on to a queue of received unsafe payloads,
// and tries to apply them to the tip of the chain when requested to.
type CLSync struct {
log log.Logger
cfg *rollup.Config
metrics Metrics
ec Engine
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
}

func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, ec Engine) *CLSync {
return &CLSync{
log: log,
cfg: cfg,
metrics: metrics,
ec: ec,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
}
}

// LowestQueuedUnsafeBlock retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
func (eq *CLSync) LowestQueuedUnsafeBlock() eth.L2BlockRef {
payload := eq.unsafePayloads.Peek()
if payload == nil {
return eth.L2BlockRef{}
}
ref, err := derive.PayloadToBlockRef(eq.cfg, payload.ExecutionPayload)
if err != nil {
return eth.L2BlockRef{}
}
return ref
}

// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1.
func (eq *CLSync) AddUnsafePayload(envelope *eth.ExecutionPayloadEnvelope) {
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
}

if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
}
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))
}

// Proceed dequeues the next applicable unsafe payload, if any, to apply to the tip of the chain.
// EOF error means we can't process the next unsafe payload. The caller should then try a different form of syncing.
func (eq *CLSync) Proceed(ctx context.Context) error {
if eq.unsafePayloads.Len() == 0 {
return io.EOF
}
firstEnvelope := eq.unsafePayloads.Peek()
first := firstEnvelope.ExecutionPayload

if uint64(first.BlockNumber) <= eq.ec.SafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
}
if uint64(first.BlockNumber) <= eq.ec.UnsafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
}

// Ensure that the unsafe payload builds upon the current unsafe head
if first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload
}

ref, err := derive.PayloadToBlockRef(eq.cfg, first)
if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err)
eq.unsafePayloads.Pop()
return nil
}

if err := eq.ec.InsertUnsafePayload(ctx, firstEnvelope, ref); errors.Is(err, derive.ErrTemporary) {
eq.log.Debug("Temporary error while inserting unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return err
} else if err != nil {
eq.log.Warn("Dropping invalid unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.unsafePayloads.Pop()
return err
}
eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.log.Info("Sync progress",
"reason", "unsafe payload from sequencer",
"l2_finalized", eq.ec.Finalized(),
"l2_safe", eq.ec.SafeL2Head(),
"l2_unsafe", eq.ec.UnsafeL2Head(),
"l2_time", eq.ec.UnsafeL2Head().Time,
)
return nil
}
Loading

0 comments on commit 113cde7

Please sign in to comment.