From ffc61995dbc800e6f2d1e188fd1273ab06bf46e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Toni=20Ram=C3=ADrez?=
Date: Mon, 7 Aug 2023 10:36:59 +0200
Subject: [PATCH 1/3] refactor nonce

---
 sequencer/finalizer.go      | 59 ++++++-------------------------
 sequencer/finalizer_test.go | 12 +++++---
 sequencer/sequencer.go      | 13 ++------
 sequencer/worker.go         | 43 ++++++++-------------------
 sequencer/worker_test.go    |  5 +---
 5 files changed, 33 insertions(+), 99 deletions(-)

diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go
index 4bcdfa97ab..529c2c91d7 100644
--- a/sequencer/finalizer.go
+++ b/sequencer/finalizer.go
@@ -60,15 +60,14 @@ type finalizer struct {
 	maxBreakEvenGasPriceDeviationPercentage *big.Int
 	defaultMinGasPriceAllowed               uint64
 	// Processed txs
-	pendingTxsToStore            chan transactionToStore
-	pendingTxsToStoreWG          *sync.WaitGroup
-	pendingTxsToStoreMux         *sync.RWMutex
-	pendingTxsPerAddressTrackers map[common.Address]*pendingTxPerAddressTracker
-	storedFlushID                uint64
-	storedFlushIDCond            *sync.Cond
-	proverID                     string
-	lastPendingFlushID           uint64
-	pendingFlushIDChan           chan uint64
+	pendingTxsToStore    chan transactionToStore
+	pendingTxsToStoreWG  *sync.WaitGroup
+	pendingTxsToStoreMux *sync.RWMutex
+	storedFlushID        uint64
+	storedFlushIDCond    *sync.Cond
+	proverID             string
+	lastPendingFlushID   uint64
+	pendingFlushIDChan   chan uint64
 }
 
 type transactionToStore struct {
@@ -114,8 +113,6 @@ func newFinalizer(
 	closingSignalCh ClosingSignalCh,
 	batchConstraints batchConstraints,
 	eventLog *event.EventLog,
-	pendingTxsToStoreMux *sync.RWMutex,
-	pendingTxsPerAddressTrackers map[common.Address]*pendingTxPerAddressTracker,
 ) *finalizer {
 	return &finalizer{
 		cfg: cfg,
@@ -144,8 +141,7 @@ func newFinalizer(
 		maxBreakEvenGasPriceDeviationPercentage: new(big.Int).SetUint64(effectiveGasPriceCfg.MaxBreakEvenGasPriceDeviationPercentage),
 		pendingTxsToStore:   make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
 		pendingTxsToStoreWG: new(sync.WaitGroup),
-		pendingTxsToStoreMux:         pendingTxsToStoreMux,
-		pendingTxsPerAddressTrackers: pendingTxsPerAddressTrackers,
+		pendingTxsToStoreMux: &sync.RWMutex{},
 		storedFlushID: 0,
 		// Mutex is unlocked when the condition is broadcasted
 		storedFlushIDCond: sync.NewCond(&sync.Mutex{}),
@@ -431,15 +427,6 @@ func (f *finalizer) storePendingTransactions(ctx context.Context) {
 			log.Infof("updating pending transaction trackers for transaction hash: %s, flush Id: %d ...", tx.txTracker.Hash.String(), tx.flushId)
 			f.pendingTxsToStoreMux.Lock()
 			f.pendingTxsToStoreWG.Done()
-			f.pendingTxsPerAddressTrackers[tx.txTracker.From].wg.Done()
-			f.pendingTxsPerAddressTrackers[tx.txTracker.From].count--
-			log.Infof("updated pending transaction tracker for address: %s, count: %d, transaction hash: %s, flush Id: %d", tx.txTracker.From.String(), f.pendingTxsPerAddressTrackers[tx.txTracker.From].count, tx.txTracker.Hash.String(), tx.flushId)
-			// Needed to avoid memory leaks
-			if f.pendingTxsPerAddressTrackers[tx.txTracker.From].count == 0 {
-				log.Infof("deleting pending transaction tracker for address: %s, transaction hash: %s, flush Id: %d", tx.txTracker.From.String(), tx.txTracker.Hash.String(), tx.flushId)
-				delete(f.pendingTxsPerAddressTrackers, tx.txTracker.From)
-			}
-			f.pendingTxsToStoreMux.Unlock()
 		case <-ctx.Done():
 			// The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit
 			f.pendingTxsToStoreWG.Wait()
@@ -702,21 +689,7 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
 
 	log.Infof("adding tx to pendingTxsToStore. tx: %s, batchNumber: %d, flushId: %d", result.Responses[0].TxHash, f.batch.batchNumber, result.FlushID)
 	f.pendingTxsToStoreMux.Lock()
-	// global tracker
 	f.pendingTxsToStoreWG.Add(1)
-	// per address tracker
-	if _, ok := f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From]; !ok {
-		f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From] = new(pendingTxPerAddressTracker)
-		f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From].wg = &sync.WaitGroup{}
-	}
-	f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From].wg.Add(1)
-	f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From].count++
-	// broadcast the new flushID if it's greater than the last one
-	if result.FlushID > f.lastPendingFlushID {
-		log.Infof("broadcasting new pending flushId: %d", result.FlushID)
-		f.lastPendingFlushID = result.FlushID
-		f.pendingFlushIDChan <- result.FlushID
-	}
 	f.pendingTxsToStoreMux.Unlock()
 	log.Infof("sending tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", result.Responses[0].TxHash, f.batch.batchNumber)
 	select {
@@ -771,21 +744,7 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
 
 		log.Infof("adding forced tx to pendingTxsToStore. tx: %s, batchNumber: %d, flushId: %d", txResp.TxHash, request.BatchNumber, result.FlushID)
 		f.pendingTxsToStoreMux.Lock()
-		// global tracker
 		f.pendingTxsToStoreWG.Add(1)
-		// per address tracker
-		if _, ok := f.pendingTxsPerAddressTrackers[from]; !ok {
-			f.pendingTxsPerAddressTrackers[from] = new(pendingTxPerAddressTracker)
-			f.pendingTxsPerAddressTrackers[from].wg = &sync.WaitGroup{}
-		}
-		f.pendingTxsPerAddressTrackers[from].wg.Add(1)
-		f.pendingTxsPerAddressTrackers[from].count++
-		// broadcast the new flushID if it's greater than the last one
-		if result.FlushID > f.lastPendingFlushID {
-			log.Infof("broadcasting new pending flushId: %d", result.FlushID)
-			f.lastPendingFlushID = result.FlushID
-			f.pendingFlushIDChan <- result.FlushID
-		}
 		f.pendingTxsToStoreMux.Unlock()
 
 		oldStateRoot = txResp.StateRoot
diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go
index 0bfca189cb..28fa73482d 100644
--- a/sequencer/finalizer_test.go
+++ b/sequencer/finalizer_test.go
@@ -118,9 +118,7 @@ func TestNewFinalizer(t *testing.T) {
 	dbManagerMock.On("GetLastSentFlushID", context.Background()).Return(uint64(0), nil)
 
 	// arrange and act
-	pendingTxsToStoreMux := new(sync.RWMutex)
-	pendingTxsPerAddressTrackers := make(map[common.Address]*pendingTxPerAddressTracker)
-	f = newFinalizer(cfg, effectiveGasPriceCfg, workerMock, dbManagerMock, executorMock, seqAddr, isSynced, closingSignalCh, bc, eventLog, pendingTxsToStoreMux, pendingTxsPerAddressTrackers)
+	f = newFinalizer(cfg, effectiveGasPriceCfg, workerMock, dbManagerMock, executorMock, seqAddr, isSynced, closingSignalCh, bc, eventLog)
 
 	// assert
 	assert.NotNil(t, f)
@@ -1727,6 +1725,13 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 			f.pendingTxsToStoreWG.Wait()
 			require.Nil(t, err)
 			require.Equal(t, len(tc.expectedStoredTxs), len(storedTxs))
+			// Tx tracker is not comparable, so we can't compare the whole struct
+			/*
+				for i := 0; i < len(tc.expectedStoredTxs); i++ {
+					expectedTx := tc.expectedStoredTxs[i]
+					actualTx := storedTxs[i]
+					require.Equal(t, expectedTx, actualTx)
+				}*/
 		})
 	}
 }
@@ -2482,7 +2487,6 @@ func setupFinalizer(withWipBatch bool) *finalizer {
 		pendingTxsToStore:   make(chan transactionToStore, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
 		pendingTxsToStoreWG: new(sync.WaitGroup),
 		pendingTxsToStoreMux: new(sync.RWMutex),
-		pendingTxsPerAddressTrackers: make(map[common.Address]*pendingTxPerAddressTracker),
 		storedFlushID: 0,
 		storedFlushIDCond: sync.NewCond(new(sync.Mutex)),
 		proverID: "",
diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go
index 49f285eb47..ef41799b03 100644
--- a/sequencer/sequencer.go
+++ b/sequencer/sequencer.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/0xPolygonHermez/zkevm-node/event"
@@ -68,12 +67,6 @@ type ClosingSignalCh struct {
 	L2ReorgCh chan L2ReorgEvent
 }
 
-// pendingTxPerAddressTracker is a struct that tracks the number of pending transactions per address
-type pendingTxPerAddressTracker struct {
-	wg    *sync.WaitGroup
-	count uint
-}
-
 // New init sequencer
 func New(cfg Config, txPool txPool, state stateInterface, etherman etherman, manager ethTxManager, eventLog *event.EventLog) (*Sequencer, error) {
 	addr, err := etherman.TrustedSequencer()
@@ -134,14 +127,12 @@ func (s *Sequencer) Start(ctx context.Context) {
 	if err != nil {
 		log.Fatalf("failed to mark WIP txs as pending, err: %v", err)
 	}
-	pendingTxsToStoreMux := new(sync.RWMutex)
-	pendingTxTrackerPerAddress := make(map[common.Address]*pendingTxPerAddressTracker)
-	worker := NewWorker(s.cfg.Worker, s.state, batchConstraints, batchResourceWeights, pendingTxsToStoreMux, pendingTxTrackerPerAddress)
+	worker := NewWorker(s.cfg.Worker, s.state, batchConstraints, batchResourceWeights)
 	dbManager := newDBManager(ctx, s.cfg.DBManager, s.pool, s.state, worker, closingSignalCh, batchConstraints)
 	go dbManager.Start()
 
-	finalizer := newFinalizer(s.cfg.Finalizer, s.cfg.EffectiveGasPrice, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, batchConstraints, s.eventLog, pendingTxsToStoreMux, pendingTxTrackerPerAddress)
+	finalizer := newFinalizer(s.cfg.Finalizer, s.cfg.EffectiveGasPrice, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, batchConstraints, s.eventLog)
 	currBatch, processingReq := s.bootstrap(ctx, dbManager, finalizer)
 	go finalizer.Start(ctx, currBatch, processingReq)
diff --git a/sequencer/worker.go b/sequencer/worker.go
index 6e96f05349..da5f6e1a5a 100644
--- a/sequencer/worker.go
+++ b/sequencer/worker.go
@@ -16,15 +16,13 @@ import (
 
 // Worker represents the worker component of the sequencer
 type Worker struct {
-	cfg                          WorkerCfg
-	pool                         map[string]*addrQueue
-	efficiencyList               *efficiencyList
-	workerMutex                  sync.Mutex
-	state                        stateInterface
-	batchConstraints             batchConstraintsFloat64
-	batchResourceWeights         batchResourceWeights
-	pendingTxsToStoreMux         *sync.RWMutex
-	pendingTxsPerAddressTrackers map[common.Address]*pendingTxPerAddressTracker
+	cfg                  WorkerCfg
+	pool                 map[string]*addrQueue
+	efficiencyList       *efficiencyList
+	workerMutex          sync.Mutex
+	state                stateInterface
+	batchConstraints     batchConstraintsFloat64
+	batchResourceWeights batchResourceWeights
 }
 
 // NewWorker creates an init a worker
@@ -33,18 +31,14 @@ func NewWorker(
 	state stateInterface,
 	constraints batchConstraints,
 	weights batchResourceWeights,
-	pendingTxsToStoreMux *sync.RWMutex,
-	pendingTxTrackersPerAddress map[common.Address]*pendingTxPerAddressTracker,
 ) *Worker {
 	w := Worker{
-		cfg:                          cfg,
-		pool:                         make(map[string]*addrQueue),
-		efficiencyList:               newEfficiencyList(),
-		state:                        state,
-		batchConstraints:             convertBatchConstraintsToFloat64(constraints),
-		batchResourceWeights:         weights,
-		pendingTxsToStoreMux:         pendingTxsToStoreMux,
-		pendingTxsPerAddressTrackers: pendingTxTrackersPerAddress,
+		cfg:                  cfg,
+		pool:                 make(map[string]*addrQueue),
+		efficiencyList:       newEfficiencyList(),
+		state:                state,
+		batchConstraints:     convertBatchConstraintsToFloat64(constraints),
+		batchResourceWeights: weights,
 	}
 
 	return &w
@@ -66,17 +60,6 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T
 		// Unlock the worker to let execute other worker functions while creating the new AddrQueue
 		w.workerMutex.Unlock()
 
-		// Wait until all pending transactions are stored, so we can ensure getting the correct nonce and balance of the new AddrQueue
-		log.Infof("Checking for pending transactions to be stored before creating new AddrQueue for address %s", tx.FromStr)
-		w.pendingTxsToStoreMux.RLock()
-		pendingTxsTracker, ok := w.pendingTxsPerAddressTrackers[tx.From]
-		if ok && pendingTxsTracker.wg != nil {
-			log.Infof("Waiting for pending transactions to be stored before creating new AddrQueue for address %s", tx.FromStr)
-			pendingTxsTracker.wg.Wait()
-			log.Infof("Finished waiting for pending transactions to be stored before creating new AddrQueue for address %s", tx.FromStr)
-		}
-		w.pendingTxsToStoreMux.RUnlock()
-
 		root, err := w.state.GetLastStateRoot(ctx, nil)
 		if err != nil {
 			dropReason = fmt.Errorf("AddTx GetLastStateRoot error: %v", err)
diff --git a/sequencer/worker_test.go b/sequencer/worker_test.go
index 1d254984f5..526b72bae3 100644
--- a/sequencer/worker_test.go
+++ b/sequencer/worker_test.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"math/big"
-	"sync"
 	"testing"
 
 	"github.com/0xPolygonHermez/zkevm-node/state"
@@ -308,9 +307,7 @@ func TestWorkerGetBestTx(t *testing.T) {
 }
 
 func initWorker(stateMock *StateMock, rcMax batchConstraints, rcWeigth batchResourceWeights) *Worker {
-	pendingTxsToStoreMux := new(sync.RWMutex)
-	pendingTxsPerAddressTrackers := make(map[common.Address]*pendingTxPerAddressTracker)
-	worker := NewWorker(workerCfg, stateMock, rcMax, rcWeigth, pendingTxsToStoreMux, pendingTxsPerAddressTrackers)
+	worker := NewWorker(workerCfg, stateMock, rcMax, rcWeigth)
 
 	return worker
 }

From 691b19fc3381a0d2a687acb876526e4ec19b9fca Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Toni=20Ram=C3=ADrez?=
Date: Mon, 7 Aug 2023 11:39:45 +0200
Subject: [PATCH 2/3] fix

---
 sequencer/finalizer.go      | 18 ++++++++++++++----
 sequencer/finalizer_test.go |  2 +-
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go
index 529c2c91d7..47cd1cf35e 100644
--- a/sequencer/finalizer.go
+++ b/sequencer/finalizer.go
@@ -67,7 +67,7 @@ type finalizer struct {
 	storedFlushIDCond    *sync.Cond
 	proverID             string
 	lastPendingFlushID   uint64
-	pendingFlushIDChan   chan uint64
+	pendingFlushIDCond   *sync.Cond
 }
 
 type transactionToStore struct {
@@ -147,7 +147,7 @@ func newFinalizer(
 		storedFlushIDCond: sync.NewCond(&sync.Mutex{}),
 		proverID: "",
 		lastPendingFlushID: 0,
-		pendingFlushIDChan: make(chan uint64, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
+		pendingFlushIDCond: sync.NewCond(&sync.Mutex{}),
 	}
 }
@@ -188,12 +188,13 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *s
 func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) {
 	for {
 		log.Infof("checking for stored flush id to be less than last pending flush id ...")
-
+		f.pendingFlushIDCond.L.Lock()
 		for f.storedFlushID >= f.lastPendingFlushID {
 			log.Infof("waiting for new pending flush id, last pending flush id: %v", f.lastPendingFlushID)
-			<-f.pendingFlushIDChan
+			f.pendingFlushIDCond.Wait()
 			log.Infof("received new last pending flush id: %v", f.lastPendingFlushID)
 		}
+		f.pendingFlushIDCond.L.Unlock()
 
 		for f.storedFlushID < f.lastPendingFlushID {
 			storedFlushID, proverID, err := f.dbManager.GetStoredFlushID(ctx)
@@ -691,6 +692,11 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
 	f.pendingTxsToStoreMux.Lock()
 	f.pendingTxsToStoreWG.Add(1)
 	f.pendingTxsToStoreMux.Unlock()
+	if result.FlushID > f.lastPendingFlushID {
+		f.lastPendingFlushID = result.FlushID
+		f.pendingFlushIDCond.Broadcast()
+	}
+
 	log.Infof("sending tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", result.Responses[0].TxHash, f.batch.batchNumber)
 	select {
 	case f.pendingTxsToStore <- processedTransaction:
@@ -746,6 +752,10 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
 		f.pendingTxsToStoreMux.Lock()
 		f.pendingTxsToStoreWG.Add(1)
 		f.pendingTxsToStoreMux.Unlock()
+		if result.FlushID > f.lastPendingFlushID {
+			f.lastPendingFlushID = result.FlushID
+			f.pendingFlushIDCond.Broadcast()
+		}
 
 		oldStateRoot = txResp.StateRoot
 		log.Infof("sending forced tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", txResp.TxHash, request.BatchNumber)
diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go
index 28fa73482d..cffe8e7b64 100644
--- a/sequencer/finalizer_test.go
+++ b/sequencer/finalizer_test.go
@@ -2491,6 +2491,6 @@ func setupFinalizer(withWipBatch bool) *finalizer {
 		storedFlushIDCond: sync.NewCond(new(sync.Mutex)),
 		proverID: "",
 		lastPendingFlushID: 0,
-		pendingFlushIDChan: make(chan uint64, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
+		pendingFlushIDCond: sync.NewCond(new(sync.Mutex)),
 	}
 }

From 6b720b41135e1e2894167285f8e9f5e7ca8283ca Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Toni=20Ram=C3=ADrez?=
Date: Mon, 7 Aug 2023 11:59:32 +0200
Subject: [PATCH 3/3] fix

---
 sequencer/finalizer.go      |  87 ++++++++++------------------
 sequencer/finalizer_test.go | 109 ++++++++++++------------------------
 sequencer/worker.go         |   7 +--
 sequencer/worker_test.go    |   1 -
 4 files changed, 65 insertions(+), 139 deletions(-)

diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go
index 47cd1cf35e..52311e3bf5 100644
--- a/sequencer/finalizer.go
+++ b/sequencer/finalizer.go
@@ -60,14 +60,14 @@ type finalizer struct {
 	maxBreakEvenGasPriceDeviationPercentage *big.Int
 	defaultMinGasPriceAllowed               uint64
 	// Processed txs
-	pendingTxsToStore    chan transactionToStore
-	pendingTxsToStoreWG  *sync.WaitGroup
-	pendingTxsToStoreMux *sync.RWMutex
-	storedFlushID        uint64
-	storedFlushIDCond    *sync.Cond
-	proverID             string
-	lastPendingFlushID   uint64
-	pendingFlushIDCond   *sync.Cond
+	pendingTransactionsToStore    chan transactionToStore
+	pendingTransactionsToStoreWG  *sync.WaitGroup
+	pendingTransactionsToStoreMux *sync.RWMutex
+	storedFlushID                 uint64
+	storedFlushIDCond             *sync.Cond
+	proverID                      string
+	lastPendingFlushID            uint64
+	pendingFlushIDCond            *sync.Cond
 }
 
 type transactionToStore struct {
@@ -139,9 +139,9 @@ func newFinalizer(
 		// event log
 		eventLog: eventLog,
 		maxBreakEvenGasPriceDeviationPercentage: new(big.Int).SetUint64(effectiveGasPriceCfg.MaxBreakEvenGasPriceDeviationPercentage),
-		pendingTxsToStore:    make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
-		pendingTxsToStoreWG:  new(sync.WaitGroup),
-		pendingTxsToStoreMux: &sync.RWMutex{},
+		pendingTransactionsToStore:    make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
+		pendingTransactionsToStoreWG:  new(sync.WaitGroup),
+		pendingTransactionsToStoreMux: &sync.RWMutex{},
 		storedFlushID: 0,
 		// Mutex is unlocked when the condition is broadcasted
 		storedFlushIDCond: sync.NewCond(&sync.Mutex{}),
@@ -187,12 +187,9 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *s
 
 // updateProverIdAndFlushId updates the prover id and flush id
 func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) {
 	for {
-		log.Infof("checking for stored flush id to be less than last pending flush id ...")
 		f.pendingFlushIDCond.L.Lock()
 		for f.storedFlushID >= f.lastPendingFlushID {
-			log.Infof("waiting for new pending flush id, last pending flush id: %v", f.lastPendingFlushID)
 			f.pendingFlushIDCond.Wait()
-			log.Infof("received new last pending flush id: %v", f.lastPendingFlushID)
 		}
 		f.pendingFlushIDCond.L.Unlock()
@@ -246,7 +243,6 @@ func (f *finalizer) listenForClosingSignals(ctx context.Context) {
 
 // updateStoredFlushID updates the stored flush id
 func (f *finalizer) updateStoredFlushID(newFlushID uint64) {
-	log.Infof("updating stored flush id to: %v", newFlushID)
 	f.storedFlushIDCond.L.Lock()
 	f.storedFlushID = newFlushID
 	f.storedFlushIDCond.Broadcast()
@@ -375,7 +371,6 @@ func (f *finalizer) halt(ctx context.Context, err error) {
 
 // checkProverIDAndUpdateStoredFlushID checks if the proverID changed and updates the stored flush id
 func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, proverID string) {
-	log.Infof("checking proverID: %s", proverID)
 	if f.proverID != "" && f.proverID != proverID {
 		event := &event.Event{
 			ReceivedAt: time.Now(),
@@ -400,22 +395,18 @@ func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, pr
 func (f *finalizer) storePendingTransactions(ctx context.Context) {
 	for {
 		select {
-		case tx, ok := <-f.pendingTxsToStore:
+		case tx, ok := <-f.pendingTransactionsToStore:
 			if !ok {
-				log.Infof("pendingTxsToStore channel is closed")
+				// Channel is closed
 				return
 			}
 
-			log.Infof("storing pending transaction hash: %s", tx.txTracker.Hash.String())
 			// Print the formatted timestamp
 			f.storedFlushIDCond.L.Lock()
 			for f.storedFlushID < tx.flushId {
-				log.Infof("waiting for FlushID: %d to be stored (confirmed) ...", tx.flushId)
 				f.storedFlushIDCond.Wait()
-				log.Infof("waking up after FlushID: %d was stored (confirmed)", tx.flushId)
 				// check if context is done after waking up
 				if ctx.Err() != nil {
-					log.Errorf("context is done, err: %s", ctx.Err())
 					f.storedFlushIDCond.L.Unlock()
 					return
 				}
@@ -424,13 +415,10 @@ func (f *finalizer) storePendingTransactions(ctx context.Context) {
 
 			// Now f.storedFlushID >= tx.flushId, you can store tx
 			f.storeProcessedTx(ctx, tx)
-
-			log.Infof("updating pending transaction trackers for transaction hash: %s, flush Id: %d ...", tx.txTracker.Hash.String(), tx.flushId)
-			f.pendingTxsToStoreMux.Lock()
-			f.pendingTxsToStoreWG.Done()
+			f.pendingTransactionsToStoreWG.Done()
 		case <-ctx.Done():
 			// The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit
-			f.pendingTxsToStoreWG.Wait()
+			f.pendingTransactionsToStoreWG.Wait()
 			return
 		default:
 			time.Sleep(100 * time.Millisecond) //nolint:gomnd
@@ -445,14 +433,10 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
 
 	// Wait until all processed transactions are saved
 	startWait := time.Now()
-	batchNumber := uint64(0)
-	if f.batch != nil {
-		batchNumber = f.batch.batchNumber
-	}
-	log.Infof("waiting for pending transactions to be stored batch number: %d ...", batchNumber)
-	f.pendingTxsToStoreWG.Wait()
+	f.pendingTransactionsToStoreWG.Wait()
 	endWait := time.Now()
-	log.Infof("waiting for pending transactions for batch number: %d to be stored took: %s", batchNumber, endWait.Sub(startWait).String())
+
+	log.Info("waiting for pending transactions to be stored took: ", endWait.Sub(startWait).String())
 
 	var err error
 	if f.batch.stateRoot == state.ZeroHash {
@@ -688,21 +672,18 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
 		flushId:       result.FlushID,
 	}
 
-	log.Infof("adding tx to pendingTxsToStore. tx: %s, batchNumber: %d, flushId: %d", result.Responses[0].TxHash, f.batch.batchNumber, result.FlushID)
-	f.pendingTxsToStoreMux.Lock()
-	f.pendingTxsToStoreWG.Add(1)
-	f.pendingTxsToStoreMux.Unlock()
+	f.pendingTransactionsToStoreMux.Lock()
+	f.pendingTransactionsToStoreWG.Add(1)
 	if result.FlushID > f.lastPendingFlushID {
 		f.lastPendingFlushID = result.FlushID
 		f.pendingFlushIDCond.Broadcast()
 	}
-
-	log.Infof("sending tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", result.Responses[0].TxHash, f.batch.batchNumber)
+	f.pendingTransactionsToStoreMux.Unlock()
 	select {
-	case f.pendingTxsToStore <- processedTransaction:
+	case f.pendingTransactionsToStore <- processedTransaction:
 	case <-ctx.Done():
 		// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count
-		f.pendingTxsToStoreWG.Done()
+		f.pendingTransactionsToStoreWG.Done()
 	}
 
 	f.batch.countOfTxs++
@@ -728,16 +709,8 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
 			}
 		}
 
-		from, err := state.GetSender(txResp.Tx)
-		if err != nil {
-			log.Errorf("handleForcedTxsProcessResp: failed to get sender: %s", err)
-			continue
-		}
-
 		processedTransaction := transactionToStore{
-			txTracker: &TxTracker{
-				From: from,
-			},
+			txTracker:     nil,
 			response:      txResp,
 			batchResponse: result,
 			batchNumber:   request.BatchNumber,
@@ -748,22 +721,20 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
 			flushId:       result.FlushID,
 		}
 
-		log.Infof("adding forced tx to pendingTxsToStore. tx: %s, batchNumber: %d, flushId: %d", txResp.TxHash, request.BatchNumber, result.FlushID)
-		f.pendingTxsToStoreMux.Lock()
-		f.pendingTxsToStoreWG.Add(1)
-		f.pendingTxsToStoreMux.Unlock()
+		f.pendingTransactionsToStoreMux.Lock()
+		f.pendingTransactionsToStoreWG.Add(1)
 		if result.FlushID > f.lastPendingFlushID {
 			f.lastPendingFlushID = result.FlushID
 			f.pendingFlushIDCond.Broadcast()
 		}
+		f.pendingTransactionsToStoreMux.Unlock()
 
 		oldStateRoot = txResp.StateRoot
-		log.Infof("sending forced tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", txResp.TxHash, request.BatchNumber)
 		select {
-		case f.pendingTxsToStore <- processedTransaction:
+		case f.pendingTransactionsToStore <- processedTransaction:
 		case <-ctx.Done():
 			// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count
-			f.pendingTxsToStoreWG.Done()
+			f.pendingTransactionsToStoreWG.Done()
 		}
 	}
 }
diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go
index cffe8e7b64..74751f453c 100644
--- a/sequencer/finalizer_test.go
+++ b/sequencer/finalizer_test.go
@@ -268,14 +268,14 @@ func TestFinalizer_handleProcessTransactionResponse(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			storedTxs := make([]transactionToStore, 0)
-			f.pendingTxsToStore = make(chan transactionToStore)
+			f.pendingTransactionsToStore = make(chan transactionToStore)
 			if tc.expectedStoredTx.batchResponse != nil {
 				done = make(chan bool) // init a new done channel
 				go func() {
-					for tx := range f.pendingTxsToStore {
+					for tx := range f.pendingTransactionsToStore {
 						storedTxs = append(storedTxs, tx)
-						f.pendingTxsToStoreWG.Done()
+						f.pendingTransactionsToStoreWG.Done()
 					}
 					done <- true // signal that the goroutine is done
 				}()
@@ -311,9 +311,9 @@ func TestFinalizer_handleProcessTransactionResponse(t *testing.T) {
 			}
 
 			if tc.expectedStoredTx.batchResponse != nil {
-				close(f.pendingTxsToStore) // close the channel
-				<-done                     // wait for the goroutine to finish
-				f.pendingTxsToStoreWG.Wait()
+				close(f.pendingTransactionsToStore) // close the channel
+				<-done                              // wait for the goroutine to finish
+				f.pendingTransactionsToStoreWG.Wait()
 				require.Len(t, storedTxs, 1)
 				actualTx := storedTxs[0]
 				assertEqualTransactionToStore(t, tc.expectedStoredTx, actualTx)
@@ -773,19 +773,15 @@ func TestFinalizer_processForcedBatches(t *testing.T) {
 	batchNumber := f.batch.batchNumber
 	decodedBatchL2Data, err = hex.DecodeHex(testBatchL2DataAsString)
 	require.NoError(t, err)
-	encodedTxs, _, _, err := state.DecodeTxs(decodedBatchL2Data, forkId5)
-	require.NoError(t, err)
 
 	txResp1 := &state.ProcessTransactionResponse{
 		TxHash:    txHash,
 		StateRoot: stateRootHashes[0],
-		Tx:        encodedTxs[0],
 	}
 
 	txResp2 := &state.ProcessTransactionResponse{
 		TxHash:    txHash2,
 		StateRoot: stateRootHashes[1],
-		Tx:        encodedTxs[0],
 	}
 	batchResponse1 := &state.ProcessBatchResponse{
 		NewBatchNumber: f.batch.batchNumber + 1,
@@ -822,9 +818,6 @@ func TestFinalizer_processForcedBatches(t *testing.T) {
 			forcedBatches:    []state.ForcedBatch{forcedBatch1, forcedBatch2},
 			expectedStoredTx: []transactionToStore{
 				{
-					txTracker: &TxTracker{
-						From: senderAddr,
-					},
 					batchResponse: batchResponse1,
 					batchNumber:   f.batch.batchNumber + 1,
 					coinbase:      seqAddr,
@@ -834,9 +827,6 @@ func TestFinalizer_processForcedBatches(t *testing.T) {
 					response:      txResp1,
 				},
 				{
-					txTracker: &TxTracker{
-						From: senderAddr,
-					},
 					batchResponse: batchResponse2,
 					batchNumber:   f.batch.batchNumber + 2,
 					coinbase:      seqAddr,
@@ -865,9 +855,6 @@ func TestFinalizer_processForcedBatches(t *testing.T) {
 			},
 			expectedStoredTx: []transactionToStore{
 				{
-					txTracker: &TxTracker{
-						From: senderAddr,
-					},
 					batchResponse: batchResponse1,
 					batchNumber:   f.batch.batchNumber + 1,
 					coinbase:      seqAddr,
@@ -877,9 +864,6 @@ func TestFinalizer_processForcedBatches(t *testing.T) {
 					response:      txResp1,
 				},
 				{
-					txTracker: &TxTracker{
-						From: senderAddr,
-					},
 					batchResponse: batchResponse2,
 					batchNumber:   f.batch.batchNumber + 2,
 					coinbase:      seqAddr,
@@ -909,13 +893,13 @@ func TestFinalizer_processForcedBatches(t *testing.T) {
 			var newStateRoot common.Hash
 			stateRoot := oldHash
 			storedTxs := make([]transactionToStore, 0)
-			f.pendingTxsToStore = make(chan transactionToStore)
+			f.pendingTransactionsToStore = make(chan transactionToStore)
 			if tc.expectedStoredTx != nil && len(tc.expectedStoredTx) > 0 {
 				done = make(chan bool) // init a new done channel
 				go func() {
-					for tx := range f.pendingTxsToStore {
+					for tx := range f.pendingTransactionsToStore {
 						storedTxs = append(storedTxs, tx)
-						f.pendingTxsToStoreWG.Done()
+						f.pendingTransactionsToStoreWG.Done()
 					}
 					done <- true // signal that the goroutine is done
 				}()
@@ -973,10 +957,12 @@ func TestFinalizer_processForcedBatches(t *testing.T) {
 				assert.EqualError(t, err, tc.expectedErr.Error())
 			} else {
 				if tc.expectedStoredTx != nil && len(tc.expectedStoredTx) > 0 {
-					close(f.pendingTxsToStore) // ensure the channel is closed
-					<-done                     // wait for the goroutine to finish
-					f.pendingTxsToStoreWG.Wait()
-					assert.Equal(t, len(tc.expectedStoredTx), len(storedTxs))
+					close(f.pendingTransactionsToStore) // ensure the channel is closed
+					<-done                              // wait for the goroutine to finish
+					f.pendingTransactionsToStoreWG.Wait()
+					for i := range tc.expectedStoredTx {
+						require.Equal(t, tc.expectedStoredTx[i], storedTxs[i])
+					}
 				}
 				if len(tc.expectedStoredTx) > 0 {
 					assert.Equal(t, stateRootHashes[len(stateRootHashes)-1], newStateRoot)
@@ -1508,13 +1494,13 @@ func Test_processTransaction(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			storedTxs := make([]transactionToStore, 0)
-			f.pendingTxsToStore = make(chan transactionToStore, 1)
+			f.pendingTransactionsToStore = make(chan transactionToStore, 1)
 			if tc.expectedStoredTx.batchResponse != nil {
 				done = make(chan bool) // init a new done channel
 				go func() {
-					for tx := range f.pendingTxsToStore {
+					for tx := range f.pendingTransactionsToStore {
 						storedTxs = append(storedTxs, tx)
-						f.pendingTxsToStoreWG.Done()
+						f.pendingTransactionsToStoreWG.Done()
 					}
 					done <- true // signal that the goroutine is done
 				}()
@@ -1537,9 +1523,9 @@ func Test_processTransaction(t *testing.T) {
 			errWg, err := f.processTransaction(tc.ctx, tc.tx)
 
 			if tc.expectedStoredTx.batchResponse != nil {
-				close(f.pendingTxsToStore) // ensure the channel is closed
-				<-done                     // wait for the goroutine to finish
-				f.pendingTxsToStoreWG.Wait()
+				close(f.pendingTransactionsToStore) // ensure the channel is closed
+				<-done                              // wait for the goroutine to finish
+				f.pendingTransactionsToStoreWG.Wait()
 				require.Equal(t, tc.expectedStoredTx, storedTxs[0])
 			}
 			if tc.expectedErr != nil {
@@ -1563,21 +1549,17 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 	defer func() {
 		now = time.Now
 	}()
-	decodedBatchL2Data, err = hex.DecodeHex(testBatchL2DataAsString)
-	require.NoError(t, err)
-	encodedTxs, _, _, err := state.DecodeTxs(decodedBatchL2Data, forkId5)
+	ctx = context.Background()
 
 	txResponseOne := &state.ProcessTransactionResponse{
 		TxHash:    txHash,
 		StateRoot: newHash,
 		RomError:  nil,
-		Tx:        encodedTxs[0],
 	}
 	txResponseTwo := &state.ProcessTransactionResponse{
 		TxHash:    common.HexToHash("0x02"),
 		StateRoot: newHash2,
 		RomError:  nil,
-		Tx:        encodedTxs[0],
 	}
 	successfulBatchResp := &state.ProcessBatchResponse{
 		NewStateRoot: newHash,
@@ -1590,7 +1572,6 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 		TxHash:    txHash,
 		RomError:  runtime.ErrExecutionReverted,
 		StateRoot: newHash,
-		Tx:        encodedTxs[0],
 	}
 	revertedBatchResp := &state.ProcessBatchResponse{
 		Responses: []*state.ProcessTransactionResponse{
@@ -1601,7 +1582,6 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 		TxHash:    txHash,
 		RomError:  runtime.ErrIntrinsicInvalidChainID,
 		StateRoot: newHash,
-		Tx:        encodedTxs[0],
 	}
 	intrinsicErrBatchResp := &state.ProcessBatchResponse{
 		NewStateRoot: newHash,
@@ -1630,9 +1610,7 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 			oldStateRoot: oldHash,
 			expectedStoredTxs: []transactionToStore{
 				{
-					txTracker: &TxTracker{
-						From: senderAddr,
-					},
+
 					batchNumber:   1,
 					coinbase:      seqAddr,
 					timestamp:     now(),
@@ -1642,9 +1620,6 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 					batchResponse: successfulBatchResp,
 				},
 				{
-					txTracker: &TxTracker{
-						From: senderAddr,
-					},
 					batchNumber: 1,
 					coinbase:    seqAddr,
 					timestamp:   now(),
@@ -1667,9 +1642,6 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 			oldStateRoot: oldHash,
 			expectedStoredTxs: []transactionToStore{
 				{
-					txTracker: &TxTracker{
-						From: senderAddr,
-					},
 					batchNumber: 1,
 					coinbase:    seqAddr,
 					timestamp:   now(),
@@ -1692,9 +1664,6 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 			oldStateRoot: oldHash,
 			expectedStoredTxs: []transactionToStore{
 				{
-					txTracker: &TxTracker{
-						From: senderAddr,
-					},
 					batchNumber: 1,
 					coinbase:    seqAddr,
 					timestamp:   now(),
@@ -1710,28 +1679,26 @@ func Test_handleForcedTxsProcessResp(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			storedTxs := make([]transactionToStore, 0)
-			f.pendingTxsToStore = make(chan transactionToStore)
+			f.pendingTransactionsToStore = make(chan transactionToStore)
 
 			// Mock storeProcessedTx to store txs into the storedTxs slice
 			go func() {
-				for tx := range f.pendingTxsToStore {
+				for tx := range f.pendingTransactionsToStore {
 					storedTxs = append(storedTxs, tx)
-					f.pendingTxsToStoreWG.Done()
+					f.pendingTransactionsToStoreWG.Done()
 				}
 			}()
 
 			f.handleForcedTxsProcessResp(ctx, tc.request, tc.result, tc.oldStateRoot)
 
-			f.pendingTxsToStoreWG.Wait()
+			f.pendingTransactionsToStoreWG.Wait()
 			require.Nil(t, err)
 			require.Equal(t, len(tc.expectedStoredTxs), len(storedTxs))
-			// Tx tracker is not comparable, so we can't compare the whole struct
-			/*
-				for i := 0; i < len(tc.expectedStoredTxs); i++ {
-					expectedTx := tc.expectedStoredTxs[i]
-					actualTx := storedTxs[i]
-					require.Equal(t, expectedTx, actualTx)
-				}*/
+			for i := 0; i < len(tc.expectedStoredTxs); i++ {
+				expectedTx := tc.expectedStoredTxs[i]
+				actualTx := storedTxs[i]
+				require.Equal(t, expectedTx, actualTx)
+			}
 		})
 	}
 }
@@ -1766,9 +1733,6 @@ func TestFinalizer_storeProcessedTx(t *testing.T) {
 				response: &state.ProcessTransactionResponse{
 					TxHash: txHash,
 				},
-				txTracker: &TxTracker{
-					From: senderAddr,
-				},
 				isForcedBatch: false,
 			},
 		},
@@ -1791,9 +1755,6 @@ func TestFinalizer_storeProcessedTx(t *testing.T) {
 					TxHash: txHash2,
 				},
 				isForcedBatch: true,
-				txTracker: &TxTracker{
-					From: senderAddr,
-				},
 			},
 		},
 	}
@@ -2484,9 +2445,9 @@ func setupFinalizer(withWipBatch bool) *finalizer {
 		handlingL2Reorg: false,
 		eventLog:        eventLog,
 		maxBreakEvenGasPriceDeviationPercentage: big.NewInt(10),
-		pendingTxsToStore:    make(chan transactionToStore, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
-		pendingTxsToStoreWG:  new(sync.WaitGroup),
-		pendingTxsToStoreMux: new(sync.RWMutex),
+		pendingTransactionsToStore:    make(chan transactionToStore, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
+		pendingTransactionsToStoreWG:  new(sync.WaitGroup),
+		pendingTransactionsToStoreMux: new(sync.RWMutex),
 		storedFlushID: 0,
 		storedFlushIDCond: sync.NewCond(new(sync.Mutex)),
 		proverID: "",
diff --git a/sequencer/worker.go b/sequencer/worker.go
index da5f6e1a5a..cba6938f7c 100644
--- a/sequencer/worker.go
+++ b/sequencer/worker.go
@@ -26,12 +26,7 @@ type Worker struct {
 }
 
 // NewWorker creates an init a worker
-func NewWorker(
-	cfg WorkerCfg,
-	state stateInterface,
-	constraints batchConstraints,
-	weights batchResourceWeights,
-) *Worker {
+func NewWorker(cfg WorkerCfg, state stateInterface, constraints batchConstraints, weights batchResourceWeights) *Worker {
 	w := Worker{
 		cfg:                  cfg,
 		pool:                 make(map[string]*addrQueue),
diff --git a/sequencer/worker_test.go b/sequencer/worker_test.go
index 526b72bae3..51787f188c 100644
--- a/sequencer/worker_test.go
+++ b/sequencer/worker_test.go
@@ -308,6 +307,5 @@ func TestWorkerGetBestTx(t *testing.T) {
 
 func initWorker(stateMock *StateMock, rcMax batchConstraints, rcWeigth batchResourceWeights) *Worker {
 	worker := NewWorker(workerCfg, stateMock, rcMax, rcWeigth)
-
 	return worker
 }
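
For reference, below is a minimal self-contained sketch of the flush-id hand-off that PATCH 2 introduces and PATCH 3 keeps: producers raise lastPendingFlushID and Broadcast() on pendingFlushIDCond, while updateProverIdAndFlushId waits in a loop until the pending id moves past the stored one. This program is illustrative only; the type flushTracker and the methods setPending and waitForPending are invented names, not code from this series, and unlike the patches (which broadcast while holding pendingTransactionsToStoreMux) it uses the cond's own lock on both sides:

package main

import (
	"fmt"
	"sync"
	"time"
)

// flushTracker mirrors the finalizer's lastPendingFlushID / pendingFlushIDCond pair.
type flushTracker struct {
	lastPendingFlushID uint64
	pendingFlushIDCond *sync.Cond
}

// setPending publishes a new flush id if it is greater than the last one,
// waking every goroutine blocked in waitForPending (the producer side of
// handleProcessTransactionResponse / handleForcedTxsProcessResp).
func (f *flushTracker) setPending(id uint64) {
	f.pendingFlushIDCond.L.Lock()
	defer f.pendingFlushIDCond.L.Unlock()
	if id > f.lastPendingFlushID {
		f.lastPendingFlushID = id
		f.pendingFlushIDCond.Broadcast()
	}
}

// waitForPending blocks until lastPendingFlushID exceeds storedFlushID,
// re-checking the condition on every wake-up, as updateProverIdAndFlushId does.
func (f *flushTracker) waitForPending(storedFlushID uint64) uint64 {
	f.pendingFlushIDCond.L.Lock()
	defer f.pendingFlushIDCond.L.Unlock()
	for storedFlushID >= f.lastPendingFlushID {
		f.pendingFlushIDCond.Wait() // atomically unlocks, sleeps, relocks on Broadcast
	}
	return f.lastPendingFlushID
}

func main() {
	f := &flushTracker{pendingFlushIDCond: sync.NewCond(&sync.Mutex{})}
	go func() {
		time.Sleep(10 * time.Millisecond)
		f.setPending(42) // producer: the executor reported flush id 42
	}()
	fmt.Println("new pending flush id:", f.waitForPending(0)) // consumer: blocks until 42
}

Unlike the buffered pendingFlushIDChan it replaces, a sync.Cond carries no payload and a Broadcast never blocks the producer, which is why the waiter re-reads lastPendingFlushID after waking instead of receiving the value from a channel.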