From 76e3118a9bc70b261b3c67d01fa9fcf7be3712de Mon Sep 17 00:00:00 2001 From: agnusmor <100322135+agnusmor@users.noreply.github.com> Date: Mon, 22 Jan 2024 23:01:39 +0100 Subject: [PATCH] Add L2 block sequential process. Execute empty L2 block. Add tracking number to L2 block. Several refactors (#3118) * add L2 block sequential process. Execute empty L2 block. Add tracking number to L2 block. Several refactors * fix finalizer UT --- config/default.go | 1 + .../environments/local/local.node.config.toml | 1 + docs/config-file/node-config-doc.html | 2 +- docs/config-file/node-config-doc.md | 45 ++- docs/config-file/node-config-schema.json | 7 +- sequencer/batch.go | 197 ++++++----- sequencer/config.go | 7 +- sequencer/finalizer.go | 162 ++------- sequencer/finalizer_test.go | 58 ++-- sequencer/l2block.go | 328 +++++++++++------- sequencer/worker.go | 4 +- sequencer/worker_test.go | 4 +- sequencesender/sequencesender.go | 2 +- state/batch.go | 24 +- state/errors.go | 19 - state/pgstatestorage/pgstatestorage_test.go | 1 - state/types.go | 39 +-- test/config/debug.node.config.toml | 1 + test/config/test.node.config.toml | 1 + 19 files changed, 440 insertions(+), 463 deletions(-) diff --git a/config/default.go b/config/default.go index 17f783febf..dd475bf313 100644 --- a/config/default.go +++ b/config/default.go @@ -136,6 +136,7 @@ StateConsistencyCheckInterval = "5s" ResourceExhaustedMarginPct = 10 HaltOnBatchNumber = 0 SequentialBatchSanityCheck = false + SequentialProcessL2Block = true [Sequencer.StreamServer] Port = 0 Filename = "" diff --git a/config/environments/local/local.node.config.toml b/config/environments/local/local.node.config.toml index f4d58af7c8..721df47b63 100644 --- a/config/environments/local/local.node.config.toml +++ b/config/environments/local/local.node.config.toml @@ -102,6 +102,7 @@ StateConsistencyCheckInterval = "5s" ResourceExhaustedMarginPct = 10 HaltOnBatchNumber = 0 SequentialBatchSanityCheck = false + SequentialProcessL2Block = true [Sequencer.StreamServer] Port = 0 Filename = "" diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index ba823a2bc7..959c636b8c 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -50,7 +50,7 @@
"300ms"
 

Default: "3s"Type: string

L2BlockMaxDeltaTimestamp is the resolution of the timestamp used to close a L2 block


Examples:

"1m"
 
"300ms"
-

Default: 0Type: integer

HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches. The Sequencer will halt after it closes the batch equal to this number


Default: falseType: boolean

SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a
sequential way (instead than in parallel)


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
+

Default: 0Type: integer

HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches.
The Sequencer will halt after it closes the batch equal to this number


Default: falseType: boolean

SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a
sequential way (instead than in parallel)


Default: trueType: boolean

SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead
in the processPendingL2Blocks go func


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
 
"300ms"
 

Default: "5s"Type: string

LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent


Examples:

"1m"
 
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index bc82e72f61..56341f187b 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -1729,19 +1729,20 @@ StateConsistencyCheckInterval="5s"
 **Type:** : `object`
 **Description:** Finalizer's specific config properties
 
-| Property                                                                                       | Pattern | Type    | Deprecated | Definition | Title/Description                                                                                                                                                                                        |
-| ---------------------------------------------------------------------------------------------- | ------- | ------- | ---------- | ---------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| - [ForcedBatchesTimeout](#Sequencer_Finalizer_ForcedBatchesTimeout )                           | No      | string  | No         | -          | Duration                                                                                                                                                                                                 |
-| - [NewTxsWaitInterval](#Sequencer_Finalizer_NewTxsWaitInterval )                               | No      | string  | No         | -          | Duration                                                                                                                                                                                                 |
-| - [ResourceExhaustedMarginPct](#Sequencer_Finalizer_ResourceExhaustedMarginPct )               | No      | integer | No         | -          | ResourceExhaustedMarginPct is the percentage window of the resource left out for the batch to be closed                                                                                                  |
-| - [ForcedBatchesL1BlockConfirmations](#Sequencer_Finalizer_ForcedBatchesL1BlockConfirmations ) | No      | integer | No         | -          | ForcedBatchesL1BlockConfirmations is number of blocks to consider GER final                                                                                                                              |
-| - [L1InfoTreeL1BlockConfirmations](#Sequencer_Finalizer_L1InfoTreeL1BlockConfirmations )       | No      | integer | No         | -          | L1InfoTreeL1BlockConfirmations is number of blocks to consider L1InfoRoot final                                                                                                                          |
-| - [ForcedBatchesCheckInterval](#Sequencer_Finalizer_ForcedBatchesCheckInterval )               | No      | string  | No         | -          | Duration                                                                                                                                                                                                 |
-| - [L1InfoTreeCheckInterval](#Sequencer_Finalizer_L1InfoTreeCheckInterval )                     | No      | string  | No         | -          | Duration                                                                                                                                                                                                 |
-| - [BatchMaxDeltaTimestamp](#Sequencer_Finalizer_BatchMaxDeltaTimestamp )                       | No      | string  | No         | -          | Duration                                                                                                                                                                                                 |
-| - [L2BlockMaxDeltaTimestamp](#Sequencer_Finalizer_L2BlockMaxDeltaTimestamp )                   | No      | string  | No         | -          | Duration                                                                                                                                                                                                 |
-| - [HaltOnBatchNumber](#Sequencer_Finalizer_HaltOnBatchNumber )                                 | No      | integer | No         | -          | HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches. The Sequencer will halt after it closes the batch equal to this number |
-| - [SequentialBatchSanityCheck](#Sequencer_Finalizer_SequentialBatchSanityCheck )               | No      | boolean | No         | -          | SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a
sequential way (instead than in parallel) | +| Property | Pattern | Type | Deprecated | Definition | Title/Description | +| ---------------------------------------------------------------------------------------------- | ------- | ------- | ---------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| - [ForcedBatchesTimeout](#Sequencer_Finalizer_ForcedBatchesTimeout ) | No | string | No | - | Duration | +| - [NewTxsWaitInterval](#Sequencer_Finalizer_NewTxsWaitInterval ) | No | string | No | - | Duration | +| - [ResourceExhaustedMarginPct](#Sequencer_Finalizer_ResourceExhaustedMarginPct ) | No | integer | No | - | ResourceExhaustedMarginPct is the percentage window of the resource left out for the batch to be closed | +| - [ForcedBatchesL1BlockConfirmations](#Sequencer_Finalizer_ForcedBatchesL1BlockConfirmations ) | No | integer | No | - | ForcedBatchesL1BlockConfirmations is number of blocks to consider GER final | +| - [L1InfoTreeL1BlockConfirmations](#Sequencer_Finalizer_L1InfoTreeL1BlockConfirmations ) | No | integer | No | - | L1InfoTreeL1BlockConfirmations is number of blocks to consider L1InfoRoot final | +| - [ForcedBatchesCheckInterval](#Sequencer_Finalizer_ForcedBatchesCheckInterval ) | No | string | No | - | Duration | +| - [L1InfoTreeCheckInterval](#Sequencer_Finalizer_L1InfoTreeCheckInterval ) | No | string | No | - | Duration | +| - [BatchMaxDeltaTimestamp](#Sequencer_Finalizer_BatchMaxDeltaTimestamp ) | No | string | No | - | Duration | +| - [L2BlockMaxDeltaTimestamp](#Sequencer_Finalizer_L2BlockMaxDeltaTimestamp ) | No | string | No | - | Duration | +| - [HaltOnBatchNumber](#Sequencer_Finalizer_HaltOnBatchNumber ) | No | integer | No | - | HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches.
The Sequencer will halt after it closes the batch equal to this number | +| - [SequentialBatchSanityCheck](#Sequencer_Finalizer_SequentialBatchSanityCheck ) | No | boolean | No | - | SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a
sequential way (instead than in parallel) | +| - [SequentialProcessL2Block](#Sequencer_Finalizer_SequentialProcessL2Block ) | No | boolean | No | - | SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead
in the processPendingL2Blocks go func | #### 10.7.1. `Sequencer.Finalizer.ForcedBatchesTimeout` @@ -1947,7 +1948,8 @@ L2BlockMaxDeltaTimestamp="3s" **Default:** `0` -**Description:** HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches. The Sequencer will halt after it closes the batch equal to this number +**Description:** HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches. +The Sequencer will halt after it closes the batch equal to this number **Example setting the default value** (0): ``` @@ -1970,6 +1972,21 @@ sequential way (instead than in parallel) SequentialBatchSanityCheck=false ``` +#### 10.7.12. `Sequencer.Finalizer.SequentialProcessL2Block` + +**Type:** : `boolean` + +**Default:** `true` + +**Description:** SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead +in the processPendingL2Blocks go func + +**Example setting the default value** (true): +``` +[Sequencer.Finalizer] +SequentialProcessL2Block=true +``` + ### 10.8. `[Sequencer.StreamServer]` **Type:** : `object` diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json index 0659b5315b..3a423746b9 100644 --- a/docs/config-file/node-config-schema.json +++ b/docs/config-file/node-config-schema.json @@ -727,13 +727,18 @@ }, "HaltOnBatchNumber": { "type": "integer", - "description": "HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches. The Sequencer will halt after it closes the batch equal to this number", + "description": "HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches.\nThe Sequencer will halt after it closes the batch equal to this number", "default": 0 }, "SequentialBatchSanityCheck": { "type": "boolean", "description": "SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a\nsequential way (instead than in parallel)", "default": false + }, + "SequentialProcessL2Block": { + "type": "boolean", + "description": "SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead\nin the processPendingL2Blocks go func", + "default": true } }, "additionalProperties": false, diff --git a/sequencer/batch.go b/sequencer/batch.go index 2e6938aebe..3c007209c2 100644 --- a/sequencer/batch.go +++ b/sequencer/batch.go @@ -16,16 +16,17 @@ import ( // Batch represents a wip or processed batch. type Batch struct { - batchNumber uint64 - coinbase common.Address - timestamp time.Time - initialStateRoot common.Hash // initial stateRoot of the batch - imStateRoot common.Hash // intermediate stateRoot that is updated each time a single tx is processed - finalStateRoot common.Hash // final stateroot of the batch when a L2 block is processed - countOfTxs int - countOfL2Blocks int - remainingResources state.BatchResources - closingReason state.ClosingReason + batchNumber uint64 + coinbase common.Address + timestamp time.Time + initialStateRoot common.Hash // initial stateRoot of the batch + imStateRoot common.Hash // intermediate stateRoot when processing tx-by-tx + finalStateRoot common.Hash // final stateroot of the batch when a L2 block is processed + countOfTxs int + countOfL2Blocks int + imRemainingResources state.BatchResources // remaining batch resources when processing tx-by-tx + finalRemainingResources state.BatchResources // remaining batch resources when a L2 block is processed + closingReason state.ClosingReason } func (w *Batch) isEmpty() bool { @@ -52,20 +53,21 @@ func (f *finalizer) setWIPBatch(ctx context.Context, wipStateBatch *state.Batch) } remainingResources := getMaxRemainingResources(f.batchConstraints) - err = remainingResources.Sub(wipStateBatch.Resources) - if err != nil { - return nil, err + overflow, overflowResource := remainingResources.Sub(wipStateBatch.Resources) + if overflow { + return nil, fmt.Errorf("failed to subtract used resources when setting the WIP batch to the state batch %d, overflow resource: %s", wipStateBatch.BatchNumber, overflowResource) } wipBatch := &Batch{ - batchNumber: wipStateBatch.BatchNumber, - coinbase: wipStateBatch.Coinbase, - imStateRoot: wipStateBatch.StateRoot, - initialStateRoot: prevStateBatch.StateRoot, - finalStateRoot: wipStateBatch.StateRoot, - timestamp: wipStateBatch.Timestamp, - countOfTxs: wipStateBatchCountOfTxs, - remainingResources: remainingResources, + batchNumber: wipStateBatch.BatchNumber, + coinbase: wipStateBatch.Coinbase, + imStateRoot: wipStateBatch.StateRoot, + initialStateRoot: prevStateBatch.StateRoot, + finalStateRoot: wipStateBatch.StateRoot, + timestamp: wipStateBatch.Timestamp, + countOfTxs: wipStateBatchCountOfTxs, + imRemainingResources: remainingResources, + finalRemainingResources: remainingResources, } return wipBatch, nil @@ -113,8 +115,8 @@ func (f *finalizer) initWIPBatch(ctx context.Context) { f.wipBatch.batchNumber, f.wipBatch.initialStateRoot, f.wipBatch.finalStateRoot, f.wipBatch.coinbase) } -// finalizeBatch retries until successful closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch -func (f *finalizer) finalizeBatch(ctx context.Context) { +// finalizeWIPBatch closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch +func (f *finalizer) finalizeWIPBatch(ctx context.Context, closeReason state.ClosingReason) { start := time.Now() defer func() { metrics.ProcessingTime(time.Since(start)) @@ -128,7 +130,7 @@ func (f *finalizer) finalizeBatch(ctx context.Context) { f.closeWIPL2Block(ctx) } - err := f.closeAndOpenNewWIPBatch(ctx) + err := f.closeAndOpenNewWIPBatch(ctx, closeReason) if err != nil { f.Halt(ctx, fmt.Errorf("failed to create new WIP batch, error: %v", err)) } @@ -140,7 +142,7 @@ func (f *finalizer) finalizeBatch(ctx context.Context) { } // closeAndOpenNewWIPBatch closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch -func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context) error { +func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context, closeReason state.ClosingReason) error { // Wait until all L2 blocks are processed by the executor startWait := time.Now() f.pendingL2BlocksToProcessWG.Wait() @@ -153,7 +155,16 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context) error { f.pendingL2BlocksToStoreWG.Wait() log.Debugf("waiting for pending L2 blocks to be stored took: %v", time.Since(startWait)) + f.wipBatch.closingReason = closeReason + + // Close the wip batch var err error + err = f.closeWIPBatch(ctx) + if err != nil { + return fmt.Errorf("failed to close batch, error: %v", err) + } + + log.Infof("batch %d closed, closing reason: %s", f.wipBatch.batchNumber, closeReason) // Reprocess full batch as sanity check if f.cfg.SequentialBatchSanityCheck { @@ -170,14 +181,6 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context) error { }() } - // Close the wip batch - err = f.closeWIPBatch(ctx) - if err != nil { - return fmt.Errorf("failed to close batch, error: %v", err) - } - - log.Infof("batch %d closed", f.wipBatch.batchNumber) - if f.wipBatch.batchNumber+1 == f.cfg.HaltOnBatchNumber { f.Halt(ctx, fmt.Errorf("finalizer reached stop sequencer on batch number: %d", f.cfg.HaltOnBatchNumber)) } @@ -189,7 +192,7 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context) error { // Process forced batches if len(f.nextForcedBatches) > 0 { lastBatchNumber, stateRoot = f.processForcedBatches(ctx, lastBatchNumber, stateRoot) - // We must init/reset the wip L2 block from the state since processForcedBatches can create new L2 blocks + // We must init/reset the wip L2 block from the state since processForcedBatches can created new L2 blocks f.initWIPL2Block(ctx) } @@ -199,10 +202,11 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context) error { } if f.wipL2Block != nil { - // Sustract the WIP L2 block used resources to batch - err = batch.remainingResources.Sub(f.wipL2Block.getUsedResources()) - if err != nil { - return fmt.Errorf("failed to subtract L2 block used resources to new wip batch %d, error: %v", batch.batchNumber, err) + // Subtract the WIP L2 block used resources to batch + overflow, overflowResource := batch.imRemainingResources.Sub(f.wipL2Block.usedResources) + if overflow { + return fmt.Errorf("failed to subtract L2 block [%d] used resources to new wip batch %d, overflow resource: %s", + f.wipL2Block.trackingNum, batch.batchNumber, overflowResource) } } @@ -252,21 +256,24 @@ func (f *finalizer) openNewWIPBatch(ctx context.Context, batchNumber uint64, sta time.Sleep(time.Second) } + maxRemainingResources := getMaxRemainingResources(f.batchConstraints) + return &Batch{ - batchNumber: newStateBatch.BatchNumber, - coinbase: newStateBatch.Coinbase, - initialStateRoot: newStateBatch.StateRoot, - imStateRoot: newStateBatch.StateRoot, - finalStateRoot: newStateBatch.StateRoot, - timestamp: newStateBatch.Timestamp, - remainingResources: getMaxRemainingResources(f.batchConstraints), - closingReason: state.EmptyClosingReason, + batchNumber: newStateBatch.BatchNumber, + coinbase: newStateBatch.Coinbase, + initialStateRoot: newStateBatch.StateRoot, + imStateRoot: newStateBatch.StateRoot, + finalStateRoot: newStateBatch.StateRoot, + timestamp: newStateBatch.Timestamp, + imRemainingResources: maxRemainingResources, + finalRemainingResources: maxRemainingResources, + closingReason: state.EmptyClosingReason, }, err } // closeWIPBatch closes the current batch in the state func (f *finalizer) closeWIPBatch(ctx context.Context) error { - usedResources := getUsedBatchResources(f.batchConstraints, f.wipBatch.remainingResources) + usedResources := getUsedBatchResources(f.batchConstraints, f.wipBatch.imRemainingResources) receipt := state.ProcessingReceipt{ BatchNumber: f.wipBatch.batchNumber, BatchResources: usedResources, @@ -296,16 +303,6 @@ func (f *finalizer) closeWIPBatch(ctx context.Context) error { return nil } -// maxTxsPerBatchReached checks if the batch has reached the maximum number of txs per batch -func (f *finalizer) maxTxsPerBatchReached() bool { - if f.wipBatch.countOfTxs >= int(f.batchConstraints.MaxTxsPerBatch) { - log.Infof("closing batch %d, because it reached the maximum number of txs", f.wipBatch.batchNumber) - f.wipBatch.closingReason = state.BatchFullClosingReason - return true - } - return false -} - // batchSanityCheck reprocesses a batch used as sanity check func (f *finalizer) batchSanityCheck(ctx context.Context, batchNum uint64, initialStateRoot common.Hash, expectedNewStateRoot common.Hash) (*state.ProcessBatchResponse, error) { reprocessError := func(batch *state.Batch) { @@ -407,71 +404,53 @@ func (f *finalizer) batchSanityCheck(ctx context.Context, batchNum uint64, initi return nil, ErrStateRootNoMatch } - log.Infof("successful sanity check for batch %d, initialStateRoot: %s, stateRoot: %s, l2Blocks: %d, time: %v, %s", + log.Infof("successful sanity check for batch %d, initialStateRoot: %s, stateRoot: %s, l2Blocks: %d, time: %v, used counters: %s", batch.BatchNumber, initialStateRoot, batchResponse.NewStateRoot.String(), len(batchResponse.BlockResponses), endProcessing.Sub(startProcessing), f.logZKCounters(batchResponse.UsedZkCounters)) return batchResponse, nil } -// checkRemainingResources checks if the resources passed as parameters fits in the wip batch. -func (f *finalizer) checkRemainingResources(result *state.ProcessBatchResponse, bytes uint64) error { - usedResources := state.BatchResources{ - ZKCounters: result.UsedZkCounters, - Bytes: bytes, - } - - return f.wipBatch.remainingResources.Sub(usedResources) -} - -// logZKCounters returns a string with all the zkCounters values -func (f *finalizer) logZKCounters(counters state.ZKCounters) string { - return fmt.Sprintf("gasUsed: %d, keccakHashes: %d, poseidonHashes: %d, poseidonPaddings: %d, memAligns: %d, arithmetics: %d, binaries: %d, sha256Hashes: %d, steps: %d", - counters.GasUsed, counters.UsedKeccakHashes, counters.UsedPoseidonHashes, counters.UsedPoseidonPaddings, counters.UsedMemAligns, counters.UsedArithmetics, - counters.UsedBinaries, counters.UsedSha256Hashes_V2, counters.UsedSteps) +// maxTxsPerBatchReached checks if the batch has reached the maximum number of txs per batch +func (f *finalizer) maxTxsPerBatchReached(batch *Batch) bool { + return (f.batchConstraints.MaxTxsPerBatch != 0) && (batch.countOfTxs >= int(f.batchConstraints.MaxTxsPerBatch)) } -// isBatchResourcesExhausted checks if one of resources of the wip batch has reached the max value -func (f *finalizer) isBatchResourcesExhausted() bool { - resources := f.wipBatch.remainingResources +// isBatchResourcesMarginExhausted checks if one of resources of the batch has reached the exhausted margin and returns the name of the exhausted resource +func (f *finalizer) isBatchResourcesMarginExhausted(resources state.BatchResources) (bool, string) { zkCounters := resources.ZKCounters result := false - resourceDesc := "" + resourceName := "" if resources.Bytes <= f.getConstraintThresholdUint64(f.batchConstraints.MaxBatchBytesSize) { - resourceDesc = "MaxBatchBytesSize" + resourceName = "Bytes" result = true } else if zkCounters.UsedSteps <= f.getConstraintThresholdUint32(f.batchConstraints.MaxSteps) { - resourceDesc = "MaxSteps" + resourceName = "Steps" result = true } else if zkCounters.UsedPoseidonPaddings <= f.getConstraintThresholdUint32(f.batchConstraints.MaxPoseidonPaddings) { - resourceDesc = "MaxPoseidonPaddings" + resourceName = "PoseidonPaddings" result = true } else if zkCounters.UsedBinaries <= f.getConstraintThresholdUint32(f.batchConstraints.MaxBinaries) { - resourceDesc = "MaxBinaries" + resourceName = "Binaries" result = true } else if zkCounters.UsedKeccakHashes <= f.getConstraintThresholdUint32(f.batchConstraints.MaxKeccakHashes) { - resourceDesc = "MaxKeccakHashes" + resourceName = "KeccakHashes" result = true } else if zkCounters.UsedArithmetics <= f.getConstraintThresholdUint32(f.batchConstraints.MaxArithmetics) { - resourceDesc = "MaxArithmetics" + resourceName = "Arithmetics" result = true } else if zkCounters.UsedMemAligns <= f.getConstraintThresholdUint32(f.batchConstraints.MaxMemAligns) { - resourceDesc = "MaxMemAligns" + resourceName = "MemAligns" result = true } else if zkCounters.GasUsed <= f.getConstraintThresholdUint64(f.batchConstraints.MaxCumulativeGasUsed) { - resourceDesc = "MaxCumulativeGasUsed" + resourceName = "CumulativeGas" result = true } else if zkCounters.UsedSha256Hashes_V2 <= f.getConstraintThresholdUint32(f.batchConstraints.MaxSHA256Hashes) { - resourceDesc = "MaxSHA256Hashes" + resourceName = "SHA256Hashes" result = true } - if result { - log.Infof("closing batch %d because it reached %s limit", f.wipBatch.batchNumber, resourceDesc) - f.wipBatch.closingReason = state.BatchAlmostFullClosingReason - } - - return result + return result, resourceName } // getConstraintThresholdUint64 returns the threshold for the given input @@ -484,7 +463,7 @@ func (f *finalizer) getConstraintThresholdUint32(input uint32) uint32 { return input * f.cfg.ResourceExhaustedMarginPct / 100 //nolint:gomnd } -// getUsedBatchResources returns the max resources that can be used in a batch +// getUsedBatchResources calculates and returns the used resources of a batch from remaining resources func getUsedBatchResources(constraints state.BatchConstraintsCfg, remainingResources state.BatchResources) state.BatchResources { return state.BatchResources{ ZKCounters: state.ZKCounters{ @@ -502,7 +481,7 @@ func getUsedBatchResources(constraints state.BatchConstraintsCfg, remainingResou } } -// getMaxRemainingResources returns the max zkcounters that can be used in a batch +// getMaxRemainingResources returns the max resources that can be used in a batch func getMaxRemainingResources(constraints state.BatchConstraintsCfg) state.BatchResources { return state.BatchResources{ ZKCounters: state.ZKCounters{ @@ -519,3 +498,33 @@ func getMaxRemainingResources(constraints state.BatchConstraintsCfg) state.Batch Bytes: constraints.MaxBatchBytesSize, } } + +// checkIfFinalizeBatch returns true if the batch must be closed due to a closing reason, also it returns the description of the close reason +func (f *finalizer) checkIfFinalizeBatch() (bool, state.ClosingReason) { + // Max txs per batch + if f.maxTxsPerBatchReached(f.wipBatch) { + log.Infof("closing batch %d, because it reached the maximum number of txs", f.wipBatch.batchNumber) + return true, state.MaxTxsClosingReason + } + + // Batch resource (zkCounters or batch bytes) margin exhausted + exhausted, resourceDesc := f.isBatchResourcesMarginExhausted(f.wipBatch.imRemainingResources) + if exhausted { + log.Infof("closing batch %d because it exhausted margin for %s batch resource", f.wipBatch.batchNumber, resourceDesc) + return true, state.ResourceMarginExhaustedClosingReason + } + + // Forced batch deadline + if f.nextForcedBatchDeadline != 0 && now().Unix() >= f.nextForcedBatchDeadline { + log.Infof("closing batch %d, forced batch deadline encountered", f.wipBatch.batchNumber) + return true, state.ForcedBatchDeadlineClosingReason + } + + // Batch timestamp resolution + if !f.wipBatch.isEmpty() && f.wipBatch.timestamp.Add(f.cfg.BatchMaxDeltaTimestamp.Duration).Before(time.Now()) { + log.Infof("closing batch %d, because of batch max delta timestamp reached", f.wipBatch.batchNumber) + return true, state.MaxDeltaTimestampClosingReason + } + + return false, "" +} diff --git a/sequencer/config.go b/sequencer/config.go index bd1234c3de..b560d4aa98 100644 --- a/sequencer/config.go +++ b/sequencer/config.go @@ -77,10 +77,15 @@ type FinalizerCfg struct { // L2BlockMaxDeltaTimestamp is the resolution of the timestamp used to close a L2 block L2BlockMaxDeltaTimestamp types.Duration `mapstructure:"L2BlockMaxDeltaTimestamp"` - // HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches. The Sequencer will halt after it closes the batch equal to this number + // HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches. + // The Sequencer will halt after it closes the batch equal to this number HaltOnBatchNumber uint64 `mapstructure:"HaltOnBatchNumber"` // SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a // sequential way (instead than in parallel) SequentialBatchSanityCheck bool `mapstructure:"SequentialBatchSanityCheck"` + + // SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead + // in the processPendingL2Blocks go func + SequentialProcessL2Block bool `mapstructure:"SequentialProcessL2Block"` } diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 723588b52c..082febc089 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -30,26 +30,6 @@ const ( var ( now = time.Now mockL1InfoRoot = common.Hash{} - - //TODO: Review with Carlos which zkCounters are used when creating a new l2 block in the wip batch - l2BlockUsedResourcesIndexZero = state.BatchResources{ - ZKCounters: state.ZKCounters{ - UsedPoseidonHashes: 256, // nolint:gomnd //TODO: config param - UsedBinaries: 19, // nolint:gomnd //TODO: config param - UsedSteps: 275, // nolint:gomnd //TODO: config param - UsedKeccakHashes: 4, // nolint:gomnd //TODO: config param - }, - Bytes: changeL2BlockSize, - } - l2BlockUsedResourcesIndexNonZero = state.BatchResources{ - ZKCounters: state.ZKCounters{ - UsedPoseidonHashes: 256, // nolint:gomnd //TODO: config param - UsedBinaries: 23, // nolint:gomnd //TODO: config param - UsedSteps: 521, // nolint:gomnd //TODO: config param - UsedKeccakHashes: 38, // nolint:gomnd //TODO: config param - }, - Bytes: changeL2BlockSize, - } ) // finalizer represents the finalizer component of the sequencer. @@ -79,13 +59,15 @@ type finalizer struct { eventLog *event.EventLog // effective gas price calculation instance effectiveGasPrice *pool.EffectiveGasPrice - // pending L2 blocks to be processed (executor) + // pending L2 blocks to process (executor) pendingL2BlocksToProcess chan *L2Block pendingL2BlocksToProcessWG *sync.WaitGroup // pending L2 blocks to store in the state pendingL2BlocksToStore chan *L2Block pendingL2BlocksToStoreWG *sync.WaitGroup - // executer flushid control + // L2 block counter for tracking purposes + l2BlockCounter uint64 + // executor flushid control proverID string storedFlushID uint64 storedFlushIDCond *sync.Cond //Condition to wait until storedFlushID has been updated @@ -132,14 +114,14 @@ func newFinalizer( eventLog: eventLog, // effective gas price calculation instance effectiveGasPrice: pool.NewEffectiveGasPrice(poolCfg.EffectiveGasPrice), - // pending L2 blocks to be processed (executor) + // pending L2 blocks to process (executor) pendingL2BlocksToProcess: make(chan *L2Block, pendingL2BlocksBufferSize), pendingL2BlocksToProcessWG: new(sync.WaitGroup), // pending L2 blocks to store in the state pendingL2BlocksToStore: make(chan *L2Block, pendingL2BlocksBufferSize), pendingL2BlocksToStoreWG: new(sync.WaitGroup), storedFlushID: 0, - // executer flushid control + // executor flushid control proverID: "", storedFlushIDCond: sync.NewCond(&sync.Mutex{}), lastPendingFlushID: 0, @@ -290,11 +272,11 @@ func (f *finalizer) finalizeBatches(ctx context.Context) { f.finalizeWIPL2Block(ctx) } - tx, err := f.workerIntf.GetBestFittingTx(f.wipBatch.remainingResources) + tx, err := f.workerIntf.GetBestFittingTx(f.wipBatch.imRemainingResources) // If we have txs pending to process but none of them fits into the wip batch, we close the wip batch and open a new one - if err == ErrNoFittingTransaction { //TODO: review this with JEC - f.finalizeBatch(ctx) + if err == ErrNoFittingTransaction { + f.finalizeWIPBatch(ctx, state.NoTxFitsClosingReason) } metrics.WorkerProcessingTime(time.Since(start)) @@ -336,10 +318,9 @@ func (f *finalizer) finalizeBatches(ctx context.Context) { } } - if f.isDeadlineEncountered() { - f.finalizeBatch(ctx) - } else if f.maxTxsPerBatchReached() || f.isBatchResourcesExhausted() { - f.finalizeBatch(ctx) + // Check if we must finalize the batch due to a closing reason (resources exhausted, max txs, timestamp resolution, forced batches deadline) + if finalize, closeReason := f.checkIfFinalizeBatch(); finalize { + f.finalizeWIPBatch(ctx, closeReason) } if err := ctx.Err(); err != nil { @@ -364,27 +345,13 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first TimestampLimit_V2: uint64(f.wipL2Block.timestamp.Unix()), Caller: stateMetrics.SequencerCallerLabel, ForkID: f.stateIntf.GetForkIDByBatchNumber(f.wipBatch.batchNumber), + Transactions: tx.RawTx, + SkipFirstChangeL2Block_V2: true, SkipWriteBlockInfoRoot_V2: true, SkipVerifyL1InfoRoot_V2: true, L1InfoTreeData_V2: map[uint32]state.L1DataV2{}, } - batchRequest.L1InfoTreeData_V2[f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex] = state.L1DataV2{ - GlobalExitRoot: f.wipL2Block.l1InfoTreeExitRoot.GlobalExitRoot.GlobalExitRoot, - BlockHashL1: f.wipL2Block.l1InfoTreeExitRoot.PreviousBlockHash, - MinTimestamp: uint64(f.wipL2Block.l1InfoTreeExitRoot.GlobalExitRoot.Timestamp.Unix()), - } - - if f.wipL2Block.isEmpty() { - batchRequest.Transactions = f.stateIntf.BuildChangeL2Block(f.wipL2Block.deltaTimestamp, f.wipL2Block.getL1InfoTreeIndex()) - batchRequest.SkipFirstChangeL2Block_V2 = false - } else { - batchRequest.Transactions = []byte{} - batchRequest.SkipFirstChangeL2Block_V2 = true - } - - batchRequest.Transactions = append(batchRequest.Transactions, tx.RawTx...) - txGasPrice := tx.GasPrice // If it is the first time we process this tx then we calculate the EffectiveGasPrice @@ -493,10 +460,10 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first } } - // Update wip batch + // Update imStateRoot f.wipBatch.imStateRoot = batchResponse.NewStateRoot - log.Infof("processed tx %s. Batch.batchNumber: %d, batchNumber: %d, newStateRoot: %s, oldStateRoot: %s, %s", + log.Infof("processed tx %s. Batch.batchNumber: %d, batchNumber: %d, newStateRoot: %s, oldStateRoot: %s, used counters: %s", tx.HashStr, f.wipBatch.batchNumber, batchRequest.BatchNumber, batchResponse.NewStateRoot.String(), batchRequest.OldStateRoot.String(), f.logZKCounters(batchResponse.UsedZkCounters)) @@ -513,11 +480,10 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx return errWg, result.BlockResponses[0].TransactionResponses[0].RomError } - //TODO: if it's the first tx in the wipL2Block. We must add the estimated l2block used resources to the batch and sustract the counters returned by the executor (it includes the real counters used by the changeL2block tx) // Check remaining resources - err = f.checkRemainingResources(result, uint64(len(tx.RawTx))) - if err != nil { - log.Infof("current tx %s exceeds the remaining batch resources, updating metadata for tx in worker and continuing", tx.HashStr) + overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(state.BatchResources{ZKCounters: result.UsedZkCounters, Bytes: uint64(len(tx.RawTx))}) + if overflow { + log.Infof("current tx %s exceeds the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing", tx.HashStr, overflowResource) start := time.Now() f.workerIntf.UpdateTxZKCounters(result.BlockResponses[0].TransactionResponses[0].TxHash, tx.From, result.UsedZkCounters) metrics.WorkerProcessingTime(time.Since(start)) @@ -586,73 +552,6 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx return nil, nil } -// processEmptyL2Block processes an empty L2 block to update imStateRoot and remaining resources on batch -func (f *finalizer) processEmptyL2Block(ctx context.Context) error { - start := time.Now() - defer func() { - metrics.ProcessingTime(time.Since(start)) - }() - - batchRequest := state.ProcessRequest{ - BatchNumber: f.wipBatch.batchNumber, - OldStateRoot: f.wipBatch.imStateRoot, - Coinbase: f.wipBatch.coinbase, - L1InfoRoot_V2: mockL1InfoRoot, - TimestampLimit_V2: uint64(f.wipL2Block.timestamp.Unix()), - Caller: stateMetrics.SequencerCallerLabel, - ForkID: f.stateIntf.GetForkIDByBatchNumber(f.wipBatch.batchNumber), - SkipWriteBlockInfoRoot_V2: true, - SkipVerifyL1InfoRoot_V2: true, - SkipFirstChangeL2Block_V2: false, - Transactions: f.stateIntf.BuildChangeL2Block(f.wipL2Block.deltaTimestamp, f.wipL2Block.getL1InfoTreeIndex()), - L1InfoTreeData_V2: map[uint32]state.L1DataV2{}, - } - - batchRequest.L1InfoTreeData_V2[f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex] = state.L1DataV2{ - GlobalExitRoot: f.wipL2Block.l1InfoTreeExitRoot.GlobalExitRoot.GlobalExitRoot, - BlockHashL1: f.wipL2Block.l1InfoTreeExitRoot.PreviousBlockHash, - MinTimestamp: uint64(f.wipL2Block.l1InfoTreeExitRoot.GlobalExitRoot.Timestamp.Unix()), - } - - log.Infof("processing empty l2 block, wipBatch.BatchNumber: %d, batchNumber: %d, oldStateRoot: %s, L1InfoRootIndex: %d", - f.wipBatch.batchNumber, batchRequest.BatchNumber, batchRequest.OldStateRoot, f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex) - - batchResponse, err := f.stateIntf.ProcessBatchV2(ctx, batchRequest, false) - - if err != nil { - return err - } - - if batchResponse.ExecutorError != nil { - return ErrExecutorError - } - - if batchResponse.IsRomOOCError { - return ErrProcessBatchOOC - } - - if len(batchResponse.BlockResponses) == 0 { - return fmt.Errorf("BlockResponses returned by the executor is empty") - } - - //TODO: review this. We must add the estimated l2block used resources to the batch and sustract the counters returned by the executor - // Check remaining resources - // err = f.checkRemainingResources(batchResponse, changeL2BlockSize) - // if err != nil { - // log.Errorf("empty L2 block exceeds the remaining batch resources, error: %v", err) - // return err - // } - - // Update wip batch - f.wipBatch.imStateRoot = batchResponse.NewStateRoot - - log.Infof("processed empty L2 block %d, batch.batchNumber: %d, batchNumber: %d, newStateRoot: %s, oldStateRoot: %s, %s", - batchResponse.BlockResponses[0].BlockNumber, f.wipBatch.batchNumber, batchRequest.BatchNumber, batchResponse.NewStateRoot.String(), - batchRequest.OldStateRoot.String(), f.logZKCounters(batchResponse.UsedZkCounters)) - - return nil -} - // compareTxEffectiveGasPrice compares newEffectiveGasPrice with tx.EffectiveGasPrice. // It returns ErrEffectiveGasPriceReprocess if the tx needs to be reprocessed with // the tx.EffectiveGasPrice updated, otherwise it returns nil @@ -791,22 +690,6 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s return wg } -// isDeadlineEncountered returns true if any closing signal deadline is encountered -func (f *finalizer) isDeadlineEncountered() bool { - // Forced batch deadline - if f.nextForcedBatchDeadline != 0 && now().Unix() >= f.nextForcedBatchDeadline { - log.Infof("closing batch %d, forced batch deadline encountered", f.wipBatch.batchNumber) - return true - } - // Timestamp resolution deadline - if !f.wipBatch.isEmpty() && f.wipBatch.timestamp.Add(f.cfg.BatchMaxDeltaTimestamp.Duration).Before(time.Now()) { - log.Infof("closing batch %d, because of batch max delta timestamp reached", f.wipBatch.batchNumber) - f.wipBatch.closingReason = state.TimeoutResolutionDeadlineClosingReason - return true - } - return false -} - // checkIfProverRestarted checks if the proverID changed func (f *finalizer) checkIfProverRestarted(proverID string) { if f.proverID != "" && f.proverID != proverID { @@ -828,6 +711,13 @@ func (f *finalizer) checkIfProverRestarted(proverID string) { } } +// logZKCounters returns a string with all the zkCounters values +func (f *finalizer) logZKCounters(counters state.ZKCounters) string { + return fmt.Sprintf("{gasUsed: %d, keccakHashes: %d, poseidonHashes: %d, poseidonPaddings: %d, memAligns: %d, arithmetics: %d, binaries: %d, sha256Hashes: %d, steps: %d}", + counters.GasUsed, counters.UsedKeccakHashes, counters.UsedPoseidonHashes, counters.UsedPoseidonPaddings, counters.UsedMemAligns, counters.UsedArithmetics, + counters.UsedBinaries, counters.UsedSha256Hashes_V2, counters.UsedSteps) +} + // Halt halts the finalizer func (f *finalizer) Halt(ctx context.Context, err error) { f.haltFinalizer.Store(true) diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go index 7f3499cb50..7c33c2f54e 100644 --- a/sequencer/finalizer_test.go +++ b/sequencer/finalizer_test.go @@ -96,7 +96,7 @@ var ( testErrStr = "some err" // testErr = fmt.Errorf(testErrStr) // openBatchError = fmt.Errorf("failed to open new batch, err: %v", testErr) - cumulativeGasErr = state.GetZKCounterError("CumulativeGasUsed") + // cumulativeGasErr = state.GetZKCounterError("CumulativeGasUsed") testBatchL2DataAsString = "0xee80843b9aca00830186a0944d5cf5032b2a844602278b01199ed191a86c93ff88016345785d8a0000808203e980801186622d03b6b8da7cf111d1ccba5bb185c56deae6a322cebc6dda0556f3cb9700910c26408b64b51c5da36ba2f38ef55ba1cee719d5a6c012259687999074321bff" decodedBatchL2Data []byte // done chan bool @@ -945,7 +945,7 @@ func TestNewFinalizer(t *testing.T) { func TestFinalizer_closeWIPBatch(t *testing.T) { // arrange f = setupFinalizer(true) - usedResources := getUsedBatchResources(f.batchConstraints, f.wipBatch.remainingResources) + usedResources := getUsedBatchResources(f.batchConstraints, f.wipBatch.imRemainingResources) receipt := state.ProcessingReceipt{ BatchNumber: f.wipBatch.batchNumber, @@ -1053,7 +1053,7 @@ func TestFinalizer_isDeadlineEncountered(t *testing.T) { } // act - actual := f.isDeadlineEncountered() + actual, _ := f.checkIfFinalizeBatch() // assert assert.Equal(t, tc.expected, actual) @@ -1077,18 +1077,19 @@ func TestFinalizer_checkRemainingResources(t *testing.T) { ZKCounters: state.ZKCounters{GasUsed: 9000}, Bytes: 10000, } - f.wipBatch.remainingResources = remainingResources + f.wipBatch.imRemainingResources = remainingResources testCases := []struct { name string remaining state.BatchResources - expectedErr error + overflow bool + overflowResource string expectedWorkerUpdate bool expectedTxTracker *TxTracker }{ { name: "Success", remaining: remainingResources, - expectedErr: nil, + overflow: false, expectedWorkerUpdate: false, expectedTxTracker: &TxTracker{RawTx: []byte("test")}, }, @@ -1097,7 +1098,8 @@ func TestFinalizer_checkRemainingResources(t *testing.T) { remaining: state.BatchResources{ Bytes: 0, }, - expectedErr: state.ErrBatchResourceBytesUnderflow, + overflow: true, + overflowResource: "Bytes", expectedWorkerUpdate: true, expectedTxTracker: &TxTracker{RawTx: []byte("test")}, }, @@ -1106,7 +1108,8 @@ func TestFinalizer_checkRemainingResources(t *testing.T) { remaining: state.BatchResources{ ZKCounters: state.ZKCounters{GasUsed: 0}, }, - expectedErr: state.NewBatchRemainingResourcesUnderflowError(cumulativeGasErr, cumulativeGasErr.Error()), + overflow: true, + overflowResource: "CumulativeGas", expectedWorkerUpdate: true, expectedTxTracker: &TxTracker{RawTx: make([]byte, 0)}, }, @@ -1115,22 +1118,18 @@ func TestFinalizer_checkRemainingResources(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // arrange - f.wipBatch.remainingResources = tc.remaining + f.wipBatch.imRemainingResources = tc.remaining stateMock.On("AddEvent", ctx, mock.Anything, nil).Return(nil) if tc.expectedWorkerUpdate { workerMock.On("UpdateTxZKCounters", txResponse.TxHash, tc.expectedTxTracker.From, result.UsedZkCounters).Return().Once() } // act - err := f.checkRemainingResources(result, uint64(len(tc.expectedTxTracker.RawTx))) + overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(state.BatchResources{ZKCounters: result.UsedZkCounters, Bytes: uint64(len(tc.expectedTxTracker.RawTx))}) // assert - if tc.expectedErr != nil { - assert.Error(t, err) - assert.EqualError(t, err, tc.expectedErr.Error()) - } else { - assert.NoError(t, err) - } + assert.Equal(t, tc.overflow, overflow) + assert.Equal(t, tc.overflowResource, overflowResource) }) } } @@ -2036,17 +2035,17 @@ func TestFinalizer_isBatchAlmostFull(t *testing.T) { // arrange f = setupFinalizer(true) maxRemainingResource := getMaxRemainingResources(bc) - f.wipBatch.remainingResources = tc.modifyResourceFunc(maxRemainingResource) + f.wipBatch.imRemainingResources = tc.modifyResourceFunc(maxRemainingResource) // act - result := f.isBatchResourcesExhausted() + result, closeReason := f.checkIfFinalizeBatch() // assert assert.Equal(t, tc.expectedResult, result) if tc.expectedResult { - assert.Equal(t, state.BatchAlmostFullClosingReason, f.wipBatch.closingReason) + assert.Equal(t, state.ResourceMarginExhaustedClosingReason, closeReason) } else { - assert.Equal(t, state.EmptyClosingReason, f.wipBatch.closingReason) + assert.Equal(t, state.EmptyClosingReason, closeReason) } }) } @@ -2139,10 +2138,7 @@ func Test_isBatchFull(t *testing.T) { f.wipBatch.countOfTxs = tc.batchCountOfTxs f.batchConstraints.MaxTxsPerBatch = tc.maxTxsPerBatch - assert.Equal(t, tc.expected, f.maxTxsPerBatchReached()) - if tc.expected == true { - assert.Equal(t, state.BatchFullClosingReason, f.wipBatch.closingReason) - } + assert.Equal(t, tc.expected, f.maxTxsPerBatchReached(f.wipBatch)) }) } } @@ -2192,13 +2188,13 @@ func setupFinalizer(withWipBatch bool) *finalizer { panic(err) } wipBatch = &Batch{ - batchNumber: 1, - coinbase: seqAddr, - initialStateRoot: oldHash, - imStateRoot: newHash, - timestamp: now(), - remainingResources: getMaxRemainingResources(bc), - closingReason: state.EmptyClosingReason, + batchNumber: 1, + coinbase: seqAddr, + initialStateRoot: oldHash, + imStateRoot: newHash, + timestamp: now(), + imRemainingResources: getMaxRemainingResources(bc), + closingReason: state.EmptyClosingReason, } } eventStorage, err := nileventstorage.NewNilEventStorage() diff --git a/sequencer/l2block.go b/sequencer/l2block.go index fbd6132f0b..37f9d89777 100644 --- a/sequencer/l2block.go +++ b/sequencer/l2block.go @@ -8,6 +8,7 @@ import ( "github.com/0xPolygonHermez/zkevm-node/hex" "github.com/0xPolygonHermez/zkevm-node/log" "github.com/0xPolygonHermez/zkevm-node/pool" + "github.com/0xPolygonHermez/zkevm-node/sequencer/metrics" "github.com/0xPolygonHermez/zkevm-node/state" stateMetrics "github.com/0xPolygonHermez/zkevm-node/state/metrics" "github.com/ethereum/go-ethereum/common" @@ -15,11 +16,13 @@ import ( // L2Block represents a wip or processed L2 block type L2Block struct { + trackingNum uint64 timestamp time.Time deltaTimestamp uint32 initialStateRoot common.Hash l1InfoTreeExitRoot state.L1InfoTreeExitRootStorageEntry l1InfoTreeExitRootChanged bool + usedResources state.BatchResources transactions []*TxTracker batchResponse *state.ProcessBatchResponse } @@ -43,27 +46,6 @@ func (b *L2Block) getL1InfoTreeIndex() uint32 { } } -// getUsedResources returns the estimated batch resources used to process the changeL2block tx for this block -func (b *L2Block) getUsedResources() state.BatchResources { - // If the L1InfoTreeIndex has changed we return the resources used when the index != 0, - // otherwise we return the used resources when the index = 0 - if b.l1InfoTreeExitRootChanged { - return l2BlockUsedResourcesIndexNonZero - } else { - return l2BlockUsedResourcesIndexZero - } -} - -// getWriteInfoRootUsedResources returns the additional batch resources used when processing -// this block with the SkipWriteBlockInfoRoot_V2 flag of the executor request to false -// func (b *L2Block) getWriteInfoRootUsedResources() state.BatchResources { -// if b.l1InfoTreeExitRootChanged { -// return l2BlockUsedResourcesIndexNonZero -// } else { -// return l2BlockUsedResourcesIndexZero -// } -// } - // initWIPL2Block inits the wip L2 block func (f *finalizer) initWIPL2Block(ctx context.Context) { // Wait to l1InfoTree to be updated for first time @@ -125,52 +107,14 @@ func (f *finalizer) processPendingL2Blocks(ctx context.Context) { return } - l2Block.initialStateRoot = f.wipBatch.finalStateRoot - - log.Infof("processing L2 block, batch: %d, initialStateRoot: %s txs: %d, l1InfoTreeIndex: %d", - f.wipBatch.batchNumber, l2Block.initialStateRoot, len(l2Block.transactions), l2Block.l1InfoTreeExitRoot.L1InfoTreeIndex) - - startProcessing := time.Now() - batchResponse, err := f.processL2Block(ctx, l2Block) - endProcessing := time.Now() + err := f.processL2Block(ctx, l2Block) if err != nil { - f.Halt(ctx, fmt.Errorf("error processing L2 block, error: %v", err)) - } - - if len(batchResponse.BlockResponses) == 0 { - f.Halt(ctx, fmt.Errorf("error processing L2 block, error: BlockResponses returned by the executor is empty")) - } - - blockResponse := batchResponse.BlockResponses[0] - - // Sanity check. Check blockResponse.TransactionsReponses match l2Block.Transactions length, order and tx hashes - if len(blockResponse.TransactionResponses) != len(l2Block.transactions) { - f.Halt(ctx, fmt.Errorf("error processing L2 block, error: length of TransactionsResponses %d don't match length of l2Block.transactions %d", - len(blockResponse.TransactionResponses), len(l2Block.transactions))) - } - for i, txResponse := range blockResponse.TransactionResponses { - if txResponse.TxHash != l2Block.transactions[i].Hash { - f.Halt(ctx, fmt.Errorf("error processing L2 block, error: TransactionsResponses hash %s in position %d don't match l2Block.transactions[%d] hash %s", - txResponse.TxHash.String(), i, i, l2Block.transactions[i].Hash)) - } + f.Halt(ctx, fmt.Errorf("error processing L2 block [%d], error: %v", l2Block.trackingNum, err)) } - l2Block.batchResponse = batchResponse - - // Update finalStateRoot of the batch to the newStateRoot for the L2 block - f.wipBatch.finalStateRoot = l2Block.batchResponse.NewStateRoot - - log.Infof("processed L2 block: %d, batch: %d, initialStateRoot: %s, stateRoot: %s, txs: %d/%d, blockHash: %s, infoRoot: %s, time: %v, %s", - blockResponse.BlockNumber, f.wipBatch.batchNumber, l2Block.initialStateRoot, l2Block.batchResponse.NewStateRoot, len(l2Block.transactions), - len(blockResponse.TransactionResponses), blockResponse.BlockHash, blockResponse.BlockInfoRoot.String(), endProcessing.Sub(startProcessing), - f.logZKCounters(batchResponse.UsedZkCounters)) - - f.updateFlushIDs(batchResponse.FlushID, batchResponse.StoredFlushID) - - f.addPendingL2BlockToStore(ctx, l2Block) - f.pendingL2BlocksToProcessWG.Done() + case <-ctx.Done(): // The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit f.pendingL2BlocksToProcessWG.Wait() @@ -191,39 +135,10 @@ func (f *finalizer) storePendingL2Blocks(ctx context.Context) { return } - // Wait until L2 block has been flushed/stored by the executor - f.storedFlushIDCond.L.Lock() - for f.storedFlushID < l2Block.batchResponse.FlushID { - f.storedFlushIDCond.Wait() - // check if context is done after waking up - if ctx.Err() != nil { - f.storedFlushIDCond.L.Unlock() - return - } - } - f.storedFlushIDCond.L.Unlock() - - // If the L2 block has txs now f.storedFlushID >= l2BlockToStore.flushId, we can store tx - blockResponse := l2Block.batchResponse.BlockResponses[0] - log.Infof("storing L2 block: %d, batch: %d, txs: %d/%d, blockHash: %s, infoRoot: %s", - blockResponse.BlockNumber, f.wipBatch.batchNumber, len(l2Block.transactions), len(blockResponse.TransactionResponses), - blockResponse.BlockHash, blockResponse.BlockInfoRoot.String()) - - startStoring := time.Now() err := f.storeL2Block(ctx, l2Block) - endStoring := time.Now() if err != nil { - f.Halt(ctx, fmt.Errorf("error storing L2 block %d, error: %v", l2Block.batchResponse.BlockResponses[0].BlockNumber, err)) - } - - log.Infof("stored L2 block: %d, batch: %d, txs: %d/%d, blockHash: %s, infoRoot: %s, time: %v", - blockResponse.BlockNumber, f.wipBatch.batchNumber, len(l2Block.transactions), len(blockResponse.TransactionResponses), - blockResponse.BlockHash, blockResponse.BlockInfoRoot.String(), endStoring.Sub(startStoring)) - - for _, tx := range l2Block.transactions { - // Delete the tx from the pending list in the worker (addrQueue) - f.workerIntf.DeletePendingTxToStore(tx.Hash, tx.From) + f.Halt(ctx, fmt.Errorf("error storing L2 block %d [%d], error: %v", l2Block.batchResponse.BlockResponses[0].BlockNumber, l2Block.trackingNum, err)) } f.pendingL2BlocksToStoreWG.Done() @@ -237,13 +152,70 @@ func (f *finalizer) storePendingL2Blocks(ctx context.Context) { } } -// processL2Block process (executor) a L2 Block and adds it to the pendingL2BlocksToStore channel. It returns the response block from the executor -func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) (*state.ProcessBatchResponse, error) { - processL2BLockError := func(err error) { - log.Errorf("process L2 block error %v, batch: %d, initialStateRoot: %s", err, f.wipBatch.batchNumber, l2Block.initialStateRoot.String()) +// processL2Block process a L2 Block and adds it to the pendingL2BlocksToStore channel +func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error { + startProcessing := time.Now() + + l2Block.initialStateRoot = f.wipBatch.finalStateRoot + + log.Infof("processing L2 block [%d], batch: %d, initialStateRoot: %s txs: %d, l1InfoTreeIndex: %d", + l2Block.trackingNum, f.wipBatch.batchNumber, l2Block.initialStateRoot, len(l2Block.transactions), l2Block.l1InfoTreeExitRoot.L1InfoTreeIndex) + + batchResponse, batchL2DataSize, err := f.executeL2Block(ctx, l2Block) + + if err != nil { + return fmt.Errorf("failed to execute L2 block [%d], error: %v", l2Block.trackingNum, err) + } + + if len(batchResponse.BlockResponses) != 1 { + return fmt.Errorf("length of batchResponse.BlockRespones returned by the executor is %d and must be 1", len(batchResponse.BlockResponses)) + } + + blockResponse := batchResponse.BlockResponses[0] + + // Sanity check. Check blockResponse.TransactionsReponses match l2Block.Transactions length, order and tx hashes + if len(blockResponse.TransactionResponses) != len(l2Block.transactions) { + return fmt.Errorf("length of TransactionsResponses %d don't match length of l2Block.transactions %d", len(blockResponse.TransactionResponses), len(l2Block.transactions)) + } + for i, txResponse := range blockResponse.TransactionResponses { + if txResponse.TxHash != l2Block.transactions[i].Hash { + return fmt.Errorf("blockResponse.TransactionsResponses[%d] hash %s don't match l2Block.transactions[%d] hash %s", i, txResponse.TxHash.String(), i, l2Block.transactions[i].Hash) + } + } + + l2Block.batchResponse = batchResponse + + // Update finalRemainingResources of the batch + overflow, overflowResource := f.wipBatch.finalRemainingResources.Sub(state.BatchResources{ZKCounters: batchResponse.UsedZkCounters, Bytes: batchL2DataSize}) + if overflow { + return fmt.Errorf("error sustracting L2 block %d [%d] resources from the batch %d, overflow resource: %s, batch remaining counters: %s, L2Block used counters: %s, batch remaining bytes: %d, L2Block used bytes: %d", + blockResponse.BlockNumber, l2Block.trackingNum, f.wipBatch.batchNumber, overflowResource, f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.wipBatch.imRemainingResources.Bytes, batchL2DataSize) + } + + // Update finalStateRoot of the batch to the newStateRoot for the L2 block + f.wipBatch.finalStateRoot = l2Block.batchResponse.NewStateRoot + + f.updateFlushIDs(batchResponse.FlushID, batchResponse.StoredFlushID) + + f.addPendingL2BlockToStore(ctx, l2Block) + + endProcessing := time.Now() + + log.Infof("processed L2 block %d [%d], batch: %d, initialStateRoot: %s, stateRoot: %s, txs: %d/%d, blockHash: %s, infoRoot: %s, time: %v, used counters: %s", + blockResponse.BlockNumber, l2Block.trackingNum, f.wipBatch.batchNumber, l2Block.initialStateRoot, l2Block.batchResponse.NewStateRoot, len(l2Block.transactions), + len(blockResponse.TransactionResponses), blockResponse.BlockHash, blockResponse.BlockInfoRoot, endProcessing.Sub(startProcessing), + f.logZKCounters(batchResponse.UsedZkCounters)) + + return nil +} + +// executeL2Block executes a L2 Block in the executor and returns the batch response from the executor and the batchL2Data size +func (f *finalizer) executeL2Block(ctx context.Context, l2Block *L2Block) (*state.ProcessBatchResponse, uint64, error) { + executeL2BLockError := func(err error) { + log.Errorf("execute L2 block [%d] error %v, batch: %d, initialStateRoot: %s", l2Block.trackingNum, err, f.wipBatch.batchNumber, l2Block.initialStateRoot.String()) // Log batch detailed info for i, tx := range l2Block.transactions { - log.Infof("batch: %d, tx position %d, tx hash: %s", f.wipBatch.batchNumber, i, tx.HashStr) + log.Infof("batch: %d, block: [%d], tx position: %d, tx hash: %s", f.wipBatch.batchNumber, l2Block.trackingNum, i, tx.HashStr) } } @@ -258,7 +230,7 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) (*stat epHex, err := hex.DecodeHex(fmt.Sprintf("%x", tx.EGPPercentage)) if err != nil { log.Errorf("error decoding hex value for effective gas price percentage for tx %s, error: %v", tx.HashStr, err) - return nil, err + return nil, 0, err } txData := append(tx.RawTx, epHex...) @@ -294,29 +266,43 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) (*stat batchResponse, err = f.stateIntf.ProcessBatchV2(ctx, batchRequest, true) if err != nil { - processL2BLockError(err) - return nil, err + executeL2BLockError(err) + return nil, 0, err } if batchResponse.ExecutorError != nil { - processL2BLockError(err) - return nil, ErrExecutorError + executeL2BLockError(err) + return nil, 0, ErrExecutorError } if batchResponse.IsRomOOCError { - processL2BLockError(err) - return nil, ErrProcessBatchOOC + executeL2BLockError(err) + return nil, 0, ErrProcessBatchOOC } - return batchResponse, nil + return batchResponse, uint64(len(batchL2Data)), nil } // storeL2Block stores the L2 block in the state and updates the related batch and transactions func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error { - //log.Infof("storeL2Block: storing processed txToStore: %s", txToStore.response.TxHash.String()) + startStoring := time.Now() + + // Wait until L2 block has been flushed/stored by the executor + f.storedFlushIDCond.L.Lock() + for f.storedFlushID < l2Block.batchResponse.FlushID { + f.storedFlushIDCond.Wait() + } + f.storedFlushIDCond.L.Unlock() + + // If the L2 block has txs now f.storedFlushID >= l2BlockToStore.flushId, we can store tx + blockResponse := l2Block.batchResponse.BlockResponses[0] + log.Infof("storing L2 block %d [%d], batch: %d, txs: %d/%d, blockHash: %s, infoRoot: %s", + blockResponse.BlockNumber, l2Block.trackingNum, f.wipBatch.batchNumber, len(l2Block.transactions), len(blockResponse.TransactionResponses), + blockResponse.BlockHash, blockResponse.BlockInfoRoot.String()) + dbTx, err := f.stateIntf.BeginStateTransaction(ctx) if err != nil { - return fmt.Errorf("error creating db transaction to store L2 block, error: %v", err) + return fmt.Errorf("error creating db transaction to store L2 block %d [%d], error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err) } rollbackOnError := func(retError error) error { @@ -327,7 +313,6 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error { return retError } - blockResponse := l2Block.batchResponse.BlockResponses[0] forkID := f.stateIntf.GetForkIDByBatchNumber(f.wipBatch.batchNumber) txsEGPLog := []*state.EffectiveGasPriceLog{} @@ -339,7 +324,7 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error { // Store L2 block in the state err = f.stateIntf.StoreL2Block(ctx, f.wipBatch.batchNumber, blockResponse, txsEGPLog, dbTx) if err != nil { - return rollbackOnError(fmt.Errorf("database error on storing L2 block %d, error: %v", blockResponse.BlockNumber, err)) + return rollbackOnError(fmt.Errorf("database error on storing L2 block %d [%d], error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)) } // Now we need to update de BatchL2Data of the wip batch and also update the status of the L2 block txs in the pool @@ -405,15 +390,26 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error { err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse) if err != nil { //TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer? - log.Errorf("error sending L2 block %d to data streamer, error: %v", blockResponse.BlockNumber, err) + log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err) + } + + for _, tx := range l2Block.transactions { + // Delete the tx from the pending list in the worker (addrQueue) + f.workerIntf.DeletePendingTxToStore(tx.Hash, tx.From) } + endStoring := time.Now() + + log.Infof("stored L2 block: %d [%d], batch: %d, txs: %d/%d, blockHash: %s, infoRoot: %s, time: %v", + blockResponse.BlockNumber, l2Block.trackingNum, f.wipBatch.batchNumber, len(l2Block.transactions), len(blockResponse.TransactionResponses), + blockResponse.BlockHash, blockResponse.BlockInfoRoot.String(), endStoring.Sub(startStoring)) + return nil } // finalizeWIPL2Block closes the wip L2 block and opens a new one func (f *finalizer) finalizeWIPL2Block(ctx context.Context) { - log.Debugf("finalizing WIP L2 block") + log.Debugf("finalizing WIP L2 block [%d]", f.wipL2Block.trackingNum) prevTimestamp := f.wipL2Block.timestamp prevL1InfoTreeIndex := f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex @@ -423,27 +419,36 @@ func (f *finalizer) finalizeWIPL2Block(ctx context.Context) { f.openNewWIPL2Block(ctx, prevTimestamp, &prevL1InfoTreeIndex) } +// closeWIPL2Block closes the wip L2 block func (f *finalizer) closeWIPL2Block(ctx context.Context) { - log.Debugf("closing WIP L2 block") - - // If the L2 block is empty (no txs) We need to process it to update the state root and remaining batch resources before closing it - if f.wipL2Block.isEmpty() { - log.Debug("processing WIP L2 block because it is empty") - if err := f.processEmptyL2Block(ctx); err != nil { - f.Halt(ctx, fmt.Errorf("failed to process empty WIP L2 block, error: %v ", err)) - } - } + log.Debugf("closing WIP L2 block [%d]", f.wipL2Block.trackingNum) f.wipBatch.countOfL2Blocks++ - f.addPendingL2BlockToProcess(ctx, f.wipL2Block) + if f.cfg.SequentialProcessL2Block { + err := f.processL2Block(ctx, f.wipL2Block) + if err != nil { + f.Halt(ctx, fmt.Errorf("error processing L2 block [%d], error: %v", f.wipL2Block.trackingNum, err)) + } + // We update imStateRoot (used in tx-by-tx execution) to the finalStateRoot that has been updated after process the WIP L2 Block + f.wipBatch.imStateRoot = f.wipBatch.finalStateRoot + } else { + f.addPendingL2BlockToProcess(ctx, f.wipL2Block) + } f.wipL2Block = nil } +// openNewWIPL2Block opens a new wip L2 block func (f *finalizer) openNewWIPL2Block(ctx context.Context, prevTimestamp time.Time, prevL1InfoTreeIndex *uint32) { newL2Block := &L2Block{} + // Tracking number + newL2Block.trackingNum = f.l2BlockCounter + f.l2BlockCounter++ + + log.Debugf("opening new WIP L2 block [%d]", newL2Block.trackingNum) + newL2Block.timestamp = now() newL2Block.deltaTimestamp = uint32(newL2Block.timestamp.Sub(prevTimestamp).Truncate(time.Second).Seconds()) @@ -470,16 +475,81 @@ func (f *finalizer) openNewWIPL2Block(ctx context.Context, prevTimestamp time.Ti f.wipL2Block = newL2Block - // Check if there are enough counters in the wip batch to store the new wip L2 block - err := f.wipBatch.remainingResources.Sub(f.wipL2Block.getUsedResources()) - // We close the wip batch and open a new one if we got an error when subtracting the getUsedResources or we have exhausted some resources of the batch - if err != nil || f.isBatchResourcesExhausted() { - err := f.closeAndOpenNewWIPBatch(ctx) + // We process (execute) the new wip L2 block to update the imStateRoot and also get the counters used by the wip l2block + batchResponse, err := f.executeNewWIPL2Block(ctx) + if err != nil { + f.Halt(ctx, fmt.Errorf("failed to execute new WIP L2 block [%d], error: %v ", f.wipL2Block.trackingNum, err)) + } + + if len(batchResponse.BlockResponses) != 1 { + f.Halt(ctx, fmt.Errorf("number of L2 block [%d] responses returned by the executor is %d and must be 1", f.wipL2Block.trackingNum, len(batchResponse.BlockResponses))) + } + + // Update imStateRoot and wip L2 block number + f.wipBatch.imStateRoot = batchResponse.NewStateRoot + + // Save and sustract the resources used by the new WIP L2 block from the wip batch + // We need to increase the poseidon hashes to reserve in the batch the hashes needed to write the L1InfoRoot when processing the final L2 Block (SkipWriteBlockInfoRoot_V2=false) + f.wipL2Block.usedResources.ZKCounters = batchResponse.UsedZkCounters + f.wipL2Block.usedResources.ZKCounters.UsedPoseidonHashes = (batchResponse.UsedZkCounters.UsedPoseidonHashes * 2) + 2 // nolint:gomnd + f.wipL2Block.usedResources.Bytes = changeL2BlockSize + + overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(f.wipL2Block.usedResources) + if overflow { + log.Infof("new WIP L2 block [%d] exceeds the remaining resources from the batch %d, overflow resource: %s, closing WIP batch and creating new one", + f.wipL2Block.trackingNum, f.wipBatch.batchNumber, overflowResource) + err := f.closeAndOpenNewWIPBatch(ctx, state.ResourceExhaustedClosingReason) if err != nil { - f.Halt(ctx, fmt.Errorf("failed to create new WIP batch, error: %v", err)) + f.Halt(ctx, fmt.Errorf("failed to create new WIP batch [%d], error: %v", f.wipL2Block.trackingNum, err)) } } - log.Debugf("new WIP L2 block created: batch: %d, initialStateRoot: %s, timestamp: %d, l1InfoTreeIndex: %d", - f.wipBatch.batchNumber, f.wipL2Block.initialStateRoot, f.wipL2Block.timestamp.Unix(), f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex) + log.Infof("new WIP L2 block [%d] created, batch: %d, timestamp: %d, l1InfoTreeIndex: %d, l1InfTreeIndexChanged: %s, oldStateRoot: %s, stateRoot: %s, used counters: %s", + f.wipL2Block.trackingNum, f.wipBatch.batchNumber, f.wipL2Block.timestamp.Unix(), f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex, + f.wipL2Block.l1InfoTreeExitRootChanged, f.wipBatch.imStateRoot, batchResponse.NewStateRoot, f.logZKCounters(f.wipL2Block.usedResources.ZKCounters)) +} + +// executeNewWIPL2Block executes an empty L2 Block in the executor and returns the batch response from the executor +func (f *finalizer) executeNewWIPL2Block(ctx context.Context) (*state.ProcessBatchResponse, error) { + start := time.Now() + defer func() { + metrics.ProcessingTime(time.Since(start)) + }() + + batchRequest := state.ProcessRequest{ + BatchNumber: f.wipBatch.batchNumber, + OldStateRoot: f.wipBatch.imStateRoot, + Coinbase: f.wipBatch.coinbase, + L1InfoRoot_V2: mockL1InfoRoot, + TimestampLimit_V2: uint64(f.wipL2Block.timestamp.Unix()), + Caller: stateMetrics.SequencerCallerLabel, + ForkID: f.stateIntf.GetForkIDByBatchNumber(f.wipBatch.batchNumber), + SkipWriteBlockInfoRoot_V2: true, + SkipVerifyL1InfoRoot_V2: true, + SkipFirstChangeL2Block_V2: false, + Transactions: f.stateIntf.BuildChangeL2Block(f.wipL2Block.deltaTimestamp, f.wipL2Block.getL1InfoTreeIndex()), + L1InfoTreeData_V2: map[uint32]state.L1DataV2{}, + } + + batchRequest.L1InfoTreeData_V2[f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex] = state.L1DataV2{ + GlobalExitRoot: f.wipL2Block.l1InfoTreeExitRoot.GlobalExitRoot.GlobalExitRoot, + BlockHashL1: f.wipL2Block.l1InfoTreeExitRoot.PreviousBlockHash, + MinTimestamp: uint64(f.wipL2Block.l1InfoTreeExitRoot.GlobalExitRoot.Timestamp.Unix()), + } + + batchResponse, err := f.stateIntf.ProcessBatchV2(ctx, batchRequest, false) + + if err != nil { + return nil, err + } + + if batchResponse.ExecutorError != nil { + return nil, ErrExecutorError + } + + if batchResponse.IsRomOOCError { + return nil, ErrProcessBatchOOC + } + + return batchResponse, nil } diff --git a/sequencer/worker.go b/sequencer/worker.go index 0c80e76003..64feb05b64 100644 --- a/sequencer/worker.go +++ b/sequencer/worker.go @@ -318,8 +318,8 @@ func (w *Worker) GetBestFittingTx(resources state.BatchResources) (*TxTracker, e foundMutex.RUnlock() txCandidate := w.txSortedList.getByIndex(i) - err := bresources.Sub(txCandidate.BatchResources) - if err != nil { + overflow, _ := bresources.Sub(txCandidate.BatchResources) + if overflow { // We don't add this Tx continue } diff --git a/sequencer/worker_test.go b/sequencer/worker_test.go index 9d134e5420..d68556ef71 100644 --- a/sequencer/worker_test.go +++ b/sequencer/worker_test.go @@ -271,8 +271,8 @@ func TestWorkerGetBestTx(t *testing.T) { if tx.HashStr != expectedGetBestTx[ct].String() { t.Fatalf("Error GetBestFittingTx(%d). Expected=%s, Actual=%s", ct, expectedGetBestTx[ct].String(), tx.HashStr) } - err := rc.Sub(tx.BatchResources) - assert.NoError(t, err) + overflow, _ := rc.Sub(tx.BatchResources) + assert.Equal(t, false, overflow) touch := make(map[common.Address]*state.InfoReadWrite) var newNonce uint64 = tx.Nonce + 1 diff --git a/sequencesender/sequencesender.go b/sequencesender/sequencesender.go index b95a84b5de..aa92886f80 100644 --- a/sequencesender/sequencesender.go +++ b/sequencesender/sequencesender.go @@ -116,7 +116,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context, ticker *time.Tic // Check there are L2 blocks for the last batch if len(lastBatchL2Blocks) == 0 { - log.Errorf("no L2 blocks returned from the state for batch %d") + log.Errorf("no L2 blocks returned from the state for batch %d", lastBatchNumInSequence) return } diff --git a/state/batch.go b/state/batch.go index 6afd2000ec..174492c957 100644 --- a/state/batch.go +++ b/state/batch.go @@ -59,18 +59,20 @@ type ClosingReason string const ( // EmptyClosingReason is the closing reason used when a batch is not closed EmptyClosingReason ClosingReason = "" - // BatchFullClosingReason is the closing reason used when a batch is closed when it is full - BatchFullClosingReason ClosingReason = "Batch is full" - // ForcedBatchClosingReason is the closing reason used when a batch is closed because it is forced - ForcedBatchClosingReason ClosingReason = "Forced Batch" - // BatchAlmostFullClosingReason is the closing reason used when the batch it is almost full - BatchAlmostFullClosingReason ClosingReason = "Batch is almost full" + // MaxTxsClosingReason is the closing reason used when a batch reachs the max transactions per batch + MaxTxsClosingReason ClosingReason = "Max transactions" + // ResourceExhaustedClosingReason is the closing reason used when a batch has a resource (zkCounter or Bytes) exhausted + ResourceExhaustedClosingReason ClosingReason = "Resource exhausted" + // ResourceMarginExhaustedClosingReason is the closing reason used when a batch has a resource (zkCounter or Bytes) margin exhausted + ResourceMarginExhaustedClosingReason ClosingReason = "Resource margin exhausted" + // ForcedBatchClosingReason is the closing reason used when a batch is a forced batch + ForcedBatchClosingReason ClosingReason = "Forced batch" // ForcedBatchDeadlineClosingReason is the closing reason used when forced batch deadline is reached - ForcedBatchDeadlineClosingReason ClosingReason = "Forced Batch deadline" - // TimeoutResolutionDeadlineClosingReason is the closing reason used when timeout resolution deadline is reached - TimeoutResolutionDeadlineClosingReason ClosingReason = "timeout resolution deadline" - // GlobalExitRootDeadlineClosingReason is the closing reason used when Global Exit Root deadline is reached - GlobalExitRootDeadlineClosingReason ClosingReason = "Global Exit Root deadline" + ForcedBatchDeadlineClosingReason ClosingReason = "Forced batch deadline" + // MaxDeltaTimestampClosingReason is the closing reason used when max delta batch timestamp is reached + MaxDeltaTimestampClosingReason ClosingReason = "Max delta timestamp delta" + // NoTxFitsClosingReason is the closing reason used when any of the txs in the pool (worker) fits in the remaining resources of the batch + NoTxFitsClosingReason ClosingReason = "No transactions fits" ) // ProcessingReceipt indicates the outcome (StateRoot, AccInputHash) of processing a batch diff --git a/state/errors.go b/state/errors.go index 19887a3599..18aee6b9f4 100644 --- a/state/errors.go +++ b/state/errors.go @@ -55,8 +55,6 @@ var ( ErrUnsupportedDuration = errors.New("unsupported time duration") // ErrInvalidData is the error when the raw txs is unexpected ErrInvalidData = errors.New("invalid data") - // ErrBatchResourceBytesUnderflow happens when the batch runs out of Bytes - ErrBatchResourceBytesUnderflow = NewBatchRemainingResourcesUnderflowError(nil, "Bytes") // ErrInvalidBlockRange returned when the selected block range is invalid, generally // because the toBlock is bigger than the fromBlock ErrInvalidBlockRange = errors.New("invalid block range") @@ -69,8 +67,6 @@ var ( // ErrMaxNativeBlockHashBlockRangeLimitExceeded returned when the range between block number range // to filter native block hashes is bigger than the configured limit ErrMaxNativeBlockHashBlockRangeLimitExceeded = errors.New("native block hashes are limited to a %v block range") - - zkCounterErrPrefix = "ZKCounter: " ) func constructErrorFromRevert(err error, returnValue []byte) error { @@ -82,11 +78,6 @@ func constructErrorFromRevert(err error, returnValue []byte) error { return fmt.Errorf("%w: %s", err, revertErrMsg) } -// GetZKCounterError returns the error associated with the zkCounter -func GetZKCounterError(name string) error { - return errors.New(zkCounterErrPrefix + name) -} - // BatchRemainingResourcesUnderflowError happens when the execution of a batch runs out of counters type BatchRemainingResourcesUnderflowError struct { Message string @@ -100,16 +91,6 @@ func (b BatchRemainingResourcesUnderflowError) Error() string { return constructErrorMsg(b.ResourceName) } -// NewBatchRemainingResourcesUnderflowError creates a new BatchRemainingResourcesUnderflowError -func NewBatchRemainingResourcesUnderflowError(err error, resourceName string) error { - return &BatchRemainingResourcesUnderflowError{ - Message: constructErrorMsg(resourceName), - Code: 1, - Err: err, - ResourceName: resourceName, - } -} - func constructErrorMsg(resourceName string) string { return fmt.Sprintf("underflow of remaining resources for current batch. Resource %s", resourceName) } diff --git a/state/pgstatestorage/pgstatestorage_test.go b/state/pgstatestorage/pgstatestorage_test.go index fff1407f9d..88882d3681 100644 --- a/state/pgstatestorage/pgstatestorage_test.go +++ b/state/pgstatestorage/pgstatestorage_test.go @@ -60,7 +60,6 @@ var ( }, Bytes: 1, } - closingReason = state.GlobalExitRootDeadlineClosingReason ) func initOrResetDB() { diff --git a/state/types.go b/state/types.go index 9e99298822..510e1b1a37 100644 --- a/state/types.go +++ b/state/types.go @@ -2,7 +2,6 @@ package state import ( "encoding/json" - "fmt" "math/big" "strings" "time" @@ -177,35 +176,35 @@ func (z *ZKCounters) SumUp(other ZKCounters) { z.UsedSha256Hashes_V2 += other.UsedSha256Hashes_V2 } -// Sub subtract zk counters with passed zk counters (not safe) -func (z *ZKCounters) Sub(other ZKCounters) error { +// Sub subtract zk counters with passed zk counters (not safe). if there is a counter underflow it returns true and the name of the counter that caused the overflow +func (z *ZKCounters) Sub(other ZKCounters) (bool, string) { // ZKCounters if other.GasUsed > z.GasUsed { - return GetZKCounterError("CumulativeGasUsed") + return true, "CumulativeGas" } if other.UsedKeccakHashes > z.UsedKeccakHashes { - return GetZKCounterError("UsedKeccakHashes") + return true, "KeccakHashes" } if other.UsedPoseidonHashes > z.UsedPoseidonHashes { - return GetZKCounterError("UsedPoseidonHashes") + return true, "PoseidonHashes" } if other.UsedPoseidonPaddings > z.UsedPoseidonPaddings { - return fmt.Errorf("underflow ZKCounter: UsedPoseidonPaddings") + return true, "PoseidonPaddings" } if other.UsedMemAligns > z.UsedMemAligns { - return GetZKCounterError("UsedMemAligns") + return true, "UsedMemAligns" } if other.UsedArithmetics > z.UsedArithmetics { - return GetZKCounterError("UsedArithmetics") + return true, "UsedArithmetics" } if other.UsedBinaries > z.UsedBinaries { - return GetZKCounterError("UsedBinaries") + return true, "UsedBinaries" } if other.UsedSteps > z.UsedSteps { - return GetZKCounterError("UsedSteps") + return true, "UsedSteps" } if other.UsedSha256Hashes_V2 > z.UsedSha256Hashes_V2 { - return GetZKCounterError("UsedSha256Hashes_V2") + return true, "UsedSha256Hashes_V2" } z.GasUsed -= other.GasUsed @@ -218,7 +217,7 @@ func (z *ZKCounters) Sub(other ZKCounters) error { z.UsedSteps -= other.UsedSteps z.UsedSha256Hashes_V2 -= other.UsedSha256Hashes_V2 - return nil + return false, "" } // BatchResources is a struct that contains the ZKEVM resources used by a batch/tx @@ -227,21 +226,21 @@ type BatchResources struct { Bytes uint64 } -// Sub subtracts the batch resources from other -func (r *BatchResources) Sub(other BatchResources) error { +// Sub subtracts the batch resources from other. if there is a resource underflow it returns true and the name of the resource that caused the overflow +func (r *BatchResources) Sub(other BatchResources) (bool, string) { // Bytes if other.Bytes > r.Bytes { - return ErrBatchResourceBytesUnderflow + return true, "Bytes" } bytesBackup := r.Bytes r.Bytes -= other.Bytes - err := r.ZKCounters.Sub(other.ZKCounters) - if err != nil { + exhausted, resourceName := r.ZKCounters.Sub(other.ZKCounters) + if exhausted { r.Bytes = bytesBackup - return NewBatchRemainingResourcesUnderflowError(err, err.Error()) + return exhausted, resourceName } - return err + return false, "" } // SumUp sum ups the batch resources from other diff --git a/test/config/debug.node.config.toml b/test/config/debug.node.config.toml index 44b39f80d3..aa1a5f5ee0 100644 --- a/test/config/debug.node.config.toml +++ b/test/config/debug.node.config.toml @@ -102,6 +102,7 @@ StateConsistencyCheckInterval = "5s" ResourceExhaustedMarginPct = 10 HaltOnBatchNumber = 0 SequentialBatchSanityCheck = false + SequentialProcessL2Block = true [Sequencer.StreamServer] Port = 6900 Filename = "/datastreamer/datastream.bin" diff --git a/test/config/test.node.config.toml b/test/config/test.node.config.toml index fca6737463..822f21740c 100644 --- a/test/config/test.node.config.toml +++ b/test/config/test.node.config.toml @@ -116,6 +116,7 @@ StateConsistencyCheckInterval = "5s" ResourceExhaustedMarginPct = 10 HaltOnBatchNumber = 0 SequentialBatchSanityCheck = false + SequentialProcessL2Block = true [Sequencer.StreamServer] Port = 6900 Filename = "/datastreamer/datastream.bin"