From 3c40e3514ac6d04e078c76accba99df1d182eaf9 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 4 Jan 2024 09:22:26 -0500 Subject: [PATCH] [EVM] Fix evm pending nonce (#179) * Perf: Increase buffer size for pubsub server to boost performance (#167) * Increase buffer size for pubsub server * Add more timeout for test failure * Add more timeout * Fix test split scripts * Fix test split * Fix unit test * Unit test * Unit test * [P2P] Optimize block pool requester retry and peer pick up logic (#170) * P2P Improvements: Fix block sync reactor and block pool retry logic * Revert "Add event data to result event (#165)" (#176) This reverts commit 72bb29caf821ed03900bcd74f4ddb439df85f5f1. * Fix block sync auto restart not working as expected (#175) * Fix edge case for blocksync (#178) * fix evm pending nonce * fix test * deflake a test * de-flake test * Revert "merge main" This reverts commit 58b94248ac561583b918775c0b0e220d4225fbc0, reversing changes made to 02d14782edc6c58d59db2658a4190dbdc5c7280f. * consider keep-in-cache logic when removing from cache * undo test tweaks --------- Co-authored-by: Yiming Zang <50607998+yzang2019@users.noreply.github.com> Co-authored-by: Jeremy Wei --- abci/types/types.go | 4 +++- internal/mempool/mempool.go | 20 ++++++++++++++------ internal/mempool/tx.go | 30 ++++++++++++++++++------------ 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/abci/types/types.go b/abci/types/types.go index 4df6d4751..b40865524 100644 --- a/abci/types/types.go +++ b/abci/types/types.go @@ -247,11 +247,13 @@ const ( ) type PendingTxChecker func() PendingTxCheckerResponse +type ExpireTxHandler func() -// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able +// ResponseCheckTxV2 response type contains non-protobuf fields, so non-local ABCI clients will not be able // to utilize the new fields in V2 type (but still be backwards-compatible) type ResponseCheckTxV2 struct { *ResponseCheckTx IsPendingTransaction bool Checker PendingTxChecker // must not be nil if IsPendingTransaction is true + ExpireTxHandler ExpireTxHandler } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 70489bed6..84b251908 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -291,13 +291,20 @@ func (txmp *TxMempool) CheckTx( hash: txHash, timestamp: time.Now().UTC(), height: txmp.height, + expiredCallback: func(removeFromCache bool) { + if removeFromCache { + txmp.cache.Remove(tx) + } + if res.ExpireTxHandler != nil { + res.ExpireTxHandler() + } + }, } if err == nil { // only add new transaction if checkTx passes and is not pending if !res.IsPendingTransaction { err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo) - if err != nil { return err } @@ -837,9 +844,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - if removeFromCache { - txmp.cache.Remove(wtx.tx) - } + wtx.expiredCallback(removeFromCache) } // purgeExpiredTxs removes all transactions that have exceeded their respective @@ -888,10 +893,13 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } for _, wtx := range expiredTxs { - txmp.removeTx(wtx, false) + txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) } - txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, txmp.cache.Remove) + // remove pending txs that have expired + txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) { + wtx.expiredCallback(!txmp.config.KeepInvalidTxsInCache) + }) } func (txmp *TxMempool) notifyTxsAvailable() { diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 2f3ec7b8f..4d5762be5 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -64,6 +64,9 @@ type WrappedTx struct { // transaction in the mempool can be evicted when it is simultaneously having // a reCheckTx callback executed. removed bool + + // this is the callback that can be called when a transaction expires + expiredCallback func(removeFromCache bool) } func (wtx *WrappedTx) Size() int { @@ -295,10 +298,10 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { type PendingTxs struct { mtx *sync.RWMutex - txs []PendingTxInfo + txs []TxWithResponse } -type PendingTxInfo struct { +type TxWithResponse struct { tx *WrappedTx checkTxResponse *abci.ResponseCheckTxV2 txInfo TxInfo @@ -307,13 +310,12 @@ type PendingTxInfo struct { func NewPendingTxs() *PendingTxs { return &PendingTxs{ mtx: &sync.RWMutex{}, - txs: []PendingTxInfo{}, + txs: []TxWithResponse{}, } } - func (p *PendingTxs) EvaluatePendingTransactions() ( - acceptedTxs []PendingTxInfo, - rejectedTxs []PendingTxInfo, + acceptedTxs []TxWithResponse, + rejectedTxs []TxWithResponse, ) { poppedIndices := []int{} p.mtx.Lock() @@ -337,7 +339,7 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) { if len(indices) == 0 { return } - newTxs := []PendingTxInfo{} + newTxs := []TxWithResponse{} start := 0 for _, idx := range indices { newTxs = append(newTxs, p.txs[start:idx]...) @@ -350,14 +352,14 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) { func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) { p.mtx.Lock() defer p.mtx.Unlock() - p.txs = append(p.txs, PendingTxInfo{ + p.txs = append(p.txs, TxWithResponse{ tx: tx, checkTxResponse: resCheckTx, txInfo: txInfo, }) } -func (p *PendingTxs) Peek(max int) []PendingTxInfo { +func (p *PendingTxs) Peek(max int) []TxWithResponse { p.mtx.RLock() defer p.mtx.RUnlock() // priority is fifo @@ -373,7 +375,7 @@ func (p *PendingTxs) Size() int { return len(p.txs) } -func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(types.Tx)) { +func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(wtx *WrappedTx)) { p.mtx.Lock() defer p.mtx.Unlock() @@ -385,10 +387,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat if ttlNumBlock > 0 { idxFirstNotExpiredTx := len(p.txs) for i, ptx := range p.txs { + // once found, we can break because these are ordered if (blockHeight - ptx.tx.height) <= ttlNumBlock { idxFirstNotExpiredTx = i + break } else { - cb(ptx.tx.tx) + cb(ptx.tx) } } p.txs = p.txs[idxFirstNotExpiredTx:] @@ -401,10 +405,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat if ttlDuration > 0 { idxFirstNotExpiredTx := len(p.txs) for i, ptx := range p.txs { + // once found, we can break because these are ordered if now.Sub(ptx.tx.timestamp) <= ttlDuration { idxFirstNotExpiredTx = i + break } else { - cb(ptx.tx.tx) + cb(ptx.tx) } } p.txs = p.txs[idxFirstNotExpiredTx:]