Skip to content

Commit

Permalink
Fix tx OOC (node level) when first empty L2 block in batch (#3744)
Browse files Browse the repository at this point in the history
* Fix tx OOC (node level) for first empty L2 block in batch

* change log level for ooc (node level) when adding tx to the worker

* fix check OOC (node level) when preexecuting the tx in RPC

* Fix linter and test
  • Loading branch information
agnusmor authored Aug 2, 2024
1 parent d20b6b2 commit adf526c
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 57 deletions.
23 changes: 14 additions & 9 deletions pool/config_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package pool

import (
"fmt"
"testing"

"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/stretchr/testify/assert"
)

func TestIsWithinConstraints(t *testing.T) {
Expand All @@ -20,9 +22,9 @@ func TestIsWithinConstraints(t *testing.T) {
}

testCases := []struct {
desc string
counters state.ZKCounters
expected bool
desc string
counters state.ZKCounters
errExpected error
}{
{
desc: "All constraints within limits",
Expand All @@ -37,7 +39,7 @@ func TestIsWithinConstraints(t *testing.T) {
Steps: 2000,
Sha256Hashes_V2: 4000,
},
expected: true,
errExpected: nil,
},
{
desc: "All constraints exceed limits",
Expand All @@ -52,14 +54,17 @@ func TestIsWithinConstraints(t *testing.T) {
Steps: 5000,
Sha256Hashes_V2: 6000,
},
expected: false,
errExpected: fmt.Errorf("out of counters at node level (GasUsed, KeccakHashes, PoseidonHashes, PoseidonPaddings, MemAligns, Arithmetics, Binaries, Steps, Sha256Hashes)"),
},
}

for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
if got := cfg.IsWithinConstraints(tC.counters); got != tC.expected {
t.Errorf("Expected %v, got %v", tC.expected, got)
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
err := cfg.CheckNodeLevelOOC(tc.counters)
if tc.errExpected != nil {
assert.EqualError(t, err, tc.errExpected.Error())
} else {
assert.NoError(t, err)
}
})
}
Expand Down
17 changes: 10 additions & 7 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,16 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW
return err
}

var oocError error
if preExecutionResponse.OOCError != nil {
oocError = preExecutionResponse.OOCError
} else {
if err = p.batchConstraintsCfg.CheckNodeLevelOOC(preExecutionResponse.reservedZKCounters); err != nil {
oocError = err
}
}

if oocError != nil {
event := &event.Event{
ReceivedAt: time.Now(),
IPAddress: ip,
Expand All @@ -212,7 +221,7 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW
log.Errorf("error adding event: %v", err)
}
// Do not add tx to the pool
return fmt.Errorf("failed to add tx to the pool: %w", preExecutionResponse.OOCError)
return fmt.Errorf("failed to add tx to the pool: %w", oocError)
} else if preExecutionResponse.OOGError != nil {
event := &event.Event{
ReceivedAt: time.Now(),
Expand Down Expand Up @@ -332,12 +341,6 @@ func (p *Pool) preExecuteTx(ctx context.Context, tx types.Transaction) (preExecu
if errors.Is(errorToCheck, runtime.ErrOutOfGas) {
response.OOGError = err
}
} else {
if !p.batchConstraintsCfg.IsWithinConstraints(processBatchResponse.UsedZkCounters) {
err := fmt.Errorf("OutOfCounters Error (Node level) for tx: %s", tx.Hash().String())
response.OOCError = err
log.Error(err.Error())
}
}

response.usedZKCounters = processBatchResponse.UsedZkCounters
Expand Down
27 changes: 22 additions & 5 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,26 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
f.finalizeWIPL2Block(ctx)
}

tx, err := f.workerIntf.GetBestFittingTx(f.wipBatch.imRemainingResources, f.wipBatch.imHighReservedZKCounters)
tx, oocTxs, err := f.workerIntf.GetBestFittingTx(f.wipBatch.imRemainingResources, f.wipBatch.imHighReservedZKCounters, (f.wipBatch.countOfL2Blocks == 0 && f.wipL2Block.isEmpty()))

// If we have txs pending to process but none of them fits into the wip batch, we close the wip batch and open a new one
// Set as invalid in the pool the txs that will never fit into an empty batch (they are also deleted from the worker)
for _, oocTx := range oocTxs {
log.Infof("tx %s doesn't fits in empty batch %d (node OOC), setting tx as invalid in the pool", oocTx.HashStr, f.wipL2Block.trackingNum, f.wipBatch.batchNumber)

f.LogEvent(ctx, event.Level_Info, event.EventID_NodeOOC,
fmt.Sprintf("tx %s doesn't fits in empty batch %d (node OOC), from: %s, IP: %s", oocTx.HashStr, f.wipBatch.batchNumber, oocTx.FromStr, oocTx.IP), nil)

// Delete the transaction from the worker
f.workerIntf.DeleteTx(oocTx.Hash, oocTx.From)

errMsg := "node OOC"
err = f.poolIntf.UpdateTxStatus(ctx, oocTx.Hash, pool.TxStatusInvalid, false, &errMsg)
if err != nil {
log.Errorf("failed to update status to invalid in the pool for tx %s, error: %v", oocTx.Hash.String(), err)
}
}

// If we have txs pending to process but none of them fits into the wip batch, we close the wip batch and open a new one
if err == ErrNoFittingTransaction {
f.finalizeWIPBatch(ctx, state.NoTxFitsClosingReason)
continue
Expand Down Expand Up @@ -690,11 +707,11 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
} else {
log.Infof("current tx %s needed resources exceeds the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing. counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
tx.HashStr, overflowResource, f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(result.UsedZkCounters), f.logZKCounters(result.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(f.wipBatch.imHighReservedZKCounters))
if !f.batchConstraints.IsWithinConstraints(result.ReservedZkCounters) {
log.Infof("current tx %s reserved resources exceeds the max limit for batch resources (node OOC), setting tx as invalid in the pool", tx.HashStr)
if err := f.batchConstraints.CheckNodeLevelOOC(result.ReservedZkCounters); err != nil {
log.Infof("current tx %s reserved resources exceeds the max limit for batch resources (node OOC), setting tx as invalid in the pool, error: %v", tx.HashStr, err)

f.LogEvent(ctx, event.Level_Info, event.EventID_NodeOOC,
fmt.Sprintf("tx %s exceeds node max limit batch resources (node OOC), from: %s, IP: %s", tx.HashStr, tx.FromStr, tx.IP), nil)
fmt.Sprintf("tx %s exceeds node max limit batch resources (node OOC), from: %s, IP: %s, error: %v", tx.HashStr, tx.FromStr, tx.IP, err), nil)

// Delete the transaction from the txSorted list
f.workerIntf.DeleteTx(tx.Hash, tx.From)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type stateInterface interface {
}

type workerInterface interface {
GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters) (*TxTracker, error)
GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters, fistL2Block bool) (*TxTracker, []*TxTracker, error)
UpdateAfterSingleSuccessfulTxExecution(from common.Address, touchedAddresses map[common.Address]*state.InfoReadWrite) []*TxTracker
UpdateTxZKCounters(txHash common.Hash, from common.Address, usedZKCounters state.ZKCounters, reservedZKCounters state.ZKCounters)
AddTxTracker(ctx context.Context, txTracker *TxTracker) (replacedTx *TxTracker, dropReason error)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (f *finalizer) openNewWIPL2Block(ctx context.Context, prevTimestamp uint64,

f.wipBatch.imHighReservedZKCounters = newHighZKCounters
} else {
log.Infof("new wip L2 block [%d] reserved resources exceeds the remaining batch resources, overflow resource: %s, closing WIP batch and creating new one. counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
log.Infof("new wip L2 block [%d] needed resources exceeds the remaining batch resources, overflow resource: %s, closing WIP batch and creating new one. counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
f.wipL2Block.trackingNum, overflowResource,
f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(f.wipL2Block.usedZKCountersOnNew), f.logZKCounters(f.wipL2Block.reservedZKCountersOnNew), f.logZKCounters(neededZKCounters), f.logZKCounters(f.wipBatch.imHighReservedZKCounters))
}
Expand Down
33 changes: 21 additions & 12 deletions sequencer/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 25 additions & 10 deletions sequencer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (w *Worker) addTxTracker(ctx context.Context, tx *TxTracker, mutex *sync.Mu
}

// Make sure the transaction's reserved ZKCounters are within the constraints.
if !w.batchConstraints.IsWithinConstraints(tx.ReservedZKCounters) {
log.Errorf("outOfCounters error (node level) for tx %s", tx.Hash.String())
if err := w.batchConstraints.CheckNodeLevelOOC(tx.ReservedZKCounters); err != nil {
log.Infof("out of counters (node level) when adding tx %s from address %s, error: %v", tx.Hash, tx.From, err)
mutexUnlock(mutex)
return nil, pool.ErrOutOfCounters
}
Expand Down Expand Up @@ -405,7 +405,7 @@ func (w *Worker) DeleteTxPendingToStore(txHash common.Hash, addr common.Address)
}

// GetBestFittingTx gets the most efficient tx that fits in the available batch resources
func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters) (*TxTracker, error) {
func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters, isFistL2BlockAndEmpty bool) (*TxTracker, []*TxTracker, error) {
w.workerMutex.Lock()
defer w.workerMutex.Unlock()

Expand All @@ -417,7 +417,7 @@ func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highR
w.reorgedTxs = w.reorgedTxs[1:]
if addrQueue, found := w.pool[reorgedTx.FromStr]; found {
if addrQueue.readyTx != nil && addrQueue.readyTx.Hash == reorgedTx.Hash {
return reorgedTx, nil
return reorgedTx, nil, nil
} else {
log.Warnf("reorged tx %s is not the ready tx for addrQueue %s, this shouldn't happen", reorgedTx.Hash, reorgedTx.From)
}
Expand All @@ -427,12 +427,14 @@ func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highR
}

if w.txSortedList.len() == 0 {
return nil, ErrTransactionsListEmpty
return nil, nil, ErrTransactionsListEmpty
}

var (
tx *TxTracker
foundMutex sync.RWMutex
tx *TxTracker
foundMutex sync.RWMutex
oocTxs []*TxTracker
oocTxsMutex sync.Mutex
)

nGoRoutines := runtime.NumCPU()
Expand All @@ -457,7 +459,14 @@ func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highR
needed, _ := getNeededZKCounters(highReservedCounters, txCandidate.UsedZKCounters, txCandidate.ReservedZKCounters)
fits, _ := bresources.Fits(state.BatchResources{ZKCounters: needed, Bytes: txCandidate.Bytes})
if !fits {
// We don't add this Tx
// If we are looking for a tx for the first empty L2 block in the batch and this tx doesn't fit in the batch, then this tx will never fit in any batch.
// We add the tx to the oocTxs slice. That slice will be returned to set these txs as invalid (and delete them from the worker) from the finalizer code
if isFistL2BlockAndEmpty {
oocTxsMutex.Lock()
oocTxs = append(oocTxs, txCandidate)
oocTxsMutex.Unlock()
}
// We continue looking for a tx that fits in the batch
continue
}

Expand All @@ -477,9 +486,15 @@ func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highR
if foundAt != -1 {
log.Debugf("best fitting tx %s found at index %d with gasPrice %d", tx.HashStr, foundAt, tx.GasPrice)
w.wipTx = tx
return tx, nil
return tx, oocTxs, nil
} else {
return nil, ErrNoFittingTransaction
// If the length of the oocTxs slice is equal to the length of the txSortedList this means that all the txs are ooc,
// therefore we need to return an error indicating that the list is empty
if w.txSortedList.len() == len(oocTxs) {
return nil, oocTxs, ErrTransactionsListEmpty
} else {
return nil, oocTxs, ErrNoFittingTransaction
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion sequencer/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestWorkerGetBestTx(t *testing.T) {
ct := 0

for {
tx, _ := worker.GetBestFittingTx(rc, state.ZKCounters{})
tx, _, _ := worker.GetBestFittingTx(rc, state.ZKCounters{}, true)
if tx != nil {
if ct >= len(expectedGetBestTx) {
t.Fatalf("Error getting more best tx than expected. Expected=%d, Actual=%d", len(expectedGetBestTx), ct+1)
Expand Down
51 changes: 40 additions & 11 deletions state/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package state

import (
"fmt"

"github.com/0xPolygonHermez/zkevm-node/config/types"
"github.com/0xPolygonHermez/zkevm-node/db"
)
Expand Down Expand Up @@ -71,15 +73,42 @@ type BatchConstraintsCfg struct {
MaxSHA256Hashes uint32 `mapstructure:"MaxSHA256Hashes"`
}

// IsWithinConstraints checks if the counters are within the batch constraints
func (c BatchConstraintsCfg) IsWithinConstraints(counters ZKCounters) bool {
return counters.GasUsed <= c.MaxCumulativeGasUsed &&
counters.KeccakHashes <= c.MaxKeccakHashes &&
counters.PoseidonHashes <= c.MaxPoseidonHashes &&
counters.PoseidonPaddings <= c.MaxPoseidonPaddings &&
counters.MemAligns <= c.MaxMemAligns &&
counters.Arithmetics <= c.MaxArithmetics &&
counters.Binaries <= c.MaxBinaries &&
counters.Steps <= c.MaxSteps &&
counters.Sha256Hashes_V2 <= c.MaxSHA256Hashes
// CheckNodeLevelOOC verifies that every counter in counters is less than or
// equal to its configured batch maximum. It returns nil when all counters are
// within their limits; otherwise it returns an error whose message lists, in a
// fixed order and separated by ", ", the name of every counter that exceeds
// its limit. Note that Sha256Hashes_V2 is reported as "Sha256Hashes".
func (c BatchConstraintsCfg) CheckNodeLevelOOC(counters ZKCounters) error {
	// One entry per counter, in the order the names must appear in the error.
	checks := []struct {
		name     string
		exceeded bool
	}{
		{"GasUsed", counters.GasUsed > c.MaxCumulativeGasUsed},
		{"KeccakHashes", counters.KeccakHashes > c.MaxKeccakHashes},
		{"PoseidonHashes", counters.PoseidonHashes > c.MaxPoseidonHashes},
		{"PoseidonPaddings", counters.PoseidonPaddings > c.MaxPoseidonPaddings},
		{"MemAligns", counters.MemAligns > c.MaxMemAligns},
		{"Arithmetics", counters.Arithmetics > c.MaxArithmetics},
		{"Binaries", counters.Binaries > c.MaxBinaries},
		{"Steps", counters.Steps > c.MaxSteps},
		{"Sha256Hashes", counters.Sha256Hashes_V2 > c.MaxSHA256Hashes},
	}

	var exceededNames string
	for _, check := range checks {
		if !check.exceeded {
			continue
		}
		// Insert the separator only between names, so nothing has to be trimmed afterwards.
		if exceededNames != "" {
			exceededNames += ", "
		}
		exceededNames += check.name
	}

	if exceededNames == "" {
		return nil
	}

	return fmt.Errorf("out of counters at node level (%s)", exceededNames)
}

0 comments on commit adf526c

Please sign in to comment.