Skip to content

Commit

Permalink
core/txpool: improve Add() logic, handle edge case (#2754)
Browse files Browse the repository at this point in the history
  • Loading branch information
emailtovamos authored Nov 22, 2024
1 parent ced7d9f commit af09f3a
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 44 deletions.
36 changes: 8 additions & 28 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (pool *LegacyPool) Stats() (int, int) {
return pool.stats()
}

func (pool *LegacyPool) statsOverflowPool() int {
func (pool *LegacyPool) statsOverflowPool() uint64 {
pool.mu.RLock()
defer pool.mu.RUnlock()

Expand Down Expand Up @@ -907,25 +907,14 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}

func (pool *LegacyPool) addToOverflowPool(drop types.Transactions, isLocal bool) {
// calculate total number of slots in drop. Accordingly add them to OverflowPool (if there is space)
availableSlotsOverflowPool := pool.availableSlotsOverflowPool()
if availableSlotsOverflowPool > 0 {
// transfer availableSlotsOverflowPool number of transactions slots from drop to OverflowPool
currentSlotsUsed := 0
for i, tx := range drop {
txSlots := numSlots(tx)
if currentSlotsUsed+txSlots <= availableSlotsOverflowPool {
from, _ := types.Sender(pool.signer, tx)
pool.localBufferPool.Add(tx)
log.Debug("adding to OverflowPool", "transaction", tx.Hash().String(), "from", from.String())
currentSlotsUsed += txSlots
} else {
log.Debug("not all got added to OverflowPool", "totalAdded", i+1)
return
}
for _, tx := range drop {
added := pool.localBufferPool.Add(tx)
if added {
from, _ := types.Sender(pool.signer, tx)
log.Debug("Added to OverflowPool", "transaction", tx.Hash().String(), "from", from.String())
} else {
log.Debug("Failed to add transaction to OverflowPool", "transaction", tx.Hash().String())
}
} else {
log.Debug("adding to OverflowPool unsuccessful", "availableSlotsOverflowPool", availableSlotsOverflowPool)
}
}

Expand Down Expand Up @@ -2108,15 +2097,6 @@ func (pool *LegacyPool) transferTransactions() {
pool.Add(txs, true, false)
}

func (pool *LegacyPool) availableSlotsOverflowPool() int {
maxOverflowPoolSize := int(pool.config.OverflowPoolSlots)
availableSlots := maxOverflowPoolSize - pool.localBufferPool.Size()
if availableSlots > 0 {
return availableSlots
}
return 0
}

func (pool *LegacyPool) PrintTxStats() {
for _, l := range pool.pending {
for _, transaction := range l.txs.items {
Expand Down
8 changes: 4 additions & 4 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,25 +2268,25 @@ func TestTransferTransactions(t *testing.T) {

assert.Equal(t, 0, pending, "pending transactions mismatched")
assert.Equal(t, 0, queue, "queued transactions mismatched")
assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected")
assert.Equal(t, uint64(1), pool.statsOverflowPool(), "OverflowPool size unexpected")

tx2 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[1])
pool.addToOverflowPool([]*types.Transaction{tx2}, true)
assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected")
assert.Equal(t, uint64(1), pool.statsOverflowPool(), "OverflowPool size unexpected")
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
pending, queue = pool.Stats()

assert.Equal(t, 0, pending, "pending transactions mismatched")
assert.Equal(t, 1, queue, "queued transactions mismatched")
assert.Equal(t, 0, pool.statsOverflowPool(), "OverflowPool size unexpected")
assert.Equal(t, uint64(0), pool.statsOverflowPool(), "OverflowPool size unexpected")

tx3 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[2])
pool.addToOverflowPool([]*types.Transaction{tx3}, true)
pending, queue = pool.Stats()

assert.Equal(t, 1, pending, "pending transactions mismatched")
assert.Equal(t, 0, queue, "queued transactions mismatched")
assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected")
assert.Equal(t, uint64(1), pool.statsOverflowPool(), "OverflowPool size unexpected")
}

// Tests that the pool rejects replacement dynamic fee transactions that don't
Expand Down
43 changes: 31 additions & 12 deletions core/txpool/legacypool/tx_overflowpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

// txHeapItem implements the Interface interface (https://pkg.go.dev/container/heap#Interface) of heap so that it can be heapified
Expand Down Expand Up @@ -65,8 +66,8 @@ type TxOverflowPool struct {
txHeap txHeap
index map[common.Hash]*txHeapItem
mu sync.RWMutex
maxSize uint64
totalSize int
maxSize uint64 // Maximum slots
totalSize uint64 // Total number of slots currently
}

func NewTxOverflowPoolHeap(estimatedMaxSize uint64) *TxOverflowPool {
Expand All @@ -77,34 +78,52 @@ func NewTxOverflowPoolHeap(estimatedMaxSize uint64) *TxOverflowPool {
}
}

func (tp *TxOverflowPool) Add(tx *types.Transaction) {
func (tp *TxOverflowPool) Add(tx *types.Transaction) bool {
tp.mu.Lock()
defer tp.mu.Unlock()

if _, exists := tp.index[tx.Hash()]; exists {
// Transaction already in pool, ignore
return
return false
}

txSlots := uint64(numSlots(tx))

// If the transaction is too big to ever fit (and the pool isn't empty right now), reject it
if (txSlots > tp.maxSize) || (txSlots == tp.maxSize && tp.totalSize != 0) {
log.Warn("Transaction too large to fit in OverflowPool", "transaction", tx.Hash().String(), "requiredSlots", txSlots, "maxSlots", tp.maxSize)
return false
}

if uint64(len(tp.txHeap)) >= tp.maxSize {
// Remove the oldest transaction to make space
// Remove transactions until there is room for the new transaction
for tp.totalSize+txSlots > tp.maxSize {
if tp.txHeap.Len() == 0 {
// No transactions left to remove, cannot make room
log.Warn("Not enough space in OverflowPool even after clearing", "transaction", tx.Hash().String())
return false
}
// Remove the oldest transaction
oldestItem, ok := heap.Pop(&tp.txHeap).(*txHeapItem)
if !ok || oldestItem == nil {
return
log.Error("Failed to pop from txHeap during Add")
return false
}
delete(tp.index, oldestItem.tx.Hash())
tp.totalSize -= numSlots(oldestItem.tx)
tp.totalSize -= uint64(numSlots(oldestItem.tx))
OverflowPoolGauge.Dec(1)
}

// Add the new transaction
item := &txHeapItem{
tx: tx,
timestamp: time.Now().UnixNano(),
}
heap.Push(&tp.txHeap, item)
tp.index[tx.Hash()] = item
tp.totalSize += numSlots(tx)
tp.totalSize += txSlots
OverflowPoolGauge.Inc(1)

return true
}

func (tp *TxOverflowPool) Get(hash common.Hash) (*types.Transaction, bool) {
Expand All @@ -122,7 +141,7 @@ func (tp *TxOverflowPool) Remove(hash common.Hash) {
if item, ok := tp.index[hash]; ok {
heap.Remove(&tp.txHeap, item.index)
delete(tp.index, hash)
tp.totalSize -= numSlots(item.tx)
tp.totalSize -= uint64(numSlots(item.tx))
OverflowPoolGauge.Dec(1)
}
}
Expand All @@ -141,7 +160,7 @@ func (tp *TxOverflowPool) Flush(n int) []*types.Transaction {
}
txs[i] = item.tx
delete(tp.index, item.tx.Hash())
tp.totalSize -= numSlots(item.tx)
tp.totalSize -= uint64(numSlots(item.tx))
}

OverflowPoolGauge.Dec(int64(n))
Expand All @@ -154,7 +173,7 @@ func (tp *TxOverflowPool) Len() int {
return tp.txHeap.Len()
}

func (tp *TxOverflowPool) Size() int {
func (tp *TxOverflowPool) Size() uint64 {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.totalSize
Expand Down
77 changes: 77 additions & 0 deletions core/txpool/legacypool/tx_overflowpool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package legacypool

import (
rand3 "crypto/rand"
"math/big"
rand2 "math/rand"
"testing"
Expand All @@ -9,6 +10,7 @@ import (
"github.com/cometbft/cometbft/libs/rand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
)

// Helper function to create a test transaction
Expand Down Expand Up @@ -157,6 +159,59 @@ func TestTxOverflowPoolHeapLen(t *testing.T) {
}
}

func TestTxOverflowPoolSlotCalculation(t *testing.T) {
// Initialize the pool with a maximum size of 2
pool := NewTxOverflowPoolHeap(2)

// Create two transactions with different slot requirements
tx1 := createTestTx(1, big.NewInt(1000)) // tx1 takes 1 slot
tx2 := createTestTx(2, big.NewInt(2000)) // tx2 takes 1 slot

// Add both transactions to fill the pool
pool.Add(tx1)
pool.Add(tx2)

if pool.Len() != 2 {
t.Fatalf("Expected pool size 2, but got %d", pool.Len())
}

dataSize := 40000
tx3 := createLargeTestTx(
3, // nonce
big.NewInt(100000000000), // gasPrice: 100 Gwei
dataSize,
) // takes 2 slots

// Create a third transaction with more slots than tx1
tx3Added := pool.Add(tx3)
assert.Equal(t, false, tx3Added)
assert.Equal(t, uint64(2), pool.totalSize)

// Verify that the pool length remains at 2
assert.Equal(t, 2, pool.Len(), "Expected pool size 2 after overflow")

tx4 := createTestTx(4, big.NewInt(3000)) // tx4 takes 1 slot
// Add tx4 to the pool
assert.True(t, pool.Add(tx4), "Failed to add tx4")

// The pool should evict the oldest transaction (tx1) to make room for tx4
// Verify that tx1 is no longer in the pool
_, exists := pool.Get(tx1.Hash())
assert.False(t, exists, "Expected tx1 to be evicted from the pool")
}

func TestBiggerTx(t *testing.T) {
// Create a transaction with 40KB of data (which should take 2 slots)
dataSize := 40000
tx := createLargeTestTx(
0, // nonce
big.NewInt(100000000000), // gasPrice: 100 Gwei
dataSize,
)
numberOfSlots := numSlots(tx)
assert.Equal(t, 2, numberOfSlots)
}

// Helper function to create a random test transaction
func createRandomTestTx() *types.Transaction {
nonce := uint64(rand.Intn(1000000))
Expand All @@ -176,6 +231,28 @@ func createRandomTestTxs(n int) []*types.Transaction {
return txs
}

// createLargeTestTx creates a transaction with a large data payload
func createLargeTestTx(nonce uint64, gasPrice *big.Int, dataSize int) *types.Transaction {
// Generate random data of specified size
data := make([]byte, dataSize)
rand3.Read(data)

to := common.HexToAddress("0x1234567890123456789012345678901234567890")

// Calculate gas needed for the data
// Gas costs: 21000 (base) + 16 (per non-zero byte) or 4 (per zero byte)
gasLimit := uint64(21000 + (16 * len(data)))

return types.NewTransaction(
nonce,
to,
big.NewInt(1000),
gasLimit,
gasPrice,
data,
)
}

// goos: darwin
// goarch: arm64
// pkg: github.com/ethereum/go-ethereum/core/txpool/legacypool
Expand Down

0 comments on commit af09f3a

Please sign in to comment.