Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM committed Aug 7, 2023
1 parent 691b19f commit 6b720b4
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 139 deletions.
87 changes: 29 additions & 58 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ type finalizer struct {
maxBreakEvenGasPriceDeviationPercentage *big.Int
defaultMinGasPriceAllowed uint64
// Processed txs
pendingTxsToStore chan transactionToStore
pendingTxsToStoreWG *sync.WaitGroup
pendingTxsToStoreMux *sync.RWMutex
storedFlushID uint64
storedFlushIDCond *sync.Cond
proverID string
lastPendingFlushID uint64
pendingFlushIDCond *sync.Cond
pendingTransactionsToStore chan transactionToStore
pendingTransactionsToStoreWG *sync.WaitGroup
pendingTransactionsToStoreMux *sync.RWMutex
storedFlushID uint64
storedFlushIDCond *sync.Cond
proverID string
lastPendingFlushID uint64
pendingFlushIDCond *sync.Cond
}

type transactionToStore struct {
Expand Down Expand Up @@ -139,9 +139,9 @@ func newFinalizer(
// event log
eventLog: eventLog,
maxBreakEvenGasPriceDeviationPercentage: new(big.Int).SetUint64(effectiveGasPriceCfg.MaxBreakEvenGasPriceDeviationPercentage),
pendingTxsToStore: make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
pendingTxsToStoreWG: new(sync.WaitGroup),
pendingTxsToStoreMux: &sync.RWMutex{},
pendingTransactionsToStore: make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier),
pendingTransactionsToStoreWG: new(sync.WaitGroup),
pendingTransactionsToStoreMux: &sync.RWMutex{},
storedFlushID: 0,
// Mutex is unlocked when the condition is broadcasted
storedFlushIDCond: sync.NewCond(&sync.Mutex{}),
Expand Down Expand Up @@ -187,12 +187,9 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, processingReq *s
// updateProverIdAndFlushId updates the prover id and flush id
func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) {
for {
log.Infof("checking for stored flush id to be less than last pending flush id ...")
f.pendingFlushIDCond.L.Lock()
for f.storedFlushID >= f.lastPendingFlushID {
log.Infof("waiting for new pending flush id, last pending flush id: %v", f.lastPendingFlushID)
f.pendingFlushIDCond.Wait()
log.Infof("received new last pending flush id: %v", f.lastPendingFlushID)
}
f.pendingFlushIDCond.L.Unlock()

Expand Down Expand Up @@ -246,7 +243,6 @@ func (f *finalizer) listenForClosingSignals(ctx context.Context) {

// updateStoredFlushID updates the stored flush id
func (f *finalizer) updateStoredFlushID(newFlushID uint64) {
log.Infof("updating stored flush id to: %v", newFlushID)
f.storedFlushIDCond.L.Lock()
f.storedFlushID = newFlushID
f.storedFlushIDCond.Broadcast()
Expand Down Expand Up @@ -375,7 +371,6 @@ func (f *finalizer) halt(ctx context.Context, err error) {

// checkProverIDAndUpdateStoredFlushID checks if the proverID changed and updates the stored flush id
func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, proverID string) {
log.Infof("checking proverID: %s", proverID)
if f.proverID != "" && f.proverID != proverID {
event := &event.Event{
ReceivedAt: time.Now(),
Expand All @@ -400,22 +395,18 @@ func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, pr
func (f *finalizer) storePendingTransactions(ctx context.Context) {
for {
select {
case tx, ok := <-f.pendingTxsToStore:
case tx, ok := <-f.pendingTransactionsToStore:
if !ok {
log.Infof("pendingTxsToStore channel is closed")
// Channel is closed
return
}

log.Infof("storing pending transaction hash: %s", tx.txTracker.Hash.String())
// Print the formatted timestamp
f.storedFlushIDCond.L.Lock()
for f.storedFlushID < tx.flushId {
log.Infof("waiting for FlushID: %d to be stored (confirmed) ...", tx.flushId)
f.storedFlushIDCond.Wait()
log.Infof("waking up after FlushID: %d was stored (confirmed)", tx.flushId)
// check if context is done after waking up
if ctx.Err() != nil {
log.Errorf("context is done, err: %s", ctx.Err())
f.storedFlushIDCond.L.Unlock()
return
}
Expand All @@ -424,13 +415,10 @@ func (f *finalizer) storePendingTransactions(ctx context.Context) {

// Now f.storedFlushID >= tx.flushId, you can store tx
f.storeProcessedTx(ctx, tx)

log.Infof("updating pending transaction trackers for transaction hash: %s, flush Id: %d ...", tx.txTracker.Hash.String(), tx.flushId)
f.pendingTxsToStoreMux.Lock()
f.pendingTxsToStoreWG.Done()
f.pendingTransactionsToStoreWG.Done()
case <-ctx.Done():
// The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit
f.pendingTxsToStoreWG.Wait()
f.pendingTransactionsToStoreWG.Wait()
return
default:
time.Sleep(100 * time.Millisecond) //nolint:gomnd
Expand All @@ -445,14 +433,10 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {

// Wait until all processed transactions are saved
startWait := time.Now()
batchNumber := uint64(0)
if f.batch != nil {
batchNumber = f.batch.batchNumber
}
log.Infof("waiting for pending transactions to be stored batch number: %d ...", batchNumber)
f.pendingTxsToStoreWG.Wait()
f.pendingTransactionsToStoreWG.Wait()
endWait := time.Now()
log.Infof("waiting for pending transactions for batch number: %d to be stored took: %s", batchNumber, endWait.Sub(startWait).String())

log.Info("waiting for pending transactions to be stored took: ", endWait.Sub(startWait).String())

var err error
if f.batch.stateRoot == state.ZeroHash {
Expand Down Expand Up @@ -688,21 +672,18 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
flushId: result.FlushID,
}

log.Infof("adding tx to pendingTxsToStore. tx: %s, batchNumber: %d, flushId: %d", result.Responses[0].TxHash, f.batch.batchNumber, result.FlushID)
f.pendingTxsToStoreMux.Lock()
f.pendingTxsToStoreWG.Add(1)
f.pendingTxsToStoreMux.Unlock()
f.pendingTransactionsToStoreMux.Lock()
f.pendingTransactionsToStoreWG.Add(1)
if result.FlushID > f.lastPendingFlushID {
f.lastPendingFlushID = result.FlushID
f.pendingFlushIDCond.Broadcast()
}

log.Infof("sending tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", result.Responses[0].TxHash, f.batch.batchNumber)
f.pendingTransactionsToStoreMux.Unlock()
select {
case f.pendingTxsToStore <- processedTransaction:
case f.pendingTransactionsToStore <- processedTransaction:
case <-ctx.Done():
// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count
f.pendingTxsToStoreWG.Done()
f.pendingTransactionsToStoreWG.Done()
}

f.batch.countOfTxs++
Expand All @@ -728,16 +709,8 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
}
}

from, err := state.GetSender(txResp.Tx)
if err != nil {
log.Errorf("handleForcedTxsProcessResp: failed to get sender: %s", err)
continue
}

processedTransaction := transactionToStore{
txTracker: &TxTracker{
From: from,
},
txTracker: nil,
response: txResp,
batchResponse: result,
batchNumber: request.BatchNumber,
Expand All @@ -748,22 +721,20 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
flushId: result.FlushID,
}

log.Infof("adding forced tx to pendingTxsToStore. tx: %s, batchNumber: %d, flushId: %d", txResp.TxHash, request.BatchNumber, result.FlushID)
f.pendingTxsToStoreMux.Lock()
f.pendingTxsToStoreWG.Add(1)
f.pendingTxsToStoreMux.Unlock()
f.pendingTransactionsToStoreMux.Lock()
f.pendingTransactionsToStoreWG.Add(1)
if result.FlushID > f.lastPendingFlushID {
f.lastPendingFlushID = result.FlushID
f.pendingFlushIDCond.Broadcast()
}
f.pendingTransactionsToStoreMux.Unlock()
oldStateRoot = txResp.StateRoot

log.Infof("sending forced tx to pendingTxsToStore channel. tx: %s, batchNumber: %d", txResp.TxHash, request.BatchNumber)
select {
case f.pendingTxsToStore <- processedTransaction:
case f.pendingTransactionsToStore <- processedTransaction:
case <-ctx.Done():
// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count
f.pendingTxsToStoreWG.Done()
f.pendingTransactionsToStoreWG.Done()
}
}
}
Expand Down
Loading

0 comments on commit 6b720b4

Please sign in to comment.