Skip to content

Commit

Permalink
feat: defer txpool reorg until worker fetches txns for the next block (
Browse files Browse the repository at this point in the history
  • Loading branch information
omerfirmak authored and 0xmountaintop committed Jul 31, 2024
1 parent e9fec8f commit af9fc00
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
24 changes: 23 additions & 1 deletion core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ type LegacyPool struct {
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

Expand Down Expand Up @@ -258,6 +259,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgPauseCh: make(chan bool),
initDoneCh: make(chan struct{}),
}
pool.locals = newAccountSet(pool.signer)
Expand Down Expand Up @@ -1198,13 +1200,14 @@ func (pool *LegacyPool) scheduleReorgLoop() {
curDone chan struct{} // non-nil while runReorg is active
nextDone = make(chan struct{})
launchNextRun bool
reorgsPaused bool
reset *txpoolResetRequest
dirtyAccounts *accountSet
queuedEvents = make(map[common.Address]*sortedMap)
)
for {
// Launch next background reorg if needed
if curDone == nil && launchNextRun {
if curDone == nil && launchNextRun && !reorgsPaused {
// Run the background reorg and announcements
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)

Expand Down Expand Up @@ -1256,6 +1259,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
}
close(nextDone)
return
case reorgsPaused = <-pool.reorgPauseCh:
}
}
}
Expand Down Expand Up @@ -1705,6 +1709,24 @@ func (pool *LegacyPool) demoteUnexecutables() {
}
}

// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *TxPool) PauseReorgs() {
select {
case pool.reorgPauseCh <- true:
case <-pool.reorgShutdownCh:
}
}

// ResumeReorgs allows new reorg jobs to be started.
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *TxPool) ResumeReorgs() {
select {
case pool.reorgPauseCh <- false:
case <-pool.reorgShutdownCh:
}
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
Expand Down
7 changes: 7 additions & 0 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ func (w *worker) startNewPipeline(timestamp int64) {
}
collectL2Timer.UpdateSince(tidyPendingStart)

// Allow txpool to be reorged as we build current block
w.eth.TxPool().ResumeReorgs()

var nextL1MsgIndex uint64
if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil {
nextL1MsgIndex = *dbIndex
Expand Down Expand Up @@ -668,6 +671,10 @@ func (w *worker) commit(res *pipeline.Result) error {
"accRows", res.Rows,
)

// A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block.
// We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check
w.eth.TxPool().PauseReorgs()

rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), blockHash, res.Rows)
// Commit block and state to database.
_, err = w.chain.WriteBlockAndSetHead(block, res.FinalBlock.Receipts, logs, res.FinalBlock.State, true)
Expand Down

0 comments on commit af9fc00

Please sign in to comment.