Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: defer txpool reorg until worker fetches txns for the next block #905

Merged
merged 5 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ type TxPool struct {
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

Expand Down Expand Up @@ -300,6 +301,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgPauseCh: make(chan bool),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
Expand Down Expand Up @@ -1160,13 +1162,14 @@ func (pool *TxPool) scheduleReorgLoop() {
curDone chan struct{} // non-nil while runReorg is active
nextDone = make(chan struct{})
launchNextRun bool
reorgsPaused bool
reset *txpoolResetRequest
dirtyAccounts *accountSet
queuedEvents = make(map[common.Address]*txSortedMap)
)
for {
// Launch next background reorg if needed
if curDone == nil && launchNextRun {
if curDone == nil && launchNextRun && !reorgsPaused {
// Run the background reorg and announcements
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)

Expand Down Expand Up @@ -1218,6 +1221,7 @@ func (pool *TxPool) scheduleReorgLoop() {
}
close(nextDone)
return
case reorgsPaused = <-pool.reorgPauseCh:
0xmountaintop marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -1677,6 +1681,22 @@ func (pool *TxPool) demoteUnexecutables() {
}
}

// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight
jonastheis marked this conversation as resolved.
Show resolved Hide resolved
func (pool *TxPool) PauseReorgs() {
select {
case pool.reorgPauseCh <- true:
case <-pool.reorgShutdownCh:
}
}

// ResumeReorgs allows new reorg jobs to be started
func (pool *TxPool) ResumeReorgs() {
select {
case pool.reorgPauseCh <- false:
case <-pool.reorgShutdownCh:
}
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
Expand Down
7 changes: 7 additions & 0 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ func (w *worker) startNewPipeline(timestamp int64) {
}
collectL2Timer.UpdateSince(tidyPendingStart)

// Allow txpool to be reorged as we build current block
w.eth.TxPool().ResumeReorgs()

var nextL1MsgIndex uint64
if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil {
nextL1MsgIndex = *dbIndex
Expand Down Expand Up @@ -718,6 +721,10 @@ func (w *worker) commit(res *pipeline.Result) error {
"accRows", res.Rows,
)

// A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block.
// We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check
w.eth.TxPool().PauseReorgs()

rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), blockHash, res.Rows)
// Commit block and state to database.
_, err = w.chain.WriteBlockWithState(block, res.FinalBlock.Receipts, res.FinalBlock.CoalescedLogs, res.FinalBlock.State, true)
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 5 // Minor version component of the current release
VersionPatch = 13 // Patch version component of the current release
VersionPatch = 14 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down