Skip to content

Commit

Permalink
feat: defer txpool reorg until worker fetches txns for the next block (
Browse files Browse the repository at this point in the history
  • Loading branch information
omerfirmak authored and achmand committed Aug 13, 2024
1 parent cf79a9e commit c5d7bf4
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
24 changes: 23 additions & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ type TxPool struct {
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

Expand Down Expand Up @@ -300,6 +301,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgPauseCh: make(chan bool),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
Expand Down Expand Up @@ -1160,13 +1162,14 @@ func (pool *TxPool) scheduleReorgLoop() {
curDone chan struct{} // non-nil while runReorg is active
nextDone = make(chan struct{})
launchNextRun bool
reorgsPaused bool
reset *txpoolResetRequest
dirtyAccounts *accountSet
queuedEvents = make(map[common.Address]*txSortedMap)
)
for {
// Launch next background reorg if needed
if curDone == nil && launchNextRun {
if curDone == nil && launchNextRun && !reorgsPaused {
// Run the background reorg and announcements
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)

Expand Down Expand Up @@ -1218,6 +1221,7 @@ func (pool *TxPool) scheduleReorgLoop() {
}
close(nextDone)
return
case reorgsPaused = <-pool.reorgPauseCh:
}
}
}
Expand Down Expand Up @@ -1677,6 +1681,24 @@ func (pool *TxPool) demoteUnexecutables() {
}
}

// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *TxPool) PauseReorgs() {
select {
case pool.reorgPauseCh <- true:
case <-pool.reorgShutdownCh:
}
}

// ResumeReorgs allows new reorg jobs to be started.
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *TxPool) ResumeReorgs() {
select {
case pool.reorgPauseCh <- false:
case <-pool.reorgShutdownCh:
}
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
Expand Down
7 changes: 7 additions & 0 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ func (w *worker) startNewPipeline(timestamp int64) {
}
collectL2Timer.UpdateSince(tidyPendingStart)

// Allow txpool to be reorged as we build current block
w.eth.TxPool().ResumeReorgs()

var nextL1MsgIndex uint64
if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil {
nextL1MsgIndex = *dbIndex
Expand Down Expand Up @@ -719,6 +722,10 @@ func (w *worker) commit(res *pipeline.Result) error {
"accRows", res.Rows,
)

// A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block.
// We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check
w.eth.TxPool().PauseReorgs()

rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), blockHash, res.Rows)
// Commit block and state to database.
_, err = w.chain.WriteBlockWithState(block, res.FinalBlock.Receipts, res.FinalBlock.CoalescedLogs, res.FinalBlock.State, true)
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 5 // Minor version component of the current release
VersionPatch = 17 // Patch version component of the current release
VersionPatch = 18 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down

0 comments on commit c5d7bf4

Please sign in to comment.