Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: avoid delete addrQueue if it has pending txs to store #2391

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions sequencer/addrqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,25 @@ import (

// addrQueue is a struct that stores the ready and notReady txs for a specific from address
type addrQueue struct {
from common.Address
fromStr string
currentNonce uint64
currentBalance *big.Int
readyTx *TxTracker
notReadyTxs map[uint64]*TxTracker
from common.Address
fromStr string
currentNonce uint64
currentBalance *big.Int
readyTx *TxTracker
notReadyTxs map[uint64]*TxTracker
pendingTxsToStore map[common.Hash]struct{}
}

// newAddrQueue creates and init a addrQueue
func newAddrQueue(addr common.Address, nonce uint64, balance *big.Int) *addrQueue {
return &addrQueue{
from: addr,
fromStr: addr.String(),
currentNonce: nonce,
currentBalance: balance,
readyTx: nil,
notReadyTxs: make(map[uint64]*TxTracker),
from: addr,
fromStr: addr.String(),
currentNonce: nonce,
currentBalance: balance,
readyTx: nil,
notReadyTxs: make(map[uint64]*TxTracker),
pendingTxsToStore: make(map[common.Hash]struct{}),
}
}

Expand Down Expand Up @@ -76,6 +78,11 @@ func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, replacedTx *T
}
}

// addPendingTxToStore adds a tx to the list of pending txs to store in the DB (trusted state)
func (a *addrQueue) addPendingTxToStore(txHash common.Hash) {
a.pendingTxsToStore[txHash] = struct{}{}
}

// ExpireTransactions removes the txs that have been in the queue for more than maxTime
func (a *addrQueue) ExpireTransactions(maxTime time.Duration) ([]*TxTracker, *TxTracker) {
var (
Expand All @@ -95,15 +102,15 @@ func (a *addrQueue) ExpireTransactions(maxTime time.Duration) ([]*TxTracker, *Tx
prevReadyTx = a.readyTx
txs = append(txs, a.readyTx)
a.readyTx = nil
log.Debugf("Deleting notReadyTx %s from addrQueue %s", prevReadyTx.HashStr, a.fromStr)
log.Debugf("Deleting readyTx %s from addrQueue %s", prevReadyTx.HashStr, a.fromStr)
}

return txs, prevReadyTx
}

// IsEmpty returns true if the addrQueue is empty
func (a *addrQueue) IsEmpty() bool {
return a.readyTx == nil && len(a.notReadyTxs) == 0
return a.readyTx == nil && len(a.notReadyTxs) == 0 && len(a.pendingTxsToStore) == 0
}

// deleteTx deletes the tx from the addrQueue
Expand All @@ -126,6 +133,15 @@ func (a *addrQueue) deleteTx(txHash common.Hash) (deletedReadyTx *TxTracker) {
}
}

// deletePendingTxToStore delete a tx from the list of pending txs to store in the DB (trusted state)
func (a *addrQueue) deletePendingTxToStore(txHash common.Hash) {
if _, found := a.pendingTxsToStore[txHash]; found {
delete(a.pendingTxsToStore, txHash)
} else {
log.Warnf("tx (%s) not found in pendingTxsToStore list", txHash.String())
}
}

// updateCurrentNonceBalance updates the nonce and balance of the addrQueue and updates the ready and notReady txs
func (a *addrQueue) updateCurrentNonceBalance(nonce *uint64, balance *big.Int) (newReadyTx, prevReadyTx *TxTracker, toDelete []*TxTracker) {
var oldReadyTx *TxTracker = nil
Expand Down
173 changes: 91 additions & 82 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,13 @@ type finalizer struct {
maxBreakEvenGasPriceDeviationPercentage *big.Int
defaultMinGasPriceAllowed uint64
// Processed txs
pendingTransactionsToStore chan transactionToStore
pendingTransactionsToStoreWG *sync.WaitGroup
pendingTransactionsToStoreMux *sync.RWMutex
storedFlushID uint64
storedFlushIDCond *sync.Cond
proverID string
lastPendingFlushID uint64
pendingFlushIDCond *sync.Cond
pendingTransactionsToStore chan transactionToStore
pendingTransactionsToStoreWG *sync.WaitGroup
storedFlushID uint64
storedFlushIDCond *sync.Cond //Condition to wait until storedFlushID has been updated
proverID string
lastPendingFlushID uint64
pendingFlushIDCond *sync.Cond
}

type transactionToStore struct {
Expand Down Expand Up @@ -141,7 +140,6 @@ func newFinalizer(
maxBreakEvenGasPriceDeviationPercentage: new(big.Int).SetUint64(effectiveGasPriceCfg.MaxBreakEvenGasPriceDeviationPercentage),
pendingTransactionsToStore: make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
pendingTransactionsToStoreWG: new(sync.WaitGroup),
pendingTransactionsToStoreMux: &sync.RWMutex{},
storedFlushID: 0,
// Mutex is unlocked when the condition is broadcasted
storedFlushIDCond: sync.NewCond(&sync.Mutex{}),
Expand Down Expand Up @@ -184,10 +182,51 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *s
f.finalizeBatches(ctx)
}

// storePendingTransactions stores the pending transactions in the database
func (f *finalizer) storePendingTransactions(ctx context.Context) {
for {
select {
case tx, ok := <-f.pendingTransactionsToStore:
if !ok {
// Channel is closed
return
}

// Wait until f.storedFlushID >= tx.flushId
f.storedFlushIDCond.L.Lock()
for f.storedFlushID < tx.flushId {
f.storedFlushIDCond.Wait()
// check if context is done after waking up
if ctx.Err() != nil {
f.storedFlushIDCond.L.Unlock()
return
}
}
f.storedFlushIDCond.L.Unlock()

// Now f.storedFlushID >= tx.flushId, we can store tx
f.storeProcessedTx(ctx, tx)

// Delete the txTracker from the pending list in the worker (addrQueue)
f.worker.DeletePendingTxToStore(tx.txTracker.Hash, tx.txTracker.From)

f.pendingTransactionsToStoreWG.Done()
case <-ctx.Done():
// The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit
f.pendingTransactionsToStoreWG.Wait()
return
default:
time.Sleep(100 * time.Millisecond) //nolint:gomnd
}
}
}

// updateProverIdAndFlushId updates the prover id and flush id
func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) {
for {
f.pendingFlushIDCond.L.Lock()
// f.storedFlushID is >= than f.lastPendingFlushID, this means all pending txs (flushid) are stored by the executor.
// We are "synced" with the flush id, therefore we need to wait for new tx (new pending flush id to be stored by the executor)
for f.storedFlushID >= f.lastPendingFlushID {
f.pendingFlushIDCond.Wait()
}
Expand All @@ -199,7 +238,14 @@ func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) {
log.Errorf("failed to get stored flush id, Err: %v", err)
} else {
if storedFlushID != f.storedFlushID {
f.checkProverIDAndUpdateStoredFlushID(storedFlushID, proverID)
// Check if prover/Executor has been restarted
f.checkIfProverRestarted(proverID)

// Update f.storeFlushID and signal condition f.storedFlushIDCond
f.storedFlushIDCond.L.Lock()
f.storedFlushID = storedFlushID
f.storedFlushIDCond.Broadcast()
f.storedFlushIDCond.L.Unlock()
}
}
}
Expand Down Expand Up @@ -241,12 +287,31 @@ func (f *finalizer) listenForClosingSignals(ctx context.Context) {
}
}

// updateStoredFlushID updates the stored flush id
func (f *finalizer) updateStoredFlushID(newFlushID uint64) {
f.storedFlushIDCond.L.Lock()
f.storedFlushID = newFlushID
f.storedFlushIDCond.Broadcast()
f.storedFlushIDCond.L.Unlock()
// updateLastPendingFLushID updates f.lastPendingFLushID with newFlushID value (it it has changed) and sends
// the signal condition f.pendingFlushIDCond to notify other go funcs that the f.lastPendingFlushID value has changed
func (f *finalizer) updateLastPendingFlushID(newFlushID uint64) {
if newFlushID > f.lastPendingFlushID {
f.lastPendingFlushID = newFlushID
f.pendingFlushIDCond.Broadcast()
}
}

// addPendingTxToStore adds a pending tx that is ready to be stored in the state DB once its flushid has been stored by the executor
func (f *finalizer) addPendingTxToStore(ctx context.Context, txToStore transactionToStore) {
f.pendingTransactionsToStoreWG.Add(1)
if txToStore.txTracker != nil {
f.worker.AddPendingTxToStore(txToStore.txTracker.Hash, txToStore.txTracker.From)
}
select {
case f.pendingTransactionsToStore <- txToStore:
case <-ctx.Done():
// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count and
// delete the pending TxToStore added in the worker
f.pendingTransactionsToStoreWG.Done()
if txToStore.txTracker != nil {
f.worker.DeletePendingTxToStore(txToStore.txTracker.Hash, txToStore.txTracker.From)
}
}
}

// finalizeBatches runs the endless loop for processing transactions finalizing batches.
Expand Down Expand Up @@ -370,7 +435,7 @@ func (f *finalizer) halt(ctx context.Context, err error) {
}

// checkProverIDAndUpdateStoredFlushID checks if the proverID changed and updates the stored flush id
func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, proverID string) {
func (f *finalizer) checkIfProverRestarted(proverID string) {
if f.proverID != "" && f.proverID != proverID {
event := &event.Event{
ReceivedAt: time.Now(),
Expand All @@ -388,42 +453,6 @@ func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, pr

log.Fatal("restarting sequencer to discard current WIP batch and work with new executor")
}
f.updateStoredFlushID(storedFlushID)
}

// storePendingTransactions stores the pending transactions in the database
func (f *finalizer) storePendingTransactions(ctx context.Context) {
for {
select {
case tx, ok := <-f.pendingTransactionsToStore:
if !ok {
// Channel is closed
return
}

// Print the formatted timestamp
f.storedFlushIDCond.L.Lock()
for f.storedFlushID < tx.flushId {
f.storedFlushIDCond.Wait()
// check if context is done after waking up
if ctx.Err() != nil {
f.storedFlushIDCond.L.Unlock()
return
}
}
f.storedFlushIDCond.L.Unlock()

// Now f.storedFlushID >= tx.flushId, you can store tx
f.storeProcessedTx(ctx, tx)
f.pendingTransactionsToStoreWG.Done()
case <-ctx.Done():
// The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit
f.pendingTransactionsToStoreWG.Wait()
return
default:
time.Sleep(100 * time.Millisecond) //nolint:gomnd
}
}
}

// newWIPBatch closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch
Expand Down Expand Up @@ -655,12 +684,12 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
if diff.Cmp(maxDiff) == 1 {
reprocessNeeded = true
}
log.Infof("calculated newBreakEvenGasPrice: %d, tx.BreakEvenGasprice: %d for tx: %s", newBreakEvenGasPrice, tx.BreakEvenGasPrice, tx.HashStr)
log.Infof("calculated newBreakEvenGasPrice: %d, tx.BreakEvenGasPrice: %d for tx: %s", newBreakEvenGasPrice, tx.BreakEvenGasPrice, tx.HashStr)
log.Infof("Would need reprocess: %t, diff: %d, maxDiff: %d", reprocessNeeded, diff, maxDiff)
}
}

processedTransaction := transactionToStore{
txToStore := transactionToStore{
txTracker: tx,
response: result.Responses[0],
batchResponse: result,
Expand All @@ -672,19 +701,9 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
flushId: result.FlushID,
}

f.pendingTransactionsToStoreMux.Lock()
f.pendingTransactionsToStoreWG.Add(1)
if result.FlushID > f.lastPendingFlushID {
f.lastPendingFlushID = result.FlushID
f.pendingFlushIDCond.Broadcast()
}
f.pendingTransactionsToStoreMux.Unlock()
select {
case f.pendingTransactionsToStore <- processedTransaction:
case <-ctx.Done():
// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count
f.pendingTransactionsToStoreWG.Done()
}
f.updateLastPendingFlushID(result.FlushID)

f.addPendingTxToStore(ctx, txToStore)

f.batch.countOfTxs++

Expand All @@ -709,7 +728,7 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
}
}

processedTransaction := transactionToStore{
txToStore := transactionToStore{
txTracker: nil,
response: txResp,
batchResponse: result,
Expand All @@ -721,21 +740,11 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
flushId: result.FlushID,
}

f.pendingTransactionsToStoreMux.Lock()
f.pendingTransactionsToStoreWG.Add(1)
if result.FlushID > f.lastPendingFlushID {
f.lastPendingFlushID = result.FlushID
f.pendingFlushIDCond.Broadcast()
}
f.pendingTransactionsToStoreMux.Unlock()
oldStateRoot = txResp.StateRoot

select {
case f.pendingTransactionsToStore <- processedTransaction:
case <-ctx.Done():
// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count
f.pendingTransactionsToStoreWG.Done()
}
f.updateLastPendingFlushID(result.FlushID)

f.addPendingTxToStore(ctx, txToStore)
}
}

Expand Down
3 changes: 2 additions & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func TestFinalizer_handleProcessTransactionResponse(t *testing.T) {
//dbManagerMock.On("GetGasPrices", ctx).Return(pool.GasPrices{L1GasPrice: 0, L2GasPrice: 0}, nilErr).Once()
workerMock.On("DeleteTx", txTracker.Hash, txTracker.From).Return().Once()
workerMock.On("UpdateAfterSingleSuccessfulTxExecution", txTracker.From, tc.executorResponse.ReadWriteAddresses).Return([]*TxTracker{}).Once()
workerMock.On("AddPendingTxToStore", txTracker.Hash, txTracker.From).Return().Once()
}
if tc.expectedUpdateTxStatus != "" {
dbManagerMock.On("UpdateTxStatus", ctx, txHash, tc.expectedUpdateTxStatus, false, mock.Anything).Return(nil).Once()
Expand Down Expand Up @@ -1514,6 +1515,7 @@ func Test_processTransaction(t *testing.T) {
}
if tc.expectedErr == nil {
workerMock.On("UpdateAfterSingleSuccessfulTxExecution", tc.tx.From, tc.expectedResponse.ReadWriteAddresses).Return([]*TxTracker{}).Once()
workerMock.On("AddPendingTxToStore", tc.tx.Hash, tc.tx.From).Return().Once()
}

if tc.expectedUpdateTxStatus != "" {
Expand Down Expand Up @@ -2447,7 +2449,6 @@ func setupFinalizer(withWipBatch bool) *finalizer {
maxBreakEvenGasPriceDeviationPercentage: big.NewInt(10),
pendingTransactionsToStore: make(chan transactionToStore, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
pendingTransactionsToStoreWG: new(sync.WaitGroup),
pendingTransactionsToStoreMux: new(sync.RWMutex),
storedFlushID: 0,
storedFlushIDCond: sync.NewCond(new(sync.Mutex)),
proverID: "",
Expand Down
2 changes: 2 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type workerInterface interface {
AddTxTracker(ctx context.Context, txTracker *TxTracker) (replacedTx *TxTracker, dropReason error)
MoveTxToNotReady(txHash common.Hash, from common.Address, actualNonce *uint64, actualBalance *big.Int) []*TxTracker
DeleteTx(txHash common.Hash, from common.Address)
AddPendingTxToStore(txHash common.Hash, addr common.Address)
DeletePendingTxToStore(txHash common.Hash, addr common.Address)
HandleL2Reorg(txHashes []common.Hash)
NewTxTracker(tx types.Transaction, counters state.ZKCounters, ip string) (*TxTracker, error)
}
Expand Down
Loading
Loading