Span batch derivation [by Test In Prod, rebased on develop, see #7289] #7621

Merged: 5 commits, Oct 30, 2023
8 changes: 6 additions & 2 deletions op-node/cmd/batch_decoder/reassemble/reassemble.go
@@ -107,14 +107,18 @@ func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMe
var batches []derive.SingularBatch
invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(cfg, ch.Reader(), eth.L1BlockRef{})
br, err := derive.BatchReader(ch.Reader())
if err == nil {
for batch, err := br(); err != io.EOF; batch, err = br() {
if err != nil {
fmt.Printf("Error reading batch for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
batches = append(batches, batch.Batch.SingularBatch)
if batch.BatchType == derive.SingularBatchType {
batches = append(batches, batch.SingularBatch)
} else {
fmt.Printf("batch-type %d is not supported\n", batch.BatchType)
}
}
}
} else {
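As context for the reassembler change above: the reader returned by derive.BatchReader is called until it yields io.EOF, and only singular batches are collected. The sketch below mirrors that decode-and-filter loop with simplified stand-in types (not the op-node derive API), just to make the control flow concrete.

```go
package main

import (
	"fmt"
	"io"
)

// Stand-in batch types; the real code uses derive.BatchData and derive.SingularBatchType.
const (
	SingularBatchType = iota
	SpanBatchType
)

type SingularBatch struct{ Timestamp uint64 }

type BatchData struct {
	BatchType     int
	SingularBatch SingularBatch
}

// newReader mimics the closure returned by derive.BatchReader: each call
// yields the next decoded batch, and io.EOF signals the end of the channel.
func newReader(in []BatchData) func() (BatchData, error) {
	i := 0
	return func() (BatchData, error) {
		if i >= len(in) {
			return BatchData{}, io.EOF
		}
		b := in[i]
		i++
		return b, nil
	}
}

func main() {
	br := newReader([]BatchData{
		{BatchType: SingularBatchType, SingularBatch: SingularBatch{Timestamp: 100}},
		{BatchType: SpanBatchType},
	})
	var batches []SingularBatch
	for batch, err := br(); err != io.EOF; batch, err = br() {
		if err != nil {
			fmt.Printf("error reading batch: %v\n", err)
			continue
		}
		if batch.BatchType == SingularBatchType {
			batches = append(batches, batch.SingularBatch)
		} else {
			fmt.Printf("batch-type %d is not supported\n", batch.BatchType)
		}
	}
	fmt.Println("kept", len(batches), "singular batches") // kept 1 singular batches
}
```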
4 changes: 2 additions & 2 deletions op-node/rollup/derive/attributes_queue.go
@@ -32,7 +32,7 @@ type AttributesQueue struct {
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
batch *BatchData
batch *SingularBatch
}

func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
@@ -71,7 +71,7 @@ func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2

// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *SingularBatch, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
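The attributes queue now works directly with *SingularBatch. Below is a minimal sketch of the createNextAttributes flow under that assumption, using simplified stand-in types rather than the real eth/derive ones: the batch's parent hash is checked against the L2 safe head, then attributes are built with NoTxPool set so only the batched transactions are included.

```go
package main

import "fmt"

// Simplified stand-ins for eth.L2BlockRef, derive.SingularBatch and
// eth.PayloadAttributes; only the fields needed for the sketch are kept.
type Hash [32]byte

type L2BlockRef struct {
	Hash Hash
	Time uint64
}

type SingularBatch struct {
	ParentHash   Hash
	Timestamp    uint64
	Transactions [][]byte
}

type PayloadAttributes struct {
	Timestamp    uint64
	Transactions [][]byte
	NoTxPool     bool
}

// createNextAttributes mirrors the flow described above: reject a batch whose
// parent hash does not match the safe head, otherwise build attributes and
// disable the tx pool so only the batched transactions are executed.
func createNextAttributes(batch *SingularBatch, l2SafeHead L2BlockRef) (*PayloadAttributes, error) {
	if batch.ParentHash != l2SafeHead.Hash {
		return nil, fmt.Errorf("valid batch has bad parent hash %x, expected %x", batch.ParentHash, l2SafeHead.Hash)
	}
	return &PayloadAttributes{
		Timestamp:    batch.Timestamp,
		Transactions: batch.Transactions,
		NoTxPool:     true,
	}, nil
}

func main() {
	safe := L2BlockRef{Hash: Hash{1}, Time: 100}
	batch := &SingularBatch{ParentHash: Hash{1}, Timestamp: 102, Transactions: [][]byte{[]byte("foobar")}}
	attrs, err := createNextAttributes(batch, safe)
	if err != nil {
		panic(err)
	}
	fmt.Println(attrs.Timestamp, len(attrs.Transactions), attrs.NoTxPool) // 102 1 true
}
```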
6 changes: 3 additions & 3 deletions op-node/rollup/derive/attributes_queue_test.go
@@ -42,13 +42,13 @@ func TestAttributesQueue(t *testing.T) {
safeHead.L1Origin = l1Info.ID()
safeHead.Time = l1Info.InfoTime

batch := NewSingularBatchData(SingularBatch{
batch := SingularBatch{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
EpochHash: l1Info.InfoHash,
Timestamp: safeHead.Time + cfg.BlockTime,
Transactions: []eth.Data{eth.Data("foobar"), eth.Data("example")},
})
}

parentL1Cfg := eth.SystemConfig{
BatcherAddr: common.Address{42},
@@ -80,7 +80,7 @@ func TestAttributesQueue(t *testing.T) {

aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, attrBuilder, nil)

actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
actual, err := aq.createNextAttributes(context.Background(), &batch, safeHead)

require.NoError(t, err)
require.Equal(t, attrs, *actual)
146 changes: 100 additions & 46 deletions op-node/rollup/derive/batch_queue.go
@@ -29,7 +29,12 @@ import (

type NextBatchProvider interface {
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (*BatchData, error)
NextBatch(ctx context.Context) (Batch, error)
}

type SafeBlockFetcher interface {
L2BlockRefByNumber(context.Context, uint64) (eth.L2BlockRef, error)
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
}
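SafeBlockFetcher is the new L2 dependency the batch queue uses to read back already-derived safe blocks, which is needed when a span batch overlaps the existing safe chain. A hypothetical in-memory test double might look roughly like the sketch below; the types are simplified stand-ins for eth.L2BlockRef and eth.ExecutionPayload.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for eth.L2BlockRef and eth.ExecutionPayload.
type L2BlockRef struct {
	Number uint64
	Time   uint64
}

type ExecutionPayload struct {
	BlockNumber uint64
}

// fakeL2 is a hypothetical in-memory SafeBlockFetcher for tests: it serves
// block refs and payloads from maps keyed by block number.
type fakeL2 struct {
	refs     map[uint64]L2BlockRef
	payloads map[uint64]*ExecutionPayload
}

func (f *fakeL2) L2BlockRefByNumber(_ context.Context, n uint64) (L2BlockRef, error) {
	ref, ok := f.refs[n]
	if !ok {
		return L2BlockRef{}, fmt.Errorf("no ref for block %d", n)
	}
	return ref, nil
}

func (f *fakeL2) PayloadByNumber(_ context.Context, n uint64) (*ExecutionPayload, error) {
	p, ok := f.payloads[n]
	if !ok {
		return nil, fmt.Errorf("no payload for block %d", n)
	}
	return p, nil
}

func main() {
	l2 := &fakeL2{
		refs:     map[uint64]L2BlockRef{10: {Number: 10, Time: 120}},
		payloads: map[uint64]*ExecutionPayload{10: {BlockNumber: 10}},
	}
	ref, err := l2.L2BlockRefByNumber(context.Background(), 10)
	fmt.Println(ref.Number, ref.Time, err) // 10 120 <nil>
}
```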

// BatchQueue contains a set of batches for every L1 block.
@@ -42,24 +47,60 @@ type BatchQueue struct {

l1Blocks []eth.L1BlockRef

// batches in order of when we've first seen them, grouped by L2 timestamp
batches map[uint64][]*BatchWithL1InclusionBlock
// batches in order of when we've first seen them
batches []*BatchWithL1InclusionBlock

// nextSpan is cached SingularBatches derived from SpanBatch
nextSpan []*SingularBatch

l2 SafeBlockFetcher
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
prev: prev,
l2: l2,
}
}

func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
}

func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
// popNextBatch pops the next batch from the current queued up span-batch nextSpan.
// The queue must be non-empty, or the function will panic.
func (bq *BatchQueue) popNextBatch(safeL2Head eth.L2BlockRef) *SingularBatch {
if len(bq.nextSpan) == 0 {
panic("popping non-existent span-batch, invalid state")
}
nextBatch := bq.nextSpan[0]
bq.nextSpan = bq.nextSpan[1:]
// Must set ParentHash before returning. We can use safeL2Head because the parentCheck is verified in CheckBatch().
nextBatch.ParentHash = safeL2Head.Hash
return nextBatch
}

func (bq *BatchQueue) maybeAdvanceEpoch(nextBatch *SingularBatch) {
if len(bq.l1Blocks) == 0 {
return
}
if nextBatch.GetEpochNum() == rollup.Epoch(bq.l1Blocks[0].Number)+1 {
// Advance epoch if necessary
bq.l1Blocks = bq.l1Blocks[1:]
}
}

func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*SingularBatch, error) {
if len(bq.nextSpan) > 0 {
// If there are cached singular batches, pop first one and return.
nextBatch := bq.popNextBatch(safeL2Head)
bq.maybeAdvanceEpoch(nextBatch)
return nextBatch, nil
}

// Note: We use the origin that we will have to determine if it's behind. This is important
// because it's the future origin that gets saved into the l1Blocks array.
// We always update the origin of this stage if it is not the same so after the update code
@@ -89,7 +130,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
} else if err != nil {
return nil, err
} else if !originBehind {
bq.AddBatch(batch, safeL2Head)
bq.AddBatch(ctx, batch, safeL2Head)
}

// Skip adding data unless we are up to date with the origin, but do fully
@@ -111,43 +152,71 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef)
} else if err != nil {
return nil, err
}
return batch, nil

var nextBatch *SingularBatch
switch batch.GetBatchType() {
case SingularBatchType:
singularBatch, ok := batch.(*SingularBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
nextBatch = singularBatch
case SpanBatchType:
spanBatch, ok := batch.(*SpanBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
// If next batch is SpanBatch, convert it to SingularBatches.
singularBatches, err := spanBatch.GetSingularBatches(bq.l1Blocks, safeL2Head)
if err != nil {
return nil, NewCriticalError(err)
}
bq.nextSpan = singularBatches
// span-batches are non-empty, so the below pop is safe.
nextBatch = bq.popNextBatch(safeL2Head)
default:
return nil, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
}

bq.maybeAdvanceEpoch(nextBatch)
return nextBatch, nil
}
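The dispatch above is the heart of the change: a SingularBatch is returned as-is, while a SpanBatch is expanded via GetSingularBatches, cached in nextSpan, and then handed out one batch per NextBatch call. A stripped-down sketch of that caching behaviour, with stand-in types rather than the real Batch interface:

```go
package main

import "fmt"

// Stand-in for derive.SingularBatch.
type SingularBatch struct{ Timestamp uint64 }

// queue mimics the nextSpan caching in BatchQueue: a span batch is expanded
// once, then its singular batches are handed out one at a time.
type queue struct {
	nextSpan []*SingularBatch
}

func (q *queue) popNextBatch() *SingularBatch {
	if len(q.nextSpan) == 0 {
		panic("popping non-existent span-batch, invalid state")
	}
	next := q.nextSpan[0]
	q.nextSpan = q.nextSpan[1:]
	return next
}

// nextBatch returns a cached singular batch if one is queued; otherwise it
// "derives" a new span batch (here just a fixed slice) and caches it.
func (q *queue) nextBatch() *SingularBatch {
	if len(q.nextSpan) > 0 {
		return q.popNextBatch()
	}
	// Pretend we just decoded a span batch covering three L2 blocks.
	q.nextSpan = []*SingularBatch{{Timestamp: 100}, {Timestamp: 102}, {Timestamp: 104}}
	return q.popNextBatch()
}

func main() {
	q := &queue{}
	for i := 0; i < 3; i++ {
		fmt.Println(q.nextBatch().Timestamp) // 100, 102, 104
	}
}
```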

func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.origin = base
bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
bq.batches = []*BatchWithL1InclusionBlock{}
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, base)
bq.nextSpan = bq.nextSpan[:0]
return io.EOF
}

func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
func (bq *BatchQueue) AddBatch(ctx context.Context, batch Batch, l2SafeHead eth.L2BlockRef) {
if len(bq.l1Blocks) == 0 {
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.GetTimestamp()))
}
data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.origin,
Batch: batch,
}
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
validity := CheckBatch(ctx, bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data, bq.l2)
if validity == BatchDrop {
return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
}
bq.log.Debug("Adding batch", "batch_timestamp", batch.Timestamp, "parent_hash", batch.ParentHash, "batch_epoch", batch.Epoch(), "txs", len(batch.Transactions))
bq.batches[batch.Timestamp] = append(bq.batches[batch.Timestamp], &data)
batch.LogContext(bq.log).Debug("Adding batch")
bq.batches = append(bq.batches, &data)
}

// deriveNextBatch derives the next batch to apply on top of the current L2 safe head,
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (Batch, error) {
if len(bq.l1Blocks) == 0 {
return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
}
@@ -170,19 +239,15 @@ func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2Saf
// Go over all batches, in order of inclusion, and find the first batch we can accept.
// We filter in-place by only remembering the batches that may be processed in the future, or those we are undecided on.
var remaining []*BatchWithL1InclusionBlock
candidates := bq.batches[nextTimestamp]
batchLoop:
for i, batch := range candidates {
validity := CheckBatch(bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch)
for i, batch := range bq.batches {
validity := CheckBatch(ctx, bq.config, bq.log.New("batch_index", i), bq.l1Blocks, l2SafeHead, batch, bq.l2)
switch validity {
case BatchFuture:
return nil, NewCriticalError(fmt.Errorf("found batch with timestamp %d marked as future batch, but expected timestamp %d", batch.Batch.Timestamp, nextTimestamp))
remaining = append(remaining, batch)
continue
case BatchDrop:
bq.log.Warn("dropping batch",
"batch_timestamp", batch.Batch.Timestamp,
"parent_hash", batch.Batch.ParentHash,
"batch_epoch", batch.Batch.Epoch(),
"txs", len(batch.Batch.Transactions),
batch.Batch.LogContext(bq.log).Warn("Dropping batch",
"l2_safe_head", l2SafeHead.ID(),
"l2_safe_head_time", l2SafeHead.Time,
)
@@ -191,29 +256,20 @@ batchLoop:
nextBatch = batch
// don't keep the current batch in the remaining items since we are processing it now,
// but retain every batch we didn't get to yet.
remaining = append(remaining, candidates[i+1:]...)
remaining = append(remaining, bq.batches[i+1:]...)
break batchLoop
case BatchUndecided:
remaining = append(remaining, batch)
bq.batches[nextTimestamp] = remaining
remaining = append(remaining, bq.batches[i:]...)
bq.batches = remaining
return nil, io.EOF
default:
return nil, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
}
}
// clean up if we remove the final batch for this timestamp
if len(remaining) == 0 {
delete(bq.batches, nextTimestamp)
} else {
bq.batches[nextTimestamp] = remaining
}
bq.batches = remaining

if nextBatch != nil {
// advance epoch if necessary
if nextBatch.Batch.EpochNum == rollup.Epoch(epoch.Number)+1 {
bq.l1Blocks = bq.l1Blocks[1:]
}
bq.log.Info("Found next batch", "epoch", epoch, "batch_epoch", nextBatch.Batch.EpochNum, "batch_timestamp", nextBatch.Batch.Timestamp)
nextBatch.Batch.LogContext(bq.log).Info("Found next batch")
return nextBatch.Batch, nil
}

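With the timestamp-keyed map replaced by a flat slice, the selection loop now filters in place: future batches stay queued, dropped batches are discarded, an undecided batch stops processing until more L1 data arrives, and the first accepted batch is returned while everything after it is retained. A compact sketch of that pattern, using a stand-in validity enum:

```go
package main

import "fmt"

type validity int

const (
	batchDrop validity = iota
	batchAccept
	batchUndecided
	batchFuture
)

type batch struct {
	ts    uint64
	check validity
}

// selectNext mirrors the batchLoop above: the first accepted batch is returned,
// future batches stay queued, dropped batches are discarded, and an undecided
// batch short-circuits until more data is available.
func selectNext(batches []*batch) (next *batch, remaining []*batch, wait bool) {
batchLoop:
	for i, b := range batches {
		switch b.check {
		case batchFuture:
			remaining = append(remaining, b) // not ready yet, keep for later
		case batchDrop:
			// dropped; the real code logs the reason at WARN level
		case batchAccept:
			next = b
			remaining = append(remaining, batches[i+1:]...) // keep everything after it
			break batchLoop
		case batchUndecided:
			remaining = append(remaining, batches[i:]...) // keep this and the rest, then wait
			return nil, remaining, true
		}
	}
	return next, remaining, false
}

func main() {
	next, rest, wait := selectNext([]*batch{
		{ts: 100, check: batchDrop},
		{ts: 102, check: batchFuture},
		{ts: 102, check: batchAccept},
		{ts: 104, check: batchUndecided},
	})
	fmt.Println(next.ts, len(rest), wait) // 102 2 false
}
```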
Expand Down Expand Up @@ -243,15 +299,13 @@ batchLoop:
// batch to ensure that we at least have one batch per epoch.
if nextTimestamp < nextEpoch.Time || firstOfEpoch {
bq.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
return NewSingularBatchData(
SingularBatch{
ParentHash: l2SafeHead.Hash,
EpochNum: rollup.Epoch(epoch.Number),
EpochHash: epoch.Hash,
Timestamp: nextTimestamp,
Transactions: nil,
},
), nil
return &SingularBatch{
ParentHash: l2SafeHead.Hash,
EpochNum: rollup.Epoch(epoch.Number),
EpochHash: epoch.Hash,
Timestamp: nextTimestamp,
Transactions: nil,
}, nil
}

// At this point we have auto generated every batch for the current epoch
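A small numeric sketch of the epoch-filling condition above (nextTimestamp < nextEpoch.Time || firstOfEpoch), assuming a 2-second block time and made-up epoch timestamps: empty batches are generated until the next L2 timestamp would cross into the next epoch, so every epoch gets at least one batch.

```go
package main

import "fmt"

// Illustrative numbers only: a 2-second L2 block time, the current epoch
// (L1 origin) at t=100, and the next epoch at t=112.
func main() {
	const blockTime = 2
	epochTime := uint64(100)     // timestamp of the current L1 origin
	nextEpochTime := uint64(112) // timestamp of the next L1 origin
	safeHeadTime := uint64(104)  // current L2 safe head timestamp

	firstOfEpoch := safeHeadTime == epochTime
	for {
		nextTimestamp := safeHeadTime + blockTime
		if nextTimestamp < nextEpochTime || firstOfEpoch {
			fmt.Printf("t=%d: generate empty batch for epoch at t=%d\n", nextTimestamp, epochTime)
			safeHeadTime = nextTimestamp
			firstOfEpoch = false
			continue
		}
		fmt.Printf("t=%d: epoch filled, advance to the next L1 origin\n", nextTimestamp)
		break
	}
}
```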