-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
txpool: buffer so that we dont delete txs #2500
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -55,6 +55,8 @@ const ( | |||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// txReannoMaxNum is the maximum number of transactions a reannounce action can include. | ||||||||||||||||||||||||||||||||||||||||||||||||
txReannoMaxNum = 1024 | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
maxBufferSize = 1000 // maximum size of tx buffer | ||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
var ( | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -244,6 +246,10 @@ type LegacyPool struct { | |||||||||||||||||||||||||||||||||||||||||||||||
initDoneCh chan struct{} // is closed once the pool is initialized (for tests) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
changesSinceReorg int // A counter for how many drops we've performed in-between reorg. | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// A buffer to store transactions that would otherwise be discarded | ||||||||||||||||||||||||||||||||||||||||||||||||
buffer []*types.Transaction | ||||||||||||||||||||||||||||||||||||||||||||||||
bufferLock sync.Mutex | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
type txpoolResetRequest struct { | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -355,11 +361,13 @@ func (pool *LegacyPool) loop() { | |||||||||||||||||||||||||||||||||||||||||||||||
evict = time.NewTicker(evictionInterval) | ||||||||||||||||||||||||||||||||||||||||||||||||
reannounce = time.NewTicker(reannounceInterval) | ||||||||||||||||||||||||||||||||||||||||||||||||
journal = time.NewTicker(pool.config.Rejournal) | ||||||||||||||||||||||||||||||||||||||||||||||||
readd = time.NewTicker(time.Minute) // ticker to re-add buffered transactions periodically | ||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||
defer report.Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||
defer evict.Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||
defer reannounce.Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||
defer journal.Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||
defer readd.Stop() // Stop the ticker when the loop exits | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// Notify tests that the init phase is done | ||||||||||||||||||||||||||||||||||||||||||||||||
close(pool.initDoneCh) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -436,6 +444,9 @@ func (pool *LegacyPool) loop() { | |||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
pool.mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
// Handle re-adding buffered transactions | ||||||||||||||||||||||||||||||||||||||||||||||||
case <-readd.C: | ||||||||||||||||||||||||||||||||||||||||||||||||
pool.readdBufferedTransactions() | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -781,12 +792,21 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e | |||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
// If the transaction pool is full, discard underpriced transactions | ||||||||||||||||||||||||||||||||||||||||||||||||
// If the transaction pool is full, buffer underpriced transactions | ||||||||||||||||||||||||||||||||||||||||||||||||
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { | ||||||||||||||||||||||||||||||||||||||||||||||||
// If the new transaction is underpriced, don't accept it | ||||||||||||||||||||||||||||||||||||||||||||||||
// If the new transaction is underpriced, buffer it | ||||||||||||||||||||||||||||||||||||||||||||||||
if !isLocal && pool.priced.Underpriced(tx) { | ||||||||||||||||||||||||||||||||||||||||||||||||
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) | ||||||||||||||||||||||||||||||||||||||||||||||||
log.Trace("Buffering underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) | ||||||||||||||||||||||||||||||||||||||||||||||||
underpricedTxMeter.Mark(1) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
pool.bufferLock.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||
if len(pool.buffer) < maxBufferSize { | ||||||||||||||||||||||||||||||||||||||||||||||||
pool.buffer = append(pool.buffer, tx) | ||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||
log.Warn("Buffer is full, discarding transaction", "hash", hash) | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
pool.bufferLock.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
return false, txpool.ErrUnderpriced | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -804,6 +824,20 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e | |||||||||||||||||||||||||||||||||||||||||||||||
// Otherwise if we can't make enough room for new one, abort the operation. | ||||||||||||||||||||||||||||||||||||||||||||||||
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// Add dropped transactions to the buffer | ||||||||||||||||||||||||||||||||||||||||||||||||
for _, dropTx := range drop { | ||||||||||||||||||||||||||||||||||||||||||||||||
log.Trace("Buffering discarded transaction", "hash", dropTx.Hash(), "gasTipCap", dropTx.GasTipCap(), "gasFeeCap", dropTx.GasFeeCap()) | ||||||||||||||||||||||||||||||||||||||||||||||||
underpricedTxMeter.Mark(1) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
pool.bufferLock.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||
if len(pool.buffer) < maxBufferSize { | ||||||||||||||||||||||||||||||||||||||||||||||||
pool.buffer = append(pool.buffer, dropTx) | ||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||
log.Warn("Buffer is full, discarding transaction", "hash", hash) | ||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why you even continuing with the loop if it's full? |
||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
pool.bufferLock.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||
// Special case, we still can't make the room for the new remote one. | ||||||||||||||||||||||||||||||||||||||||||||||||
if !isLocal && !success { | ||||||||||||||||||||||||||||||||||||||||||||||||
log.Trace("Discarding overflown transaction", "hash", hash) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1779,6 +1813,48 @@ func (pool *LegacyPool) SetMaxGas(maxGas uint64) { | |||||||||||||||||||||||||||||||||||||||||||||||
pool.maxGas.Store(maxGas) | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
func (pool *LegacyPool) readdBufferedTransactions() { | ||||||||||||||||||||||||||||||||||||||||||||||||
pool.mu.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||
defer pool.mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// Check if there is space in the pool | ||||||||||||||||||||||||||||||||||||||||||||||||
if uint64(pool.all.Slots()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { | ||||||||||||||||||||||||||||||||||||||||||||||||
return // No space available, skip re-adding | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
var readded []*types.Transaction | ||||||||||||||||||||||||||||||||||||||||||||||||
for _, tx := range pool.buffer { | ||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you do not lock buffermutex here and you should |
||||||||||||||||||||||||||||||||||||||||||||||||
// Check if adding this transaction will exceed the pool capacity | ||||||||||||||||||||||||||||||||||||||||||||||||
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { | ||||||||||||||||||||||||||||||||||||||||||||||||
break // Stop if adding the transaction will exceed the pool capacity | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
if _, err := pool.add(tx, false); err == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||
readded = append(readded, tx) | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// Remove successfully re-added transactions from the buffer | ||||||||||||||||||||||||||||||||||||||||||||||||
if len(readded) > 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||
remaining := pool.buffer[:0] | ||||||||||||||||||||||||||||||||||||||||||||||||
for _, tx := range pool.buffer { | ||||||||||||||||||||||||||||||||||||||||||||||||
if !containsTransaction(readded, tx) { | ||||||||||||||||||||||||||||||||||||||||||||||||
remaining = append(remaining, tx) | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
pool.buffer = remaining | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
func containsTransaction(txs []*types.Transaction, tx *types.Transaction) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||
for _, t := range txs { | ||||||||||||||||||||||||||||||||||||||||||||||||
if t.Hash() == tx.Hash() { | ||||||||||||||||||||||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
return false | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// addressByHeartbeat is an account address tagged with its last activity timestamp. | ||||||||||||||||||||||||||||||||||||||||||||||||
type addressByHeartbeat struct { | ||||||||||||||||||||||||||||||||||||||||||||||||
address common.Address | ||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this metric shouldn't be triggered