Skip to content

Commit

Permalink
core/txpool: use atomic int added in go1.19 (#26913)
Browse files Browse the repository at this point in the history
Makes use of atomic.Uint64 instead of atomic by pointer
  • Loading branch information
s7v7nislands authored Mar 20, 2023
1 parent 81b0aa0 commit 80ff0b4
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 37 deletions.
15 changes: 6 additions & 9 deletions core/txpool/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,7 @@ func (h *priceHeap) Pop() interface{} {
// the floating heap is better. When baseFee is decreasing they behave similarly.
type pricedList struct {
// Number of stale price points to (re-heap trigger).
// This field is accessed atomically, and must be the first field
// to ensure it has correct alignment for atomic.AddInt64.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
stales int64
stales atomic.Int64

all *lookup // Pointer to the map of all transactions
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
Expand Down Expand Up @@ -545,7 +542,7 @@ func (l *pricedList) Put(tx *types.Transaction, local bool) {
// the heap if a large enough ratio of transactions go stale.
func (l *pricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
stales := atomic.AddInt64(&l.stales, int64(count))
stales := l.stales.Add(int64(count))
if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 {
return
}
Expand All @@ -570,7 +567,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
for len(h.list) > 0 {
head := h.list[0]
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
l.stales.Add(-1)
heap.Pop(h)
continue
}
Expand All @@ -597,7 +594,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.urgent).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
l.stales.Add(-1)
continue
}
// Non stale transaction found, move to floating heap
Expand All @@ -610,7 +607,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.floating).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
l.stales.Add(-1)
continue
}
// Non stale transaction found, discard it
Expand All @@ -633,7 +630,7 @@ func (l *pricedList) Reheap() {
l.reheapMu.Lock()
defer l.reheapMu.Unlock()
start := time.Now()
atomic.StoreInt64(&l.stales, 0)
l.stales.Store(0)
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
l.urgent.list = append(l.urgent.list, tx)
Expand Down
3 changes: 1 addition & 2 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -383,7 +382,7 @@ func (pool *TxPool) loop() {
pool.mu.RLock()
pending, queued := pool.stats()
pool.mu.RUnlock()
stales := int(atomic.LoadInt64(&pool.priced.stales))
stales := int(pool.priced.stales.Load())

if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
Expand Down
6 changes: 3 additions & 3 deletions core/txpool/txpool2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestTransactionFutureAttack(t *testing.T) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestTransactionFuture1559(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()

Expand Down Expand Up @@ -147,7 +147,7 @@ func TestTransactionZAttack(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
// Create a number of test accounts, fund them and make transactions
Expand Down
52 changes: 29 additions & 23 deletions core/txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,21 @@ func init() {
}

type testBlockChain struct {
gasLimit uint64 // must be first field for 64 bit alignment (atomic access)
gasLimit atomic.Uint64
statedb *state.StateDB
chainHeadFeed *event.Feed
}

func newTestBlockChain(gasLimit uint64, statedb *state.StateDB, chainHeadFeed *event.Feed) *testBlockChain {
bc := testBlockChain{statedb: statedb, chainHeadFeed: new(event.Feed)}
bc.gasLimit.Store(gasLimit)
return &bc
}

func (bc *testBlockChain) CurrentBlock() *types.Header {
return &types.Header{
Number: new(big.Int),
GasLimit: atomic.LoadUint64(&bc.gasLimit),
GasLimit: bc.gasLimit.Load(),
}
}

Expand Down Expand Up @@ -121,7 +127,7 @@ func setupPool() (*TxPool, *ecdsa.PrivateKey) {

func setupPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateKey) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{10000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(10000000, statedb, new(event.Feed))

key, _ := crypto.GenerateKey()
pool := NewTxPool(testTxPoolConfig, config, blockchain)
Expand Down Expand Up @@ -236,7 +242,7 @@ func TestStateChangeDuringReset(t *testing.T) {

// setup pool with 2 transaction in it
statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether))
blockchain := &testChain{&testBlockChain{1000000000, statedb, new(event.Feed)}, address, &trigger}
blockchain := &testChain{newTestBlockChain(1000000000, statedb, new(event.Feed)), address, &trigger}

tx0 := transaction(0, 100000, key)
tx1 := transaction(1, 100000, key)
Expand Down Expand Up @@ -430,7 +436,7 @@ func TestChainFork(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
statedb.AddBalance(addr, big.NewInt(100000000000000))

pool.chain = &testBlockChain{1000000, statedb, new(event.Feed)}
pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed))
<-pool.requestReset(nil, nil)
}
resetState()
Expand Down Expand Up @@ -459,7 +465,7 @@ func TestDoubleNonce(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
statedb.AddBalance(addr, big.NewInt(100000000000000))

pool.chain = &testBlockChain{1000000, statedb, new(event.Feed)}
pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed))
<-pool.requestReset(nil, nil)
}
resetState()
Expand Down Expand Up @@ -629,7 +635,7 @@ func TestDropping(t *testing.T) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
}
// Reduce the block gas limit, check that invalidated transactions are dropped
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
pool.chain.(*testBlockChain).gasLimit.Store(100)
<-pool.requestReset(nil, nil)

if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
Expand Down Expand Up @@ -657,7 +663,7 @@ func TestPostponing(t *testing.T) {

// Create the pool to test the postponing with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -869,7 +875,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.NoLocals = nolocals
Expand Down Expand Up @@ -961,7 +967,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {

// Create the pool to test the non-expiration enforcement
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.Lifetime = time.Second
Expand Down Expand Up @@ -1146,7 +1152,7 @@ func TestPendingGlobalLimiting(t *testing.T) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.GlobalSlots = config.AccountSlots * 10
Expand Down Expand Up @@ -1248,7 +1254,7 @@ func TestCapClearsFromAll(t *testing.T) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.AccountSlots = 2
Expand Down Expand Up @@ -1282,7 +1288,7 @@ func TestPendingMinimumAllowance(t *testing.T) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.GlobalSlots = 1
Expand Down Expand Up @@ -1330,7 +1336,7 @@ func TestRepricing(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -1578,7 +1584,7 @@ func TestRepricingKeepsLocals(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -1651,7 +1657,7 @@ func TestUnderpricing(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.GlobalSlots = 2
Expand Down Expand Up @@ -1765,7 +1771,7 @@ func TestStableUnderpricing(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.GlobalSlots = 128
Expand Down Expand Up @@ -1997,7 +2003,7 @@ func TestDeduplication(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -2063,7 +2069,7 @@ func TestReplacement(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -2268,7 +2274,7 @@ func testJournaling(t *testing.T, nolocals bool) {

// Create the original pool to inject transaction into the journal
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.NoLocals = nolocals
Expand Down Expand Up @@ -2310,7 +2316,7 @@ func testJournaling(t *testing.T, nolocals bool) {
// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
pool.Stop()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain = newTestBlockChain(1000000, statedb, new(event.Feed))

pool = NewTxPool(config, params.TestChainConfig, blockchain)

Expand All @@ -2337,7 +2343,7 @@ func testJournaling(t *testing.T, nolocals bool) {
pool.Stop()

statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain = newTestBlockChain(1000000, statedb, new(event.Feed))
pool = NewTxPool(config, params.TestChainConfig, blockchain)

pending, queued = pool.Stats()
Expand Down Expand Up @@ -2366,7 +2372,7 @@ func TestStatusCheck(t *testing.T) {

// Create the pool to test the status retrievals with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down

0 comments on commit 80ff0b4

Please sign in to comment.