From 2d0cf3c58a035134522ac2d94f516578a13b16e1 Mon Sep 17 00:00:00 2001 From: agnusmor Date: Tue, 8 Aug 2023 23:00:58 +0200 Subject: [PATCH 1/4] refactor delete addrQueue only if not pending txs to store --- sequencer/addrqueue.go | 44 ++++++--- sequencer/finalizer.go | 175 +++++++++++++++++++----------------- sequencer/finalizer_test.go | 1 - sequencer/interfaces.go | 2 + sequencer/mock_worker.go | 10 +++ sequencer/worker.go | 41 +++++++-- 6 files changed, 169 insertions(+), 104 deletions(-) diff --git a/sequencer/addrqueue.go b/sequencer/addrqueue.go index 684a060aed..daddc83e26 100644 --- a/sequencer/addrqueue.go +++ b/sequencer/addrqueue.go @@ -12,23 +12,25 @@ import ( // addrQueue is a struct that stores the ready and notReady txs for a specific from address type addrQueue struct { - from common.Address - fromStr string - currentNonce uint64 - currentBalance *big.Int - readyTx *TxTracker - notReadyTxs map[uint64]*TxTracker + from common.Address + fromStr string + currentNonce uint64 + currentBalance *big.Int + readyTx *TxTracker + notReadyTxs map[uint64]*TxTracker + pendingTxsToStore map[common.Hash]struct{} } // newAddrQueue creates and init a addrQueue func newAddrQueue(addr common.Address, nonce uint64, balance *big.Int) *addrQueue { return &addrQueue{ - from: addr, - fromStr: addr.String(), - currentNonce: nonce, - currentBalance: balance, - readyTx: nil, - notReadyTxs: make(map[uint64]*TxTracker), + from: addr, + fromStr: addr.String(), + currentNonce: nonce, + currentBalance: balance, + readyTx: nil, + notReadyTxs: make(map[uint64]*TxTracker), + pendingTxsToStore: make(map[common.Hash]struct{}), } } @@ -76,6 +78,11 @@ func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, replacedTx *T } } +// addPendingTxToStore adds a tx to the list of pending txs to store in the DB (trusted state) +func (a *addrQueue) addPendingTxToStore(txHash common.Hash) { + a.pendingTxsToStore[txHash] = struct{}{} +} + // ExpireTransactions removes the txs that have been in the queue for more than maxTime func (a *addrQueue) ExpireTransactions(maxTime time.Duration) ([]*TxTracker, *TxTracker) { var ( @@ -95,7 +102,7 @@ func (a *addrQueue) ExpireTransactions(maxTime time.Duration) ([]*TxTracker, *Tx prevReadyTx = a.readyTx txs = append(txs, a.readyTx) a.readyTx = nil - log.Debugf("Deleting notReadyTx %s from addrQueue %s", prevReadyTx.HashStr, a.fromStr) + log.Debugf("Deleting eadyTx %s from addrQueue %s", prevReadyTx.HashStr, a.fromStr) } return txs, prevReadyTx @@ -103,7 +110,7 @@ func (a *addrQueue) ExpireTransactions(maxTime time.Duration) ([]*TxTracker, *Tx // IsEmpty returns true if the addrQueue is empty func (a *addrQueue) IsEmpty() bool { - return a.readyTx == nil && len(a.notReadyTxs) == 0 + return a.readyTx == nil && len(a.notReadyTxs) == 0 && len(a.pendingTxsToStore) == 0 } // deleteTx deletes the tx from the addrQueue @@ -126,6 +133,15 @@ func (a *addrQueue) deleteTx(txHash common.Hash) (deletedReadyTx *TxTracker) { } } +// deletePendingTxToStore delete a tx from the list of pending txs to store in the DB (trusted state) +func (a *addrQueue) deletePendingTxToStore(txHash common.Hash) { + if _, found := a.pendingTxsToStore[txHash]; found { + delete(a.pendingTxsToStore, txHash) + } else { + log.Warnf("tx (%s) not found in pendingTxsToStore list", txHash.String()) + } +} + // updateCurrentNonceBalance updates the nonce and balance of the addrQueue and updates the ready and notReady txs func (a *addrQueue) updateCurrentNonceBalance(nonce *uint64, balance *big.Int) (newReadyTx, prevReadyTx *TxTracker, toDelete []*TxTracker) { var oldReadyTx *TxTracker = nil diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 52311e3bf5..7de5203ccd 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -60,14 +60,13 @@ type finalizer struct { maxBreakEvenGasPriceDeviationPercentage *big.Int defaultMinGasPriceAllowed uint64 // Processed txs - pendingTransactionsToStore chan transactionToStore - pendingTransactionsToStoreWG *sync.WaitGroup - pendingTransactionsToStoreMux *sync.RWMutex - storedFlushID uint64 - storedFlushIDCond *sync.Cond - proverID string - lastPendingFlushID uint64 - pendingFlushIDCond *sync.Cond + pendingTransactionsToStore chan transactionToStore + pendingTransactionsToStoreWG *sync.WaitGroup + storedFlushID uint64 + storedFlushIDCond *sync.Cond //Condition to wait until storedFlushID has been updated + proverID string + lastPendingFlushID uint64 + pendingFlushIDCond *sync.Cond } type transactionToStore struct { @@ -141,7 +140,6 @@ func newFinalizer( maxBreakEvenGasPriceDeviationPercentage: new(big.Int).SetUint64(effectiveGasPriceCfg.MaxBreakEvenGasPriceDeviationPercentage), pendingTransactionsToStore: make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier), pendingTransactionsToStoreWG: new(sync.WaitGroup), - pendingTransactionsToStoreMux: &sync.RWMutex{}, storedFlushID: 0, // Mutex is unlocked when the condition is broadcasted storedFlushIDCond: sync.NewCond(&sync.Mutex{}), @@ -184,10 +182,51 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *s f.finalizeBatches(ctx) } +// storePendingTransactions stores the pending transactions in the database +func (f *finalizer) storePendingTransactions(ctx context.Context) { + for { + select { + case tx, ok := <-f.pendingTransactionsToStore: + if !ok { + // Channel is closed + return + } + + // Wait until f.storedFlushID >= tx.flushId + f.storedFlushIDCond.L.Lock() + for f.storedFlushID < tx.flushId { + f.storedFlushIDCond.Wait() + // check if context is done after waking up + if ctx.Err() != nil { + f.storedFlushIDCond.L.Unlock() + return + } + } + f.storedFlushIDCond.L.Unlock() + + // Now f.storedFlushID >= tx.flushId, we can store tx + f.storeProcessedTx(ctx, tx) + + // Delete the txTracker from the pending list in the worker (addrQueue) + f.worker.DeletePendingTxToStore(tx.txTracker.Hash, tx.txTracker.From) + + f.pendingTransactionsToStoreWG.Done() + case <-ctx.Done(): + // The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit + f.pendingTransactionsToStoreWG.Wait() + return + default: + time.Sleep(100 * time.Millisecond) //nolint:gomnd + } + } +} + // updateProverIdAndFlushId updates the prover id and flush id func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) { for { f.pendingFlushIDCond.L.Lock() + // f.storedFlushID is >= than f.lastPendingFlushID, this means all pending txs (flushid) are stored by the executor. + // We are "synced" with the flush id, therefore we need to wait for new tx (new pending flush id to be stored by the executor) for f.storedFlushID >= f.lastPendingFlushID { f.pendingFlushIDCond.Wait() } @@ -199,7 +238,14 @@ func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) { log.Errorf("failed to get stored flush id, Err: %v", err) } else { if storedFlushID != f.storedFlushID { - f.checkProverIDAndUpdateStoredFlushID(storedFlushID, proverID) + // Check if prover/Executor has been restarted + f.checkIfProverRestarted(proverID) + + // Update f.storeFlushID and signal condition f.storedFlushIDCond + f.storedFlushIDCond.L.Lock() + f.storedFlushID = storedFlushID + f.storedFlushIDCond.Broadcast() + f.storedFlushIDCond.L.Unlock() } } } @@ -241,12 +287,31 @@ func (f *finalizer) listenForClosingSignals(ctx context.Context) { } } -// updateStoredFlushID updates the stored flush id -func (f *finalizer) updateStoredFlushID(newFlushID uint64) { - f.storedFlushIDCond.L.Lock() - f.storedFlushID = newFlushID - f.storedFlushIDCond.Broadcast() - f.storedFlushIDCond.L.Unlock() +// updateLastPendingFLushID updates f.lastPendingFLushID with newFlushID value (it it has changed) and sends +// the signal condition f.pendingFlushIDCond to notify other go funcs that the f.lastPendingFlushID value has changed +func (f *finalizer) updateLastPendingFlushID(newFlushID uint64) { + if newFlushID > f.lastPendingFlushID { + f.lastPendingFlushID = newFlushID + f.pendingFlushIDCond.Broadcast() + } +} + +// addPendingTxToStore adds a pending tx that is ready to be stored in the state DB once its flushid has been stored by the executor +func (f *finalizer) addPendingTxToStore(ctx context.Context, txToStore transactionToStore) { + f.pendingTransactionsToStoreWG.Add(1) + if txToStore.txTracker != nil { + f.worker.AddPendingTxToStore(txToStore.txTracker.Hash, txToStore.txTracker.From) + } + select { + case f.pendingTransactionsToStore <- txToStore: + case <-ctx.Done(): + // If context is cancelled before we can send to the channel, we must decrement the WaitGroup count and + // delete the pending TxToStore added in the worker + f.pendingTransactionsToStoreWG.Done() + if txToStore.txTracker != nil { + f.worker.DeletePendingTxToStore(txToStore.txTracker.Hash, txToStore.txTracker.From) + } + } } // finalizeBatches runs the endless loop for processing transactions finalizing batches. @@ -370,7 +435,7 @@ func (f *finalizer) halt(ctx context.Context, err error) { } // checkProverIDAndUpdateStoredFlushID checks if the proverID changed and updates the stored flush id -func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, proverID string) { +func (f *finalizer) checkIfProverRestarted(proverID string) { if f.proverID != "" && f.proverID != proverID { event := &event.Event{ ReceivedAt: time.Now(), @@ -388,42 +453,6 @@ func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, pr log.Fatal("restarting sequencer to discard current WIP batch and work with new executor") } - f.updateStoredFlushID(storedFlushID) -} - -// storePendingTransactions stores the pending transactions in the database -func (f *finalizer) storePendingTransactions(ctx context.Context) { - for { - select { - case tx, ok := <-f.pendingTransactionsToStore: - if !ok { - // Channel is closed - return - } - - // Print the formatted timestamp - f.storedFlushIDCond.L.Lock() - for f.storedFlushID < tx.flushId { - f.storedFlushIDCond.Wait() - // check if context is done after waking up - if ctx.Err() != nil { - f.storedFlushIDCond.L.Unlock() - return - } - } - f.storedFlushIDCond.L.Unlock() - - // Now f.storedFlushID >= tx.flushId, you can store tx - f.storeProcessedTx(ctx, tx) - f.pendingTransactionsToStoreWG.Done() - case <-ctx.Done(): - // The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit - f.pendingTransactionsToStoreWG.Wait() - return - default: - time.Sleep(100 * time.Millisecond) //nolint:gomnd - } - } } // newWIPBatch closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch @@ -655,12 +684,12 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx if diff.Cmp(maxDiff) == 1 { reprocessNeeded = true } - log.Infof("calculated newBreakEvenGasPrice: %d, tx.BreakEvenGasprice: %d for tx: %s", newBreakEvenGasPrice, tx.BreakEvenGasPrice, tx.HashStr) + log.Infof("calculated newBreakEvenGasPrice: %d, tx.BreakEvenGasPrice: %d for tx: %s", newBreakEvenGasPrice, tx.BreakEvenGasPrice, tx.HashStr) log.Infof("Would need reprocess: %t, diff: %d, maxDiff: %d", reprocessNeeded, diff, maxDiff) } } - processedTransaction := transactionToStore{ + txToStore := transactionToStore{ txTracker: tx, response: result.Responses[0], batchResponse: result, @@ -672,19 +701,9 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx flushId: result.FlushID, } - f.pendingTransactionsToStoreMux.Lock() - f.pendingTransactionsToStoreWG.Add(1) - if result.FlushID > f.lastPendingFlushID { - f.lastPendingFlushID = result.FlushID - f.pendingFlushIDCond.Broadcast() - } - f.pendingTransactionsToStoreMux.Unlock() - select { - case f.pendingTransactionsToStore <- processedTransaction: - case <-ctx.Done(): - // If context is cancelled before we can send to the channel, we must decrement the WaitGroup count - f.pendingTransactionsToStoreWG.Done() - } + f.updateLastPendingFlushID(result.FlushID) + + f.addPendingTxToStore(ctx, txToStore) f.batch.countOfTxs++ @@ -709,7 +728,9 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat } } - processedTransaction := transactionToStore{ + oldStateRoot = txResp.StateRoot + + txToStore := transactionToStore{ txTracker: nil, response: txResp, batchResponse: result, @@ -721,21 +742,9 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat flushId: result.FlushID, } - f.pendingTransactionsToStoreMux.Lock() - f.pendingTransactionsToStoreWG.Add(1) - if result.FlushID > f.lastPendingFlushID { - f.lastPendingFlushID = result.FlushID - f.pendingFlushIDCond.Broadcast() - } - f.pendingTransactionsToStoreMux.Unlock() - oldStateRoot = txResp.StateRoot + f.updateLastPendingFlushID(result.FlushID) - select { - case f.pendingTransactionsToStore <- processedTransaction: - case <-ctx.Done(): - // If context is cancelled before we can send to the channel, we must decrement the WaitGroup count - f.pendingTransactionsToStoreWG.Done() - } + f.addPendingTxToStore(ctx, txToStore) } } diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go index 74751f453c..16c723a21f 100644 --- a/sequencer/finalizer_test.go +++ b/sequencer/finalizer_test.go @@ -2447,7 +2447,6 @@ func setupFinalizer(withWipBatch bool) *finalizer { maxBreakEvenGasPriceDeviationPercentage: big.NewInt(10), pendingTransactionsToStore: make(chan transactionToStore, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier), pendingTransactionsToStoreWG: new(sync.WaitGroup), - pendingTransactionsToStoreMux: new(sync.RWMutex), storedFlushID: 0, storedFlushIDCond: sync.NewCond(new(sync.Mutex)), proverID: "", diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index 181326eaaf..c13d0f4f5c 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -90,6 +90,8 @@ type workerInterface interface { AddTxTracker(ctx context.Context, txTracker *TxTracker) (replacedTx *TxTracker, dropReason error) MoveTxToNotReady(txHash common.Hash, from common.Address, actualNonce *uint64, actualBalance *big.Int) []*TxTracker DeleteTx(txHash common.Hash, from common.Address) + AddPendingTxToStore(txHash common.Hash, addr common.Address) + DeletePendingTxToStore(txHash common.Hash, addr common.Address) HandleL2Reorg(txHashes []common.Hash) NewTxTracker(tx types.Transaction, counters state.ZKCounters, ip string) (*TxTracker, error) } diff --git a/sequencer/mock_worker.go b/sequencer/mock_worker.go index 1262c09023..40b0553b53 100644 --- a/sequencer/mock_worker.go +++ b/sequencer/mock_worker.go @@ -20,6 +20,11 @@ type WorkerMock struct { mock.Mock } +// AddPendingTxToStore provides a mock function with given fields: txHash, addr +func (_m *WorkerMock) AddPendingTxToStore(txHash common.Hash, addr common.Address) { + _m.Called(txHash, addr) +} + // AddTxTracker provides a mock function with given fields: ctx, txTracker func (_m *WorkerMock) AddTxTracker(ctx context.Context, txTracker *TxTracker) (*TxTracker, error) { ret := _m.Called(ctx, txTracker) @@ -46,6 +51,11 @@ func (_m *WorkerMock) AddTxTracker(ctx context.Context, txTracker *TxTracker) (* return r0, r1 } +// DeletePendingTxToStore provides a mock function with given fields: txHash, addr +func (_m *WorkerMock) DeletePendingTxToStore(txHash common.Hash, addr common.Address) { + _m.Called(txHash, addr) +} + // DeleteTx provides a mock function with given fields: txHash, from func (_m *WorkerMock) DeleteTx(txHash common.Hash, from common.Address) { _m.Called(txHash, from) diff --git a/sequencer/worker.go b/sequencer/worker.go index cba6938f7c..11b3575ac0 100644 --- a/sequencer/worker.go +++ b/sequencer/worker.go @@ -47,7 +47,6 @@ func (w *Worker) NewTxTracker(tx types.Transaction, counters state.ZKCounters, i // AddTxTracker adds a new Tx to the Worker func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *TxTracker, dropReason error) { w.workerMutex.Lock() - defer w.workerMutex.Unlock() addr, found := w.pool[tx.FromStr] @@ -89,6 +88,7 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T newReadyTx, prevReadyTx, repTx, dropReason = addr.addTx(tx) if dropReason != nil { log.Infof("AddTx tx(%s) dropped from addrQueue(%s), reason: %s", tx.HashStr, tx.FromStr, dropReason.Error()) + w.workerMutex.Unlock() return repTx, dropReason } @@ -106,6 +106,7 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T log.Infof("AddTx replacedTx(%s) nonce(%d) cost(%s) has been replaced", repTx.HashStr, repTx.Nonce, repTx.Cost.String()) } + w.workerMutex.Unlock() return repTx, nil } @@ -136,7 +137,7 @@ func (w *Worker) UpdateAfterSingleSuccessfulTxExecution(from common.Address, tou w.workerMutex.Lock() defer w.workerMutex.Unlock() if len(touchedAddresses) == 0 { - log.Errorf("UpdateAfterSingleSuccessfulTxExecution touchedAddresses is nil or empty") + log.Warnf("UpdateAfterSingleSuccessfulTxExecution touchedAddresses is nil or empty") } txsToDelete := make([]*TxTracker, 0) touchedFrom, found := touchedAddresses[from] @@ -144,7 +145,7 @@ func (w *Worker) UpdateAfterSingleSuccessfulTxExecution(from common.Address, tou fromNonce, fromBalance := touchedFrom.Nonce, touchedFrom.Balance _, _, txsToDelete = w.applyAddressUpdate(from, fromNonce, fromBalance) } else { - log.Errorf("UpdateAfterSingleSuccessfulTxExecution from(%s) not found in touchedAddresses", from.String()) + log.Warnf("UpdateAfterSingleSuccessfulTxExecution from(%s) not found in touchedAddresses", from.String()) } for addr, addressInfo := range touchedAddresses { @@ -170,7 +171,7 @@ func (w *Worker) MoveTxToNotReady(txHash common.Hash, from common.Address, actua if addrQueue.readyTx != nil { readyHashStr = addrQueue.readyTx.HashStr } - log.Errorf("MoveTxToNotReady txHash(%s) is not the readyTx(%s)", txHash.String(), readyHashStr) + log.Warnf("MoveTxToNotReady txHash(%s) is not the readyTx(%s)", txHash.String(), readyHashStr) } } _, _, txsToDelete := w.applyAddressUpdate(from, actualNonce, actualBalance) @@ -191,7 +192,7 @@ func (w *Worker) DeleteTx(txHash common.Hash, addr common.Address) { w.efficiencyList.delete(deletedReadyTx) } } else { - log.Errorf("DeleteTx addrQueue(%s) not found", addr.String()) + log.Warnf("DeleteTx addrQueue(%s) not found", addr.String()) } } @@ -224,7 +225,35 @@ func (w *Worker) UpdateTx(txHash common.Hash, addr common.Address, counters stat w.efficiencyList.add(newReadyTx) } } else { - log.Errorf("UpdateTx addrQueue(%s) not found", addr.String()) + log.Warnf("UpdateTx addrQueue(%s) not found", addr.String()) + } +} + +// AddPendingTxToStore adds a tx to the addrQueue list of pending txs to store in the DB (trusted state) +func (w *Worker) AddPendingTxToStore(txHash common.Hash, addr common.Address) { + w.workerMutex.Lock() + defer w.workerMutex.Unlock() + + addrQueue, found := w.pool[addr.String()] + + if found { + addrQueue.addPendingTxToStore(txHash) + } else { + log.Warnf("AddPendingTxToStore addrQueue(%s) not found", addr.String()) + } +} + +// DeletePendingTxToStore delete a tx from the addrQueue list of pending txs to store in the DB (trusted state) +func (w *Worker) DeletePendingTxToStore(txHash common.Hash, addr common.Address) { + w.workerMutex.Lock() + defer w.workerMutex.Unlock() + + addrQueue, found := w.pool[addr.String()] + + if found { + addrQueue.deletePendingTxToStore(txHash) + } else { + log.Warnf("DeletePendingTxToStore addrQueue(%s) not found", addr.String()) } } From 079b8d406bef67ebc128da0b6672e5ea8d291c62 Mon Sep 17 00:00:00 2001 From: agnusmor Date: Wed, 9 Aug 2023 04:04:00 +0200 Subject: [PATCH 2/4] fix finalizer test --- sequencer/finalizer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go index 16c723a21f..69c6dd4bc0 100644 --- a/sequencer/finalizer_test.go +++ b/sequencer/finalizer_test.go @@ -294,6 +294,7 @@ func TestFinalizer_handleProcessTransactionResponse(t *testing.T) { //dbManagerMock.On("GetGasPrices", ctx).Return(pool.GasPrices{L1GasPrice: 0, L2GasPrice: 0}, nilErr).Once() workerMock.On("DeleteTx", txTracker.Hash, txTracker.From).Return().Once() workerMock.On("UpdateAfterSingleSuccessfulTxExecution", txTracker.From, tc.executorResponse.ReadWriteAddresses).Return([]*TxTracker{}).Once() + workerMock.On("AddPendingTxToStore", txTracker.Hash, txTracker.From).Return().Once() } if tc.expectedUpdateTxStatus != "" { dbManagerMock.On("UpdateTxStatus", ctx, txHash, tc.expectedUpdateTxStatus, false, mock.Anything).Return(nil).Once() From 56d917ec60b81ca04979f2bc1a16bfe68b0b524f Mon Sep 17 00:00:00 2001 From: agnusmor Date: Wed, 9 Aug 2023 04:54:02 +0200 Subject: [PATCH 3/4] fix olsStateRoot in handleForcedTxsProcessResp --- sequencer/finalizer.go | 4 ++-- sequencer/finalizer_test.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 7de5203ccd..008c733a8b 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -728,8 +728,6 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat } } - oldStateRoot = txResp.StateRoot - txToStore := transactionToStore{ txTracker: nil, response: txResp, @@ -742,6 +740,8 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat flushId: result.FlushID, } + oldStateRoot = txResp.StateRoot + f.updateLastPendingFlushID(result.FlushID) f.addPendingTxToStore(ctx, txToStore) diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go index 69c6dd4bc0..8883ec7ff7 100644 --- a/sequencer/finalizer_test.go +++ b/sequencer/finalizer_test.go @@ -1515,6 +1515,7 @@ func Test_processTransaction(t *testing.T) { } if tc.expectedErr == nil { workerMock.On("UpdateAfterSingleSuccessfulTxExecution", tc.tx.From, tc.expectedResponse.ReadWriteAddresses).Return([]*TxTracker{}).Once() + workerMock.On("AddPendingTxToStore", tc.tx.Hash, tc.tx.From).Return().Once() } if tc.expectedUpdateTxStatus != "" { From 7099d1643ade2ad730d5c836e8dc4440916b74a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Wed, 9 Aug 2023 09:10:11 +0200 Subject: [PATCH 4/4] Update sequencer/addrqueue.go Co-authored-by: Alonso Rodriguez --- sequencer/addrqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sequencer/addrqueue.go b/sequencer/addrqueue.go index daddc83e26..13336b765b 100644 --- a/sequencer/addrqueue.go +++ b/sequencer/addrqueue.go @@ -102,7 +102,7 @@ func (a *addrQueue) ExpireTransactions(maxTime time.Duration) ([]*TxTracker, *Tx prevReadyTx = a.readyTx txs = append(txs, a.readyTx) a.readyTx = nil - log.Debugf("Deleting eadyTx %s from addrQueue %s", prevReadyTx.HashStr, a.fromStr) + log.Debugf("Deleting readyTx %s from addrQueue %s", prevReadyTx.HashStr, a.fromStr) } return txs, prevReadyTx