Skip to content

Commit

Permalink
bugfix: fixing finalizer's handling.
Browse files Browse the repository at this point in the history
Signed-off-by: Nikolay Nedkov <[email protected]>
  • Loading branch information
Psykepro committed Aug 4, 2023
1 parent 8ec1474 commit 89c0f91
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
33 changes: 15 additions & 18 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type finalizer struct {
storedFlushIDCond *sync.Cond
proverID string
lastPendingFlushID uint64
pendingFlushIDCond *sync.Cond
pendingFlushIDChan chan uint64
}

type transactionToStore struct {
Expand Down Expand Up @@ -151,7 +151,7 @@ func newFinalizer(
storedFlushIDCond: sync.NewCond(&sync.Mutex{}),
proverID: "",
lastPendingFlushID: 0,
pendingFlushIDCond: sync.NewCond(&sync.Mutex{}),
pendingFlushIDChan: make(chan uint64, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
}
}

Expand Down Expand Up @@ -192,16 +192,14 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *s
func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) {
for {
log.Infof("checking for stored flush id to be less than last pending flush id ...")
f.pendingFlushIDCond.L.Lock()

for f.storedFlushID >= f.lastPendingFlushID {
log.Infof("waiting for stored flush id to be less than last pending flush id, stored flush id: %v, last pending flush id: %v", f.storedFlushID, f.lastPendingFlushID)
f.pendingFlushIDCond.Wait()
log.Infof("stored flush id: %v, last pending flush id: %v", f.storedFlushID, f.lastPendingFlushID)
log.Infof("waiting for new pending flush id, last pending flush id: %v", f.lastPendingFlushID)
<-f.pendingFlushIDChan
log.Infof("received new last pending flush id: %v", f.lastPendingFlushID)
}
f.pendingFlushIDCond.L.Unlock()

for f.storedFlushID < f.lastPendingFlushID {
log.Infof("stored flush id: %v, last pending flush id: %v", f.storedFlushID, f.lastPendingFlushID)
storedFlushID, proverID, err := f.dbManager.GetStoredFlushID(ctx)
if err != nil {
log.Errorf("failed to get stored flush id, Err: %v", err)
Expand Down Expand Up @@ -415,9 +413,9 @@ func (f *finalizer) storePendingTransactions(ctx context.Context) {
// Print the formatted timestamp
f.storedFlushIDCond.L.Lock()
for f.storedFlushID < tx.flushId {
log.Infof("waiting for storedFlushID: %d, tx.flushId: %d", f.storedFlushID, tx.flushId)
log.Infof("waiting for FlushID: %d to be stored (confirmed) ...", tx.flushId)
f.storedFlushIDCond.Wait()
log.Infof("waking up for storedFlushID: %d, tx.flushId: %d", f.storedFlushID, tx.flushId)
log.Infof("waking up after FlushID: %d was stored (confirmed)", tx.flushId)
// check if context is done after waking up
if ctx.Err() != nil {
log.Errorf("context is done, err: %s", ctx.Err())
Expand All @@ -430,19 +428,18 @@ func (f *finalizer) storePendingTransactions(ctx context.Context) {
// Now f.storedFlushID >= tx.flushId, you can store tx
f.storeProcessedTx(ctx, tx)

log.Infof("updating pending transaction trackers for transaction hash: %s ...", tx.txTracker.Hash.String())
log.Infof("updating pending transaction trackers for transaction hash: %s, flush Id: %d ...", tx.txTracker.Hash.String(), tx.flushId)
f.pendingTxsToStoreMux.Lock()
f.pendingTxsToStoreWG.Done()
f.pendingTxsPerAddressTrackers[tx.txTracker.From].wg.Done()
f.pendingTxsPerAddressTrackers[tx.txTracker.From].count--
log.Infof("updated pending transaction tracker for address: %s, count: %d, transaction hash: %s", tx.txTracker.From.String(), f.pendingTxsPerAddressTrackers[tx.txTracker.From].count, tx.txTracker.Hash.String())
log.Infof("updated pending transaction tracker for address: %s, count: %d, transaction hash: %s, flush Id: %d", tx.txTracker.From.String(), f.pendingTxsPerAddressTrackers[tx.txTracker.From].count, tx.txTracker.Hash.String(), tx.flushId)
// Needed to avoid memory leaks
if f.pendingTxsPerAddressTrackers[tx.txTracker.From].count == 0 {
log.Infof("deleting pending transaction tracker for address: %s, transaction hash: %s", tx.txTracker.From.String(), tx.txTracker.Hash.String())
log.Infof("deleting pending transaction tracker for address: %s, transaction hash: %s, flush Id: %d", tx.txTracker.From.String(), tx.txTracker.Hash.String(), tx.flushId)
delete(f.pendingTxsPerAddressTrackers, tx.txTracker.From)
}
f.pendingTxsToStoreMux.Unlock()

case <-ctx.Done():
// The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit
f.pendingTxsToStoreWG.Wait()
Expand Down Expand Up @@ -716,9 +713,9 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
f.pendingTxsPerAddressTrackers[processedTransaction.txTracker.From].count++
// broadcast the new flushID if it's greater than the last one
if result.FlushID > f.lastPendingFlushID {
log.Infof("broadcasting new flushId: %d", result.FlushID)
log.Infof("broadcasting new pending flushId: %d", result.FlushID)
f.lastPendingFlushID = result.FlushID
f.pendingFlushIDCond.Broadcast()
f.pendingFlushIDChan <- result.FlushID
}
f.pendingTxsToStoreMux.Unlock()
log.Infof("sending tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", result.Responses[0].TxHash, f.batch.batchNumber)
Expand Down Expand Up @@ -785,9 +782,9 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
f.pendingTxsPerAddressTrackers[from].count++
// broadcast the new flushID if it's greater than the last one
if result.FlushID > f.lastPendingFlushID {
log.Infof("broadcasting new flushId: %d", result.FlushID)
log.Infof("broadcasting new pending flushId: %d", result.FlushID)
f.lastPendingFlushID = result.FlushID
f.pendingFlushIDCond.Broadcast()
f.pendingFlushIDChan <- result.FlushID
}
f.pendingTxsToStoreMux.Unlock()
oldStateRoot = txResp.StateRoot
Expand Down
2 changes: 1 addition & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,6 @@ func setupFinalizer(withWipBatch bool) *finalizer {
storedFlushIDCond: sync.NewCond(new(sync.Mutex)),
proverID: "",
lastPendingFlushID: 0,
pendingFlushIDCond: sync.NewCond(new(sync.Mutex)),
pendingFlushIDChan: make(chan uint64, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
}
}

0 comments on commit 89c0f91

Please sign in to comment.