Skip to content

Commit

Permalink
[EVM] Fix evm pending nonce (#179)
Browse files Browse the repository at this point in the history
* Perf: Increase buffer size for pubsub server to boost performance (#167)

* Increase buffer size for pubsub server

* Add more timeout for test failure

* Add more timeout

* Fix test split scripts

* Fix test split

* Fix unit test

* Unit test

* Unit test

* [P2P] Optimize block pool requester retry and peer pick up logic (#170)

* P2P Improvements: Fix block sync reactor and block pool retry logic

* Revert "Add event data to result event (#165)" (#176)

This reverts commit 72bb29c.

* Fix block sync auto restart not working as expected (#175)

* Fix edge case for blocksync (#178)

* fix evm pending nonce

* fix test

* deflake a test

* de-flake test

* Revert "merge main"

This reverts commit 58b9424, reversing
changes made to 02d1478.

* consider keep-in-cache logic when removing from cache

* undo test tweaks

---------

Co-authored-by: Yiming Zang <[email protected]>
Co-authored-by: Jeremy Wei <[email protected]>
  • Loading branch information
3 people authored and udpatil committed Apr 16, 2024
1 parent 4e18f06 commit 675fafc
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 19 deletions.
4 changes: 3 additions & 1 deletion abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,13 @@ const (
)

type PendingTxChecker func() PendingTxCheckerResponse
type ExpireTxHandler func()

// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// ResponseCheckTxV2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// to utilize the new fields in V2 type (but still be backwards-compatible)
type ResponseCheckTxV2 struct {
*ResponseCheckTx
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
ExpireTxHandler ExpireTxHandler
}
20 changes: 14 additions & 6 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,20 @@ func (txmp *TxMempool) CheckTx(
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
expiredCallback: func(removeFromCache bool) {
if removeFromCache {
txmp.cache.Remove(tx)
}
if res.ExpireTxHandler != nil {
res.ExpireTxHandler()
}
},
}

if err == nil {
// only add new transaction if checkTx passes and is not pending
if !res.IsPendingTransaction {
err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo)

if err != nil {
return err
}
Expand Down Expand Up @@ -837,9 +844,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {

atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

if removeFromCache {
txmp.cache.Remove(wtx.tx)
}
wtx.expiredCallback(removeFromCache)
}

// purgeExpiredTxs removes all transactions that have exceeded their respective
Expand Down Expand Up @@ -888,10 +893,13 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}

for _, wtx := range expiredTxs {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
}

txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, txmp.cache.Remove)
// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) {
wtx.expiredCallback(!txmp.config.KeepInvalidTxsInCache)
})
}

func (txmp *TxMempool) notifyTxsAvailable() {
Expand Down
30 changes: 18 additions & 12 deletions internal/mempool/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type WrappedTx struct {
// transaction in the mempool can be evicted when it is simultaneously having
// a reCheckTx callback executed.
removed bool

// this is the callback that can be called when a transaction expires
expiredCallback func(removeFromCache bool)
}

func (wtx *WrappedTx) Size() int {
Expand Down Expand Up @@ -295,10 +298,10 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) {

type PendingTxs struct {
mtx *sync.RWMutex
txs []PendingTxInfo
txs []TxWithResponse
}

type PendingTxInfo struct {
type TxWithResponse struct {
tx *WrappedTx
checkTxResponse *abci.ResponseCheckTxV2
txInfo TxInfo
Expand All @@ -307,13 +310,12 @@ type PendingTxInfo struct {
func NewPendingTxs() *PendingTxs {
return &PendingTxs{
mtx: &sync.RWMutex{},
txs: []PendingTxInfo{},
txs: []TxWithResponse{},
}
}

func (p *PendingTxs) EvaluatePendingTransactions() (
acceptedTxs []PendingTxInfo,
rejectedTxs []PendingTxInfo,
acceptedTxs []TxWithResponse,
rejectedTxs []TxWithResponse,
) {
poppedIndices := []int{}
p.mtx.Lock()
Expand All @@ -337,7 +339,7 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) {
if len(indices) == 0 {
return
}
newTxs := []PendingTxInfo{}
newTxs := []TxWithResponse{}
start := 0
for _, idx := range indices {
newTxs = append(newTxs, p.txs[start:idx]...)
Expand All @@ -350,14 +352,14 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) {
func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.txs = append(p.txs, PendingTxInfo{
p.txs = append(p.txs, TxWithResponse{
tx: tx,
checkTxResponse: resCheckTx,
txInfo: txInfo,
})
}

func (p *PendingTxs) Peek(max int) []PendingTxInfo {
func (p *PendingTxs) Peek(max int) []TxWithResponse {
p.mtx.RLock()
defer p.mtx.RUnlock()
// priority is fifo
Expand All @@ -373,7 +375,7 @@ func (p *PendingTxs) Size() int {
return len(p.txs)
}

func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(types.Tx)) {
func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(wtx *WrappedTx)) {
p.mtx.Lock()
defer p.mtx.Unlock()

Expand All @@ -385,10 +387,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat
if ttlNumBlock > 0 {
idxFirstNotExpiredTx := len(p.txs)
for i, ptx := range p.txs {
// once found, we can break because these are ordered
if (blockHeight - ptx.tx.height) <= ttlNumBlock {
idxFirstNotExpiredTx = i
break
} else {
cb(ptx.tx.tx)
cb(ptx.tx)
}
}
p.txs = p.txs[idxFirstNotExpiredTx:]
Expand All @@ -401,10 +405,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat
if ttlDuration > 0 {
idxFirstNotExpiredTx := len(p.txs)
for i, ptx := range p.txs {
// once found, we can break because these are ordered
if now.Sub(ptx.tx.timestamp) <= ttlDuration {
idxFirstNotExpiredTx = i
break
} else {
cb(ptx.tx.tx)
cb(ptx.tx)
}
}
p.txs = p.txs[idxFirstNotExpiredTx:]
Expand Down

0 comments on commit 675fafc

Please sign in to comment.