Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-program: refactor driver to use events derivers #10971

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions op-node/rollup/engine/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.IsLastInSpan
envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor)
if err != nil {
if !errors.Is(err, derive.ErrTemporary) {
e.emitter.Emit(InvalidPayloadEvent{})
}
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err)
}
ref, err := derive.PayloadToBlockRef(e.rollupCfg, envelope.ExecutionPayload)
Expand Down
133 changes: 133 additions & 0 deletions op-node/rollup/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
Expand All @@ -20,6 +24,14 @@ func (ev InvalidPayloadEvent) String() string {
return "invalid-payload"
}

// InvalidPayloadAttributesEvent signals that the attached block attributes
// could not be processed into a valid payload by the engine.
type InvalidPayloadAttributesEvent struct {
	Attributes *derive.AttributesWithParent
}

func (e InvalidPayloadAttributesEvent) String() string {
	return "invalid-payload-attributes"
}

// ForkchoiceRequestEvent signals to the engine that it should emit an artificial
// forkchoice-update event, to signal the latest forkchoice to other derivers.
// This helps decouple derivers from the actual engine state,
Expand All @@ -39,6 +51,40 @@ func (ev ForkchoiceUpdateEvent) String() string {
return "forkchoice-update"
}

// PendingSafeUpdateEvent broadcasts the current pending-safe L2 head.
// The unsafe tip is included in the signal so listeners can determine
// whether there are existing blocks left to consolidate.
type PendingSafeUpdateEvent struct {
	PendingSafe eth.L2BlockRef
	Unsafe      eth.L2BlockRef
}

func (e PendingSafeUpdateEvent) String() string {
	return "pending-safe-update"
}

// PromotePendingSafeEvent signals that Ref can be marked as pending-safe,
// and additionally as safe when the Safe flag is set.
type PromotePendingSafeEvent struct {
	Ref  eth.L2BlockRef
	Safe bool
}

func (e PromotePendingSafeEvent) String() string {
	return "promote-pending-safe"
}

// ProcessAttributesEvent requests the engine deriver to force-insert the
// given block attributes on top of the pending-safe head.
type ProcessAttributesEvent struct {
	Attributes *derive.AttributesWithParent
}

func (e ProcessAttributesEvent) String() string {
	return "process-attributes"
}

// PendingSafeRequestEvent asks for a PendingSafeUpdateEvent to be emitted,
// carrying the current pending-safe and unsafe heads.
type PendingSafeRequestEvent struct{}

func (e PendingSafeRequestEvent) String() string {
	return "pending-safe-request"
}

type ProcessUnsafePayloadEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
}
Expand Down Expand Up @@ -172,7 +218,94 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) {
"safeHead", x.Safe, "unsafe", x.Unsafe, "safe_timestamp", x.Safe.Time,
"unsafe_timestamp", x.Unsafe.Time)
d.emitter.Emit(EngineResetConfirmedEvent{})
case ProcessAttributesEvent:
d.onForceNextSafeAttributes(x.Attributes)
case PendingSafeRequestEvent:
d.emitter.Emit(PendingSafeUpdateEvent{
PendingSafe: d.ec.PendingSafeL2Head(),
Unsafe: d.ec.UnsafeL2Head(),
})
case PromotePendingSafeEvent:
// Only promote if not already stale.
// Resets/overwrites happen through engine-resets, not through promotion.
if x.Ref.Number > d.ec.PendingSafeL2Head().Number {
d.ec.SetPendingSafeL2Head(x.Ref)
}
if x.Safe && x.Ref.Number > d.ec.SafeL2Head().Number {
d.ec.SetSafeHead(x.Ref)
}
}
}

// onForceNextSafeAttributes inserts the provided attributes on top of the
// pending-safe head, reorging away any conflicting unsafe chain.
//
// On success the pending-safe head (and, for the last attributes in a span,
// the safe head) is advanced and a PendingSafeUpdateEvent is emitted.
// On failure the error type determines recovery: temporary errors are
// retried, pre-state problems trigger a reset, and payload errors drop the
// attributes (fatally so for deposit-only blocks).
func (eq *EngDeriver) onForceNextSafeAttributes(attributes *derive.AttributesWithParent) {
	// Bound the whole build/confirm round-trip to the engine.
	ctx, cancel := context.WithTimeout(eq.ctx, time.Second*10)
	defer cancel()

	attrs := attributes.Attributes
	errType, err := eq.ec.StartPayload(ctx, eq.ec.PendingSafeL2Head(), attributes, true)
	var envelope *eth.ExecutionPayloadEnvelope
	if err == nil {
		envelope, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{})
	}
	if err != nil {
		switch errType {
		case BlockInsertTemporaryErr:
			// RPC errors are recoverable, we can retry the buffered payload attributes later.
			eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: fmt.Errorf("temporarily cannot insert new safe block: %w", err)})
			return
		case BlockInsertPrestateErr:
			_ = eq.ec.CancelPayload(ctx, true)
			eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("need reset to resolve pre-state problem: %w", err)})
			return
		case BlockInsertPayloadErr:
			if !errors.Is(err, derive.ErrTemporary) {
				eq.emitter.Emit(InvalidPayloadAttributesEvent{Attributes: attributes})
			}
			_ = eq.ec.CancelPayload(ctx, true)
			eq.log.Warn("could not process payload derived from L1 data, dropping attributes", "err", err)
			// Count the number of deposits to see if the tx list is deposit only.
			depositCount := 0
			for _, tx := range attrs.Transactions {
				if len(tx) > 0 && tx[0] == types.DepositTxType {
					depositCount++
				}
			}
			// Deposit transaction execution errors are suppressed in the execution engine, but if the
			// block is somehow invalid, there is nothing we can do to recover & we should exit.
			if len(attrs.Transactions) == depositCount {
				eq.log.Error("deposit only block was invalid", "parent", attributes.Parent, "err", err)
				eq.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("failed to process block with only deposit transactions: %w", err)})
				return
			}
			// Revert the pending safe head to the safe head.
			eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head())
			// suppress the error b/c we want to retry with the next batch from the batch queue
			// If there is no valid batch the node will eventually force a deposit only block. If
			// the deposit only block fails, this will return the critical error above.

			// Try to restore to previous known unsafe chain.
			eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true)

			// drop the payload without inserting it into the engine
			return
		default:
			eq.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err)})
			// Must return here: envelope is nil on any error path, and falling
			// through would dereference it below (bug in the original: the
			// default case lacked this return).
			return
		}
	}
	ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
	if err != nil {
		eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("failed to decode L2 block ref from payload: %w", err)})
		return
	}
	eq.ec.SetPendingSafeL2Head(ref)
	if attributes.IsLastInSpan {
		eq.ec.SetSafeHead(ref)
	}
	eq.emitter.Emit(PendingSafeUpdateEvent{
		PendingSafe: eq.ec.PendingSafeL2Head(),
		Unsafe:      eq.ec.UnsafeL2Head(),
	})
}

type ResetEngineControl interface {
Expand Down
179 changes: 53 additions & 126 deletions op-program/client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,161 +3,88 @@ package driver
import (
"context"
"errors"
"fmt"
"io"

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

type Derivation interface {
Step(ctx context.Context) error
type EndCondition interface {
Closing() bool
Result() error
}

type Pipeline interface {
Step(ctx context.Context, pendingSafeHead eth.L2BlockRef) (outAttrib *derive.AttributesWithParent, outErr error)
ConfirmEngineReset()
}
type Driver struct {
logger log.Logger

type Engine interface {
SafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
TryUpdateEngine(ctx context.Context) error
engine.ResetEngineControl
}
events []rollup.Event

type L2Source interface {
engine.Engine
L2OutputRoot(uint64) (eth.Bytes32, error)
end EndCondition
deriver rollup.Deriver
}

type Deriver interface {
SafeL2Head() eth.L2BlockRef
SyncStep(ctx context.Context) error
}
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
l1BlobsSource derive.L1BlobsFetcher, l2Source engine.Engine, targetBlockNum uint64) *Driver {

type MinimalSyncDeriver struct {
logger log.Logger
pipeline Pipeline
attributesHandler driver.AttributesHandler
l1Source derive.L1Fetcher
l2Source L2Source
engine Engine
syncCfg *sync.Config
initialResetDone bool
cfg *rollup.Config
}
d := &Driver{
logger: logger,
}

func (d *MinimalSyncDeriver) SafeL2Head() eth.L2BlockRef {
return d.engine.SafeL2Head()
}
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics)
pipelineDeriver := derive.NewPipelineDeriver(context.Background(), pipeline, d)

func (d *MinimalSyncDeriver) SyncStep(ctx context.Context) error {
if !d.initialResetDone {
if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
return err
}
// The below two calls emulate ResetEngine, without event-processing.
// This will be omitted after op-program adopts events, and the deriver code is used instead.
result, err := sync.FindL2Heads(ctx, d.cfg, d.l1Source, d.l2Source, d.logger, d.syncCfg)
if err != nil {
// not really a temporary error in this context, but preserves old ResetEngine behavior.
return derive.NewTemporaryError(fmt.Errorf("failed to determine starting point: %w", err))
}
engine.ForceEngineReset(d.engine, engine.ForceEngineResetEvent{
Unsafe: result.Unsafe,
Safe: result.Safe,
Finalized: result.Finalized,
})
d.pipeline.ConfirmEngineReset()
d.initialResetDone = true
}
ec := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync, d)
engineDeriv := engine.NewEngDeriver(logger, context.Background(), cfg, ec, d)
syncCfg := &sync.Config{SyncMode: sync.CLSync}
engResetDeriv := engine.NewEngineResetDeriver(context.Background(), logger, cfg, l1Source, l2Source, syncCfg, d)

if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
return err
}
if err := d.attributesHandler.Proceed(ctx); err != io.EOF {
// EOF error means we can't process the next attributes. Then we should derive the next attributes.
return err
prog := &ProgramDeriver{
logger: logger,
Emitter: d,
closing: false,
result: nil,
targetBlockNum: targetBlockNum,
}

attrib, err := d.pipeline.Step(ctx, d.engine.PendingSafeL2Head())
if err != nil {
return err
d.deriver = &rollup.SynchronousDerivers{
prog,
engineDeriv,
pipelineDeriver,
engResetDeriv,
}
d.attributesHandler.SetAttributes(attrib)
return nil
}

type Driver struct {
logger log.Logger
d.end = prog

deriver Deriver

l2OutputRoot func(uint64) (eth.Bytes32, error)
targetBlockNum uint64
return d
}

func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source)
syncCfg := &sync.Config{SyncMode: sync.CLSync}
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics)
return &Driver{
logger: logger,
deriver: &MinimalSyncDeriver{
logger: logger,
pipeline: pipeline,
attributesHandler: attributesHandler,
l1Source: l1Source,
l2Source: l2Source,
engine: engine,
syncCfg: syncCfg,
cfg: cfg,
},
l2OutputRoot: l2Source.L2OutputRoot,
targetBlockNum: targetBlockNum,
// Emit queues the event for processing by the driver loop.
// Events are dropped once the end condition reports it is closing,
// since no further processing will happen.
func (d *Driver) Emit(ev rollup.Event) {
	if !d.end.Closing() {
		d.events = append(d.events, ev)
	}
}

// Step runs the next step of the derivation pipeline.
// Returns nil if there are further steps to be performed
// Returns io.EOF if the derivation completed successfully
// Returns a non-EOF error if the derivation failed
func (d *Driver) Step(ctx context.Context) error {
	if err := d.deriver.SyncStep(ctx); errors.Is(err, io.EOF) {
		// Reaching the L1 head means there is no more data to derive from.
		d.logger.Info("Derivation complete: reached L1 head", "head", d.deriver.SafeL2Head())
		return io.EOF
	} else if errors.Is(err, derive.NotEnoughData) {
		// NotEnoughData is not handled differently than a nil error.
		// This used to be returned by the EngineQueue when a block was derived, but also other stages.
		// Instead, every driver-loop iteration we check if the target block number has been reached.
		d.logger.Debug("Data is lacking")
	} else if errors.Is(err, derive.ErrTemporary) {
		// While most temporary errors are due to requests for external data failing which can't happen,
		// they may also be returned due to other events like channels timing out so need to be handled
		d.logger.Warn("Temporary error in derivation", "err", err)
		return nil
	} else if err != nil {
		return fmt.Errorf("pipeline err: %w", err)
	}
	// No terminal outcome from this step: check whether the sync target was reached.
	head := d.deriver.SafeL2Head()
	if head.Number >= d.targetBlockNum {
		d.logger.Info("Derivation complete: reached L2 block", "head", head)
		return io.EOF
	}
	return nil
}
// ExhaustErr is returned when the driver runs out of queued events
// before the program reaches its end condition.
var ExhaustErr = errors.New("exhausted events before completing program")

func (d *Driver) SafeHead() eth.L2BlockRef {
return d.deriver.SafeL2Head()
// RunComplete drives the synchronous event loop until the program signals
// completion, starting from an engine-reset request. It returns ExhaustErr
// if the event queue drains first, or the end condition's result otherwise.
func (d *Driver) RunComplete() error {
	// Initial reset
	d.Emit(engine.ResetEngineRequestEvent{})

	for {
		if d.end.Closing() {
			return d.end.Result()
		}
		if len(d.events) == 0 {
			return ExhaustErr
		}
		if len(d.events) > 10000 { // sanity check, in case of bugs. Better than going OOM.
			return errors.New("way too many events queued up, something is wrong")
		}
		// Pop the next event off the FIFO queue and process it.
		next := d.events[0]
		d.events = d.events[1:]
		d.deriver.OnEvent(next)
	}
}
Loading