Skip to content

Commit

Permalink
Make ReadMaxTxs atomic (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen authored and stevenlanders committed Jan 30, 2024
1 parent da59b8d commit cefd40f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
21 changes: 3 additions & 18 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/libs/clist"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -420,24 +419,10 @@ func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()

numTxs := txmp.priorityIndex.NumTxs()
if max < 0 {
max = numTxs
}

cap := tmmath.MinInt(numTxs, max)

// wTxs contains a list of *WrappedTx retrieved from the priority queue that
// need to be re-enqueued prior to returning.
wTxs := make([]*WrappedTx, 0, cap)
txs := make([]types.Tx, 0, cap)
for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
wtx := txmp.priorityIndex.PopTx()
txs = append(txs, wtx.tx)
wTxs = append(wTxs, wtx)
}
wTxs := txmp.priorityIndex.PeekTxs(max)
txs := make([]types.Tx, 0, len(wTxs))
for _, wtx := range wTxs {
txmp.priorityIndex.PushTx(wtx)
txs = append(txs, wtx.tx)
}
return txs
}
Expand Down
28 changes: 28 additions & 0 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"container/heap"
"sort"
"sync"

tmmath "github.com/tendermint/tendermint/libs/math"
)

var _ heap.Interface = (*TxPriorityQueue)(nil)
Expand Down Expand Up @@ -106,6 +108,32 @@ func (pq *TxPriorityQueue) PopTx() *WrappedTx {
return nil
}

// dequeue up to `max` transactions and reenqueue while locked
func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()

numTxs := len(pq.txs)
if max < 0 {
max = numTxs
}

cap := tmmath.MinInt(numTxs, max)
res := make([]*WrappedTx, 0, cap)
for i := 0; i < cap; i++ {
popped := heap.Pop(pq)
if popped == nil {
break
}
res = append(res, popped.(*WrappedTx))
}

for _, tx := range res {
heap.Push(pq, tx)
}
return res
}

// Push implements the Heap interface.
//
// NOTE: A caller should never call Push. Use PushTx instead.
Expand Down

0 comments on commit cefd40f

Please sign in to comment.