diff --git a/abci/types/types.go b/abci/types/types.go index 4df6d4751..b40865524 100644 --- a/abci/types/types.go +++ b/abci/types/types.go @@ -247,11 +247,13 @@ const ( ) type PendingTxChecker func() PendingTxCheckerResponse +type ExpireTxHandler func() -// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able +// ResponseCheckTxV2 response type contains non-protobuf fields, so non-local ABCI clients will not be able // to utilize the new fields in V2 type (but still be backwards-compatible) type ResponseCheckTxV2 struct { *ResponseCheckTx IsPendingTransaction bool Checker PendingTxChecker // must not be nil if IsPendingTransaction is true + ExpireTxHandler ExpireTxHandler } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 70489bed6..84b251908 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -291,13 +291,20 @@ func (txmp *TxMempool) CheckTx( hash: txHash, timestamp: time.Now().UTC(), height: txmp.height, + expiredCallback: func(removeFromCache bool) { + if removeFromCache { + txmp.cache.Remove(tx) + } + if res.ExpireTxHandler != nil { + res.ExpireTxHandler() + } + }, } if err == nil { // only add new transaction if checkTx passes and is not pending if !res.IsPendingTransaction { err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo) - if err != nil { return err } @@ -837,9 +844,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - if removeFromCache { - txmp.cache.Remove(wtx.tx) - } + wtx.expiredCallback(removeFromCache) } // purgeExpiredTxs removes all transactions that have exceeded their respective @@ -888,10 +893,13 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } for _, wtx := range expiredTxs { - txmp.removeTx(wtx, false) + txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) } - txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, txmp.cache.Remove) + // remove pending txs that have expired + txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) { + wtx.expiredCallback(!txmp.config.KeepInvalidTxsInCache) + }) } func (txmp *TxMempool) notifyTxsAvailable() { diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 2f3ec7b8f..4d5762be5 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -64,6 +64,9 @@ type WrappedTx struct { // transaction in the mempool can be evicted when it is simultaneously having // a reCheckTx callback executed. removed bool + + // this is the callback that can be called when a transaction expires + expiredCallback func(removeFromCache bool) } func (wtx *WrappedTx) Size() int { @@ -295,10 +298,10 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { type PendingTxs struct { mtx *sync.RWMutex - txs []PendingTxInfo + txs []TxWithResponse } -type PendingTxInfo struct { +type TxWithResponse struct { tx *WrappedTx checkTxResponse *abci.ResponseCheckTxV2 txInfo TxInfo @@ -307,13 +310,12 @@ type PendingTxInfo struct { func NewPendingTxs() *PendingTxs { return &PendingTxs{ mtx: &sync.RWMutex{}, - txs: []PendingTxInfo{}, + txs: []TxWithResponse{}, } } - func (p *PendingTxs) EvaluatePendingTransactions() ( - acceptedTxs []PendingTxInfo, - rejectedTxs []PendingTxInfo, + acceptedTxs []TxWithResponse, + rejectedTxs []TxWithResponse, ) { poppedIndices := []int{} p.mtx.Lock() @@ -337,7 +339,7 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) { if len(indices) == 0 { return } - newTxs := []PendingTxInfo{} + newTxs := []TxWithResponse{} start := 0 for _, idx := range indices { newTxs = append(newTxs, p.txs[start:idx]...) @@ -350,14 +352,14 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) { func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) { p.mtx.Lock() defer p.mtx.Unlock() - p.txs = append(p.txs, PendingTxInfo{ + p.txs = append(p.txs, TxWithResponse{ tx: tx, checkTxResponse: resCheckTx, txInfo: txInfo, }) } -func (p *PendingTxs) Peek(max int) []PendingTxInfo { +func (p *PendingTxs) Peek(max int) []TxWithResponse { p.mtx.RLock() defer p.mtx.RUnlock() // priority is fifo @@ -373,7 +375,7 @@ func (p *PendingTxs) Size() int { return len(p.txs) } -func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(types.Tx)) { +func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(wtx *WrappedTx)) { p.mtx.Lock() defer p.mtx.Unlock() @@ -385,10 +387,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat if ttlNumBlock > 0 { idxFirstNotExpiredTx := len(p.txs) for i, ptx := range p.txs { + // once found, we can break because these are ordered if (blockHeight - ptx.tx.height) <= ttlNumBlock { idxFirstNotExpiredTx = i + break } else { - cb(ptx.tx.tx) + cb(ptx.tx) } } p.txs = p.txs[idxFirstNotExpiredTx:] @@ -401,10 +405,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat if ttlDuration > 0 { idxFirstNotExpiredTx := len(p.txs) for i, ptx := range p.txs { + // once found, we can break because these are ordered if now.Sub(ptx.tx.timestamp) <= ttlDuration { idxFirstNotExpiredTx = i + break } else { - cb(ptx.tx.tx) + cb(ptx.tx) } } p.txs = p.txs[idxFirstNotExpiredTx:]