Skip to content

Commit

Permalink
op-node: sequencer should clear inconsistent asyncGossip buffer on se…
Browse files Browse the repository at this point in the history
…quencer-start (#11724)

* op-node: sequencer should clear inconsistent asyncGossip buffer on sequencer-start

* op-node: test async-gossip clearing

* Update op-node/rollup/sequencing/sequencer.go

Co-authored-by: Adrian Sutton <[email protected]>

---------

Co-authored-by: Adrian Sutton <[email protected]>
  • Loading branch information
protolambda and ajsutton authored Sep 5, 2024
1 parent c2d0911 commit 224c5fd
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 0 deletions.
20 changes: 20 additions & 0 deletions op-node/rollup/sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ func (d *Sequencer) onSequencerAction(x SequencerActionEvent) {
d.asyncGossip.Clear() // bad payload
return
}
d.log.Info("Resuming sequencing with previously async-gossip confirmed payload",
"payload", payload.ExecutionPayload.ID())
// Payload is known, we must have resumed sequencer-actions after a temporary error,
// meaning that we have seen BuildSealedEvent already.
// We can retry processing to make it canonical.
Expand Down Expand Up @@ -616,9 +618,27 @@ func (d *Sequencer) Init(ctx context.Context, active bool) error {

// forceStart skips all the checks, and just starts the sequencer
func (d *Sequencer) forceStart() error {
if d.latestHead == (eth.L2BlockRef{}) {
// This happens if sequencing is activated on op-node startup.
// The op-conductor check and choice of sequencing with this pre-state already happened before op-node startup.
d.log.Info("Starting sequencing, without known pre-state")
d.asyncGossip.Clear() // if we are starting from an unknown pre-state, just clear gossip out of caution.
} else {
// This happens when we start sequencing on an already-running node.
d.log.Info("Starting sequencing on top of known pre-state", "head", d.latestHead)
if payload := d.asyncGossip.Get(); payload != nil &&
payload.ExecutionPayload.BlockHash != d.latestHead.Hash {
d.log.Warn("Cleared old block from async-gossip buffer, sequencing pre-state is different",
"buffered", payload.ExecutionPayload.ID(), "prestate", d.latestHead)
d.asyncGossip.Clear()
}
}

if err := d.listener.SequencerStarted(); err != nil {
return fmt.Errorf("failed to notify sequencer-state listener of start: %w", err)
}
// clear the building state; interrupting any existing sequencing job (there should never be one)
d.latest = BuildingState{}
d.nextActionOK = true
d.nextAction = d.timeNow()
d.active.Store(true)
Expand Down
196 changes: 196 additions & 0 deletions op-node/rollup/sequencing/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,202 @@ func TestSequencer_StartStop(t *testing.T) {
require.NoError(t, err)
}

// TestSequencer_StaleBuild stops the sequencer after block-building,
// but before processing the block locally,
// and then continues it again, to check if the async-gossip gets cleared,
// instead of trying to re-insert the block.
func TestSequencer_StaleBuild(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
seq, deps := createSequencer(logger)

testClock := clock.NewSimpleClock()
seq.timeNow = testClock.Now
testClock.SetTime(30000)

emitter := &testutils.MockEmitter{}
seq.AttachEmitter(emitter)
deps.conductor.leader = true

emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
require.NoError(t, seq.Init(context.Background(), false))
emitter.AssertExpectations(t)
require.False(t, deps.conductor.closed, "conductor is ready")
require.True(t, deps.asyncGossip.started, "async gossip is always started on initialization")
require.False(t, deps.seqState.active, "sequencer not active yet")

head := eth.L2BlockRef{
Hash: common.Hash{0x22},
Number: 100,
L1Origin: eth.BlockID{
Hash: common.Hash{0x11, 0xa},
Number: 1000,
},
Time: uint64(testClock.Now().Unix()),
}
seq.OnEvent(engine.ForkchoiceUpdateEvent{UnsafeL2Head: head})

require.NoError(t, seq.Start(context.Background(), head.Hash))
require.True(t, seq.Active())
require.True(t, deps.seqState.active, "sequencer signaled it is active")

// sequencer is active now, wants to build.
_, ok := seq.NextAction()
require.True(t, ok)

// pretend we progress to the next L1 origin, catching up with the L2 time
l1Origin := eth.L1BlockRef{
Hash: common.Hash{0x11, 0xb},
ParentHash: head.L1Origin.Hash,
Number: head.L1Origin.Number + 1,
Time: head.Time + 2,
}
deps.l1OriginSelector.l1OriginFn = func(l2Head eth.L2BlockRef) (eth.L1BlockRef, error) {
return l1Origin, nil
}
var sentAttributes *derive.AttributesWithParent
emitter.ExpectOnceRun(func(ev event.Event) {
x, ok := ev.(engine.BuildStartEvent)
require.True(t, ok)
require.Equal(t, head, x.Attributes.Parent)
require.Equal(t, head.Time+deps.cfg.BlockTime, uint64(x.Attributes.Attributes.Timestamp))
require.Equal(t, eth.L1BlockRef{}, x.Attributes.DerivedFrom)
sentAttributes = x.Attributes
})
seq.OnEvent(SequencerActionEvent{})
emitter.AssertExpectations(t)

// Now report the block was started
startedTime := time.Unix(int64(head.Time), 0).Add(time.Millisecond * 150)
testClock.Set(startedTime)
payloadInfo := eth.PayloadInfo{
ID: eth.PayloadID{0x42},
Timestamp: head.Time + deps.cfg.BlockTime,
}
seq.OnEvent(engine.BuildStartedEvent{
Info: payloadInfo,
BuildStarted: startedTime,
Parent: head,
IsLastInSpan: false,
DerivedFrom: eth.L1BlockRef{},
})

_, ok = seq.NextAction()
require.True(t, ok, "must be ready to seal the block now")

emitter.ExpectOnce(engine.BuildSealEvent{
Info: payloadInfo,
BuildStarted: startedTime,
IsLastInSpan: false,
DerivedFrom: eth.L1BlockRef{},
})
seq.OnEvent(SequencerActionEvent{})
emitter.AssertExpectations(t)

_, ok = seq.NextAction()
require.False(t, ok, "cannot act until sealing completes/fails")

payloadEnvelope := &eth.ExecutionPayloadEnvelope{
ParentBeaconBlockRoot: sentAttributes.Attributes.ParentBeaconBlockRoot,
ExecutionPayload: &eth.ExecutionPayload{
ParentHash: head.Hash,
FeeRecipient: sentAttributes.Attributes.SuggestedFeeRecipient,
BlockNumber: eth.Uint64Quantity(sentAttributes.Parent.Number + 1),
BlockHash: common.Hash{0x12, 0x34},
Timestamp: sentAttributes.Attributes.Timestamp,
Transactions: sentAttributes.Attributes.Transactions,
// Not all attributes matter to sequencer. We can leave these nil.
},
}
payloadRef := eth.L2BlockRef{
Hash: payloadEnvelope.ExecutionPayload.BlockHash,
Number: uint64(payloadEnvelope.ExecutionPayload.BlockNumber),
ParentHash: payloadEnvelope.ExecutionPayload.ParentHash,
Time: uint64(payloadEnvelope.ExecutionPayload.Timestamp),
L1Origin: l1Origin.ID(),
SequenceNumber: 0,
}
emitter.ExpectOnce(engine.PayloadProcessEvent{
IsLastInSpan: false,
DerivedFrom: eth.L1BlockRef{},
Envelope: payloadEnvelope,
Ref: payloadRef,
})
// And report back the sealing result to the engine
seq.OnEvent(engine.BuildSealedEvent{
IsLastInSpan: false,
DerivedFrom: eth.L1BlockRef{},
Info: payloadInfo,
Envelope: payloadEnvelope,
Ref: payloadRef,
})
// The sequencer should start processing the payload
emitter.AssertExpectations(t)
// But also optimistically give it to the conductor and the async gossip
require.Equal(t, payloadEnvelope, deps.conductor.committed, "must commit to conductor")
require.Equal(t, payloadEnvelope, deps.asyncGossip.payload, "must send to async gossip")
_, ok = seq.NextAction()
require.False(t, ok, "optimistically published, but not ready to sequence next, until local processing completes")

// Now we stop the block building,
// before successful local processing of the committed block!
stopHead, err := seq.Stop(context.Background())
require.NoError(t, err)
require.Equal(t, head.Hash, stopHead, "sequencer should not have accepted any new block yet")
require.False(t, deps.seqState.active, "sequencer signaled it is no longer active")

// Async-gossip will try to publish this committed block
require.NotNil(t, deps.asyncGossip.payload, "still holding on to async-gossip block")

// Now let's say another sequencer built a bunch of blocks,
// can we continue from there? We'll have to wipe the old in-flight block,
// if we continue on top of a chain that had it already included a while ago.

// Signal the new chain we are building on
testClock.Set(testClock.Now().Add(time.Second * 100 * 2))

newL1Origin := eth.L1BlockRef{
Hash: common.Hash{0x11, 0x11, 0x44},
ParentHash: head.L1Origin.Hash,
Number: head.L1Origin.Number + 50,
Time: uint64(testClock.Now().Unix()),
}
newHead := eth.L2BlockRef{
Hash: common.Hash{0x44},
Number: head.Number + 100,
L1Origin: newL1Origin.ID(),
Time: uint64(testClock.Now().Unix()),
}
seq.OnEvent(engine.ForkchoiceUpdateEvent{UnsafeL2Head: newHead})

// Regression check: async-gossip is cleared upon sequencer un-pause.
// We could clear it earlier. But absolutely have to clear it upon Start(),
// to not continue from this older point.
require.NotNil(t, deps.asyncGossip.payload, "async-gossip still not cleared")

// start sequencing on top of the new chain
require.NoError(t, seq.Start(context.Background(), newHead.Hash), "must continue from new block")

// regression check: no stale async gossip is continued
require.Nil(t, deps.asyncGossip.payload, "async gossip should be cleared on Start")

// Start building the block with the new L1 origin
deps.l1OriginSelector.l1OriginFn = func(l2Head eth.L2BlockRef) (eth.L1BlockRef, error) {
return newL1Origin, nil
}
// Sequencer action, assert we build on top of something new,
// and don't try to seal what was previously.
_, ok = seq.NextAction()
require.True(t, ok, "ready to sequence again")
// start, not seal, when continuing to sequence.
emitter.ExpectOnceRun(func(ev event.Event) {
buildEv, ok := ev.(engine.BuildStartEvent)
require.True(t, ok)
require.Equal(t, newHead, buildEv.Attributes.Parent, "build on the new L2 head")
})
seq.OnEvent(SequencerActionEvent{})
emitter.AssertExpectations(t)
}

func TestSequencerBuild(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
seq, deps := createSequencer(logger)
Expand Down

0 comments on commit 224c5fd

Please sign in to comment.