Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EVM] Fix duplicate evm txs from priority queue #195

Merged
merged 18 commits into from
Jan 25, 2024
Merged
30 changes: 8 additions & 22 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,42 +399,28 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
totalSize int64
)

// wTxs contains a list of *WrappedTx retrieved from the priority queue that
// need to be re-enqueued prior to returning.
wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
defer func() {
for _, wtx := range wTxs {
txmp.priorityIndex.PushTx(wtx)
}
}()

txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
var txs []types.Tx
if uint64(txmp.Size()) < txmp.config.TxNotifyThreshold {
// do not reap anything if threshold is not met
return txs
}
for txmp.priorityIndex.NumTxs() > 0 {
wtx := txmp.priorityIndex.PopTx()
txs = append(txs, wtx.tx)
wTxs = append(wTxs, wtx)
txmp.priorityIndex.ForEachTx(func(wtx *WrappedTx) bool {
size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})

// Ensure we have capacity for the transaction with respect to the
// transaction size.
if maxBytes > -1 && totalSize+size > maxBytes {
return txs[:len(txs)-1]
return false
}

totalSize += size

// ensure we have capacity for the transaction with respect to total gas
gas := totalGas + wtx.gasWanted
if maxGas > -1 && gas > maxGas {
return txs[:len(txs)-1]
return false
}

totalGas = gas
}

txs = append(txs, wtx.tx)
return true
})

return txs
}
Expand Down
106 changes: 96 additions & 10 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@ var _ heap.Interface = (*TxPriorityQueue)(nil)
// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx sync.RWMutex
txs []*WrappedTx
evmQueue map[string][]*WrappedTx
txs []*WrappedTx // priority heap
evmQueue map[string][]*WrappedTx // sorted by nonce
}

func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx {
// Using BinarySearch to find the appropriate index to insert tx
i := binarySearch(queue, tx)

func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx, i int) []*WrappedTx {
// Make room for new value and add it
queue = append(queue, nil)
copy(queue[i+1:], queue[i:])
Expand All @@ -33,7 +30,7 @@ func binarySearch(queue []*WrappedTx, tx *WrappedTx) int {
low, high := 0, len(queue)
for low < high {
mid := low + (high-low)/2
if queue[mid].evmNonce < tx.evmNonce {
if queue[mid].evmNonce <= tx.evmNonce {
low = mid + 1
} else {
high = mid
Expand Down Expand Up @@ -117,12 +114,15 @@ func (pq *TxPriorityQueue) NumTxs() int {
func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
for i, t := range queue {
if t.evmNonce == tx.evmNonce {
if t.tx.Key() == tx.tx.Key() {
pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...)
if len(pq.evmQueue[tx.evmAddress]) == 0 {
delete(pq.evmQueue, tx.evmAddress)
} else {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
// only if removing the first item, then push next onto queue
if i == 0 {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
}
}
break
}
Expand Down Expand Up @@ -174,9 +174,67 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
heap.Push(pq, tx)
}

pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx)
pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx, binarySearch(queue, tx))

}

// These are available if we need to test the invariant checks
// these can be used to troubleshoot invariant violations
//func (pq *TxPriorityQueue) checkInvariants(msg string) {
//
// uniqHashes := make(map[string]bool)
// for _, tx := range pq.txs {
// if _, ok := uniqHashes[fmt.Sprintf("%x", tx.tx.Key())]; ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in heap", msg, tx.tx.Key()))
// }
// uniqHashes[fmt.Sprintf("%x", tx.tx.Key())] = true
// if tx.isEVM {
// if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
// if queue[0].tx.Key() != tx.tx.Key() {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not at front of evmQueue hash=%x", msg, tx.tx.Key()))
// }
// } else {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in evmQueue hash=%x", msg, tx.tx.Key()))
// }
// }
// }
//
// // each item in all queues should be unique nonce
// for _, queue := range pq.evmQueue {
// hashes := make(map[string]bool)
// for idx, tx := range queue {
// if idx == 0 {
// _, ok := pq.findTxIndexUnsafe(tx)
// if !ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): did not find tx[0] hash=%x nonce=%d in heap", msg, tx.tx.Key(), tx.evmNonce))
// }
// }
// if _, ok := hashes[fmt.Sprintf("%x", tx.tx.Key())]; ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in queue nonce=%d", msg, tx.tx.Key(), tx.evmNonce))
// }
// hashes[fmt.Sprintf("%x", tx.tx.Key())] = true
// }
// }
//}

// for debugging situations where invariant violations occur
//func (pq *TxPriorityQueue) print() {
// for _, tx := range pq.txs {
// fmt.Printf("DEBUG PRINT: heap: nonce=%d, hash=%x\n", tx.evmNonce, tx.tx.Key())
// }
//
// for _, queue := range pq.evmQueue {
// for idx, tx := range queue {
// fmt.Printf("DEBUG PRINT: evmQueue[%d]: nonce=%d, hash=%x\n", idx, tx.evmNonce, tx.tx.Key())
// }
// }
//}

// PushTx adds a valid transaction to the priority queue. It is thread safe.
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
pq.mtx.Lock()
Expand All @@ -185,6 +243,9 @@ func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
}

func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx {
if len(pq.txs) == 0 {
return nil
}
x := heap.Pop(pq)
if x == nil {
return nil
Expand All @@ -209,6 +270,31 @@ func (pq *TxPriorityQueue) PopTx() *WrappedTx {
}

// dequeue up to `max` transactions and reenqueue while locked
func (pq *TxPriorityQueue) ForEachTx(handler func(wtx *WrappedTx) bool) {
pq.mtx.Lock()
defer pq.mtx.Unlock()

numTxs := len(pq.txs) + pq.numQueuedUnsafe()

txs := make([]*WrappedTx, 0, numTxs)

defer func() {
for _, tx := range txs {
pq.pushTxUnsafe(tx)
}
}()

for i := 0; i < numTxs; i++ {
popped := pq.popTxUnsafe()
txs = append(txs, popped)
if !handler(popped) {
return
}
}
}

// dequeue up to `max` transactions and reenqueue while locked
// TODO: use ForEachTx instead
func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()
Expand Down
34 changes: 31 additions & 3 deletions internal/mempool/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ func TestTxPriorityQueue_ReapHalf(t *testing.T) {

func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
testCases := []TxTestCase{
{
name: "PriorityWithEVMAndNonEVMDuplicateNonce",
inputTxs: []*WrappedTx{
{sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10},
{sender: "3", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 9},
{sender: "2", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 7},
},
expectedOutput: []int64{1, 2, 3},
},
{
name: "PriorityWithEVMAndNonEVMDuplicateNonce",
inputTxs: []*WrappedTx{
{sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10},
{sender: "2", isEVM: false, priority: 9},
{sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
{sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8},
{sender: "6", isEVM: false, priority: 6},
{sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5},
},
expectedOutput: []int64{2, 4, 1, 3, 5, 5, 6, 7},
},
{
name: "PriorityWithEVMAndNonEVM",
inputTxs: []*WrappedTx{
Expand Down Expand Up @@ -107,6 +130,7 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
// Add input transactions to the queue and set timestamp to order inserted
for i, tx := range tc.inputTxs {
tx.timestamp = now.Add(time.Duration(i) * time.Second)
tx.tx = []byte(fmt.Sprintf("%d", time.Now().UnixNano()))
pq.PushTx(tx)
}

Expand All @@ -126,9 +150,9 @@ func TestTxPriorityQueue_SameAddressDifferentNonces(t *testing.T) {
address := "0x123"

// Insert transactions with the same address but different nonces and priorities
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10, tx: []byte("tx1")})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5, tx: []byte("tx2")})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15, tx: []byte("tx3")})

// Pop transactions and verify they are in the correct order of nonce
tx1 := pq.PopTx()
Expand All @@ -154,6 +178,7 @@ func TestTxPriorityQueue(t *testing.T) {
pq.PushTx(&WrappedTx{
priority: int64(i),
timestamp: time.Now(),
tx: []byte(fmt.Sprintf("%d", i)),
})

wg.Done()
Expand Down Expand Up @@ -278,12 +303,14 @@ func TestTxPriorityQueue_RemoveTxEvm(t *testing.T) {
isEVM: true,
evmAddress: "0xabc",
evmNonce: 1,
tx: []byte("tx1"),
}
tx2 := &WrappedTx{
priority: 1,
isEVM: true,
evmAddress: "0xabc",
evmNonce: 2,
tx: []byte("tx2"),
}

pq.PushTx(tx1)
Expand All @@ -306,6 +333,7 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) {
x := rng.Intn(100000)
pq.PushTx(&WrappedTx{
priority: int64(x),
tx: []byte(fmt.Sprintf("%d", i)),
})

values[i] = x
Expand Down
3 changes: 3 additions & 0 deletions internal/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque
skipCount := validateSkipCount(page, perPage)

txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount))
if skipCount > len(txs) {
skipCount = len(txs)
}
result := txs[skipCount:]

return &coretypes.ResultUnconfirmedTxs{
Expand Down
2 changes: 1 addition & 1 deletion types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (t TxRecordSet) Validate(maxSizeBytes int64, otxs Txs) error {
for i, cur := range allCopy {
// allCopy is sorted, so any duplicated data will be adjacent.
if i+1 < len(allCopy) && bytes.Equal(cur, allCopy[i+1]) {
return fmt.Errorf("found duplicate transaction with hash: %x", cur.Hash())
return fmt.Errorf("found duplicate transaction with hash: %x", cur.Key())
}
}

Expand Down
Loading