Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor nonce calculation for addQueue #2382

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
130 changes: 35 additions & 95 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,14 @@ type finalizer struct {
maxBreakEvenGasPriceDeviationPercentage *big.Int
defaultMinGasPriceAllowed uint64
// Processed txs
pendingTxsToStore chan transactionToStore
pendingTxsToStoreWG *sync.WaitGroup
pendingTxsToStoreMux *sync.RWMutex
pendingTxsPerAddressTrackers map[common.Address]*pendingTxPerAddressTracker
storedFlushID uint64
storedFlushIDCond *sync.Cond
proverID string
lastPendingFlushID uint64
pendingFlushIDChan chan uint64
pendingTransactionsToStore chan transactionToStore
pendingTransactionsToStoreWG *sync.WaitGroup
pendingTransactionsToStoreMux *sync.RWMutex
storedFlushID uint64
storedFlushIDCond *sync.Cond
proverID string
lastPendingFlushID uint64
pendingFlushIDCond *sync.Cond
}

type transactionToStore struct {
Expand Down Expand Up @@ -114,8 +113,6 @@ func newFinalizer(
closingSignalCh ClosingSignalCh,
batchConstraints batchConstraints,
eventLog *event.EventLog,
pendingTxsToStoreMux *sync.RWMutex,
pendingTxsPerAddressTrackers map[common.Address]*pendingTxPerAddressTracker,
) *finalizer {
return &finalizer{
cfg: cfg,
Expand All @@ -142,16 +139,15 @@ func newFinalizer(
// event log
eventLog: eventLog,
maxBreakEvenGasPriceDeviationPercentage: new(big.Int).SetUint64(effectiveGasPriceCfg.MaxBreakEvenGasPriceDeviationPercentage),
pendingTxsToStore: make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
pendingTxsToStoreWG: new(sync.WaitGroup),
pendingTxsToStoreMux: pendingTxsToStoreMux,
pendingTxsPerAddressTrackers: pendingTxsPerAddressTrackers,
pendingTransactionsToStore: make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
pendingTransactionsToStoreWG: new(sync.WaitGroup),
pendingTransactionsToStoreMux: &sync.RWMutex{},
storedFlushID: 0,
// Mutex is unlocked when the condition is broadcasted
storedFlushIDCond: sync.NewCond(&sync.Mutex{}),
proverID: "",
lastPendingFlushID: 0,
pendingFlushIDChan: make(chan uint64, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
pendingFlushIDCond: sync.NewCond(&sync.Mutex{}),
}
}

Expand Down Expand Up @@ -191,13 +187,11 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *s
// updateProverIdAndFlushId updates the prover id and flush id
func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) {
for {
log.Infof("checking for stored flush id to be less than last pending flush id ...")

f.pendingFlushIDCond.L.Lock()
for f.storedFlushID >= f.lastPendingFlushID {
log.Infof("waiting for new pending flush id, last pending flush id: %v", f.lastPendingFlushID)
<-f.pendingFlushIDChan
log.Infof("received new last pending flush id: %v", f.lastPendingFlushID)
f.pendingFlushIDCond.Wait()
}
f.pendingFlushIDCond.L.Unlock()

for f.storedFlushID < f.lastPendingFlushID {
storedFlushID, proverID, err := f.dbManager.GetStoredFlushID(ctx)
Expand Down Expand Up @@ -249,7 +243,6 @@ func (f *finalizer) listenForClosingSignals(ctx context.Context) {

// updateStoredFlushID updates the stored flush id
func (f *finalizer) updateStoredFlushID(newFlushID uint64) {
log.Infof("updating stored flush id to: %v", newFlushID)
f.storedFlushIDCond.L.Lock()
f.storedFlushID = newFlushID
f.storedFlushIDCond.Broadcast()
Expand Down Expand Up @@ -378,7 +371,6 @@ func (f *finalizer) halt(ctx context.Context, err error) {

// checkProverIDAndUpdateStoredFlushID checks if the proverID changed and updates the stored flush id
func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, proverID string) {
log.Infof("checking proverID: %s", proverID)
if f.proverID != "" && f.proverID != proverID {
event := &event.Event{
ReceivedAt: time.Now(),
Expand All @@ -403,22 +395,18 @@ func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, pr
func (f *finalizer) storePendingTransactions(ctx context.Context) {
for {
select {
case tx, ok := <-f.pendingTxsToStore:
case tx, ok := <-f.pendingTransactionsToStore:
if !ok {
log.Infof("pendingTxsToStore channel is closed")
// Channel is closed
return
}

log.Infof("storing pending transaction hash: %s", tx.txTracker.Hash.String())
// Print the formatted timestamp
f.storedFlushIDCond.L.Lock()
for f.storedFlushID < tx.flushId {
log.Infof("waiting for FlushID: %d to be stored (confirmed) ...", tx.flushId)
f.storedFlushIDCond.Wait()
log.Infof("waking up after FlushID: %d was stored (confirmed)", tx.flushId)
// check if context is done after waking up
if ctx.Err() != nil {
log.Errorf("context is done, err: %s", ctx.Err())
f.storedFlushIDCond.L.Unlock()
return
}
Expand All @@ -427,22 +415,10 @@ func (f *finalizer) storePendingTransactions(ctx context.Context) {

// Now f.storedFlushID >= tx.flushId, you can store tx
f.storeProcessedTx(ctx, tx)

log.Infof("updating pending transaction trackers for transaction hash: %s, flush Id: %d ...", tx.txTracker.Hash.String(), tx.flushId)
f.pendingTxsToStoreMux.Lock()
f.pendingTxsToStoreWG.Done()
f.pendingTxsPerAddressTrackers[tx.txTracker.From].wg.Done()
f.pendingTxsPerAddressTrackers[tx.txTracker.From].count--
log.Infof("updated pending transaction tracker for address: %s, count: %d, transaction hash: %s, flush Id: %d", tx.txTracker.From.String(), f.pendingTxsPerAddressTrackers[tx.txTracker.From].count, tx.txTracker.Hash.String(), tx.flushId)
// Needed to avoid memory leaks
if f.pendingTxsPerAddressTrackers[tx.txTracker.From].count == 0 {
log.Infof("deleting pending transaction tracker for address: %s, transaction hash: %s, flush Id: %d", tx.txTracker.From.String(), tx.txTracker.Hash.String(), tx.flushId)
delete(f.pendingTxsPerAddressTrackers, tx.txTracker.From)
}
f.pendingTxsToStoreMux.Unlock()
f.pendingTransactionsToStoreWG.Done()
case <-ctx.Done():
// The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit
f.pendingTxsToStoreWG.Wait()
f.pendingTransactionsToStoreWG.Wait()
return
default:
time.Sleep(100 * time.Millisecond) //nolint:gomnd
Expand All @@ -457,14 +433,10 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {

// Wait until all processed transactions are saved
startWait := time.Now()
batchNumber := uint64(0)
if f.batch != nil {
batchNumber = f.batch.batchNumber
}
log.Infof("waiting for pending transactions to be stored batch number: %d ...", batchNumber)
f.pendingTxsToStoreWG.Wait()
f.pendingTransactionsToStoreWG.Wait()
endWait := time.Now()
log.Infof("waiting for pending transactions for batch number: %d to be stored took: %s", batchNumber, endWait.Sub(startWait).String())

log.Info("waiting for pending transactions to be stored took: ", endWait.Sub(startWait).String())

var err error
if f.batch.stateRoot == state.ZeroHash {
Expand Down Expand Up @@ -700,30 +672,18 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
flushId: result.FlushID,
}

log.Infof("adding tx to pendingTxsToStore. tx: %s, batchNumber: %d, flushId: %d", result.Responses[0].TxHash, f.batch.batchNumber, result.FlushID)
f.pendingTxsToStoreMux.Lock()
// global tracker
f.pendingTxsToStoreWG.Add(1)
// per address tracker
if _, ok := f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From]; !ok {
f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From] = new(pendingTxPerAddressTracker)
f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From].wg = &sync.WaitGroup{}
}
f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From].wg.Add(1)
f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From].count++
// broadcast the new flushID if it's greater than the last one
f.pendingTransactionsToStoreMux.Lock()
f.pendingTransactionsToStoreWG.Add(1)
if result.FlushID > f.lastPendingFlushID {
log.Infof("broadcasting new pending flushId: %d", result.FlushID)
f.lastPendingFlushID = result.FlushID
f.pendingFlushIDChan <- result.FlushID
f.pendingFlushIDCond.Broadcast()
}
f.pendingTxsToStoreMux.Unlock()
log.Infof("sending tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", result.Responses[0].TxHash, f.batch.batchNumber)
f.pendingTransactionsToStoreMux.Unlock()
select {
case f.pendingTxsToStore <- processedTransaction:
case f.pendingTransactionsToStore <- processedTransaction:
case <-ctx.Done():
// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count
f.pendingTxsToStoreWG.Done()
f.pendingTransactionsToStoreWG.Done()
}

f.batch.countOfTxs++
Expand All @@ -749,16 +709,8 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
}
}

from, err := state.GetSender(txResp.Tx)
if err != nil {
log.Errorf("handleForcedTxsProcessResp: failed to get sender: %s", err)
continue
}

processedTransaction := transactionToStore{
txTracker: &TxTracker{
From: from,
},
txTracker: nil,
response: txResp,
batchResponse: result,
batchNumber: request.BatchNumber,
Expand All @@ -769,32 +721,20 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
flushId: result.FlushID,
}

log.Infof("adding forced tx to pendingTxsToStore. tx: %s, batchNumber: %d, flushId: %d", txResp.TxHash, request.BatchNumber, result.FlushID)
f.pendingTxsToStoreMux.Lock()
// global tracker
f.pendingTxsToStoreWG.Add(1)
// per address tracker
if _, ok := f.pendingTxsPerAddressTrackers[from]; !ok {
f.pendingTxsPerAddressTrackers[from] = new(pendingTxPerAddressTracker)
f.pendingTxsPerAddressTrackers[from].wg = &sync.WaitGroup{}
}
f.pendingTxsPerAddressTrackers[from].wg.Add(1)
f.pendingTxsPerAddressTrackers[from].count++
// broadcast the new flushID if it's greater than the last one
f.pendingTransactionsToStoreMux.Lock()
f.pendingTransactionsToStoreWG.Add(1)
if result.FlushID > f.lastPendingFlushID {
log.Infof("broadcasting new pending flushId: %d", result.FlushID)
f.lastPendingFlushID = result.FlushID
f.pendingFlushIDChan <- result.FlushID
f.pendingFlushIDCond.Broadcast()
}
f.pendingTxsToStoreMux.Unlock()
f.pendingTransactionsToStoreMux.Unlock()
oldStateRoot = txResp.StateRoot

log.Infof("sending forced tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", txResp.TxHash, request.BatchNumber)
select {
case f.pendingTxsToStore <- processedTransaction:
case f.pendingTransactionsToStore <- processedTransaction:
case <-ctx.Done():
// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count
f.pendingTxsToStoreWG.Done()
f.pendingTransactionsToStoreWG.Done()
}
}
}
Expand Down
Loading