From af9fc001902380be68330a86efd41731fb2fb6a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Fri, 26 Jul 2024 10:25:58 +0300 Subject: [PATCH] feat: defer txpool reorg until worker fetches txns for the next block (#905) --- core/txpool/legacypool/legacypool.go | 24 +++++++++++++++++++++++- miner/scroll_worker.go | 7 +++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 97f820ed75e48..289dc200744f4 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -227,6 +227,7 @@ type LegacyPool struct { queueTxEventCh chan *types.Transaction reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop + reorgPauseCh chan bool // requests to pause scheduleReorgLoop wg sync.WaitGroup // tracks loop, scheduleReorgLoop initDoneCh chan struct{} // is closed once the pool is initialized (for tests) @@ -258,6 +259,7 @@ func New(config Config, chain BlockChain) *LegacyPool { queueTxEventCh: make(chan *types.Transaction), reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), + reorgPauseCh: make(chan bool), initDoneCh: make(chan struct{}), } pool.locals = newAccountSet(pool.signer) @@ -1198,13 +1200,14 @@ func (pool *LegacyPool) scheduleReorgLoop() { curDone chan struct{} // non-nil while runReorg is active nextDone = make(chan struct{}) launchNextRun bool + reorgsPaused bool reset *txpoolResetRequest dirtyAccounts *accountSet queuedEvents = make(map[common.Address]*sortedMap) ) for { // Launch next background reorg if needed - if curDone == nil && launchNextRun { + if curDone == nil && launchNextRun && !reorgsPaused { // Run the background reorg and announcements go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents) @@ -1256,6 +1259,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { } close(nextDone) return + case reorgsPaused = <-pool.reorgPauseCh: } } } @@ -1705,6 +1709,24 @@ func (pool *LegacyPool) demoteUnexecutables() { } } +// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight +// Keep in mind this function might block, although it is not expected to block for any significant amount of time +func (pool *TxPool) PauseReorgs() { + select { + case pool.reorgPauseCh <- true: + case <-pool.reorgShutdownCh: + } +} + +// ResumeReorgs allows new reorg jobs to be started. +// Keep in mind this function might block, although it is not expected to block for any significant amount of time +func (pool *TxPool) ResumeReorgs() { + select { + case pool.reorgPauseCh <- false: + case <-pool.reorgShutdownCh: + } +} + // addressByHeartbeat is an account address tagged with its last activity timestamp. type addressByHeartbeat struct { address common.Address diff --git a/miner/scroll_worker.go b/miner/scroll_worker.go index c7e6249ccdf47..9eac3a0167236 100644 --- a/miner/scroll_worker.go +++ b/miner/scroll_worker.go @@ -400,6 +400,9 @@ func (w *worker) startNewPipeline(timestamp int64) { } collectL2Timer.UpdateSince(tidyPendingStart) + // Allow txpool to be reorged as we build current block + w.eth.TxPool().ResumeReorgs() + var nextL1MsgIndex uint64 if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil { nextL1MsgIndex = *dbIndex @@ -668,6 +671,10 @@ func (w *worker) commit(res *pipeline.Result) error { "accRows", res.Rows, ) + // A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block. + // We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check + w.eth.TxPool().PauseReorgs() + rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), blockHash, res.Rows) // Commit block and state to database. _, err = w.chain.WriteBlockAndSetHead(block, res.FinalBlock.Receipts, logs, res.FinalBlock.State, true)