Skip to content

Commit

Permalink
[EVM] Fix duplicate evm txs from priority queue (#195)
Browse files Browse the repository at this point in the history
* debug duplicate evm tx

* add more logs

* add some \ns

* more logs

* fix swap check

* add-lockable-reap-by-gas

* add invariant checks

* fix invariant parenthesis

* fix log

* remove invalid invariant

* fix nonce ordering pain

* handle ordering of insert

* fix remove

* cleanup

* fix imports

* cleanup

* avoid getTransactionByHash(hash) panic due to index

* use Key() to compare instead of pointer
  • Loading branch information
stevenlanders authored Jan 25, 2024
1 parent 217570a commit 830d8ab
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 36 deletions.
30 changes: 8 additions & 22 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,42 +399,28 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
totalSize int64
)

// wTxs contains a list of *WrappedTx retrieved from the priority queue that
// need to be re-enqueued prior to returning.
wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
defer func() {
for _, wtx := range wTxs {
txmp.priorityIndex.PushTx(wtx)
}
}()

txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
var txs []types.Tx
if uint64(txmp.Size()) < txmp.config.TxNotifyThreshold {
// do not reap anything if threshold is not met
return txs
}
for txmp.priorityIndex.NumTxs() > 0 {
wtx := txmp.priorityIndex.PopTx()
txs = append(txs, wtx.tx)
wTxs = append(wTxs, wtx)
txmp.priorityIndex.ForEachTx(func(wtx *WrappedTx) bool {
size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})

// Ensure we have capacity for the transaction with respect to the
// transaction size.
if maxBytes > -1 && totalSize+size > maxBytes {
return txs[:len(txs)-1]
return false
}

totalSize += size

// ensure we have capacity for the transaction with respect to total gas
gas := totalGas + wtx.gasWanted
if maxGas > -1 && gas > maxGas {
return txs[:len(txs)-1]
return false
}

totalGas = gas
}

txs = append(txs, wtx.tx)
return true
})

return txs
}
Expand Down
106 changes: 96 additions & 10 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@ var _ heap.Interface = (*TxPriorityQueue)(nil)
// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx sync.RWMutex
txs []*WrappedTx
evmQueue map[string][]*WrappedTx
txs []*WrappedTx // priority heap
evmQueue map[string][]*WrappedTx // sorted by nonce
}

func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx {
// Using BinarySearch to find the appropriate index to insert tx
i := binarySearch(queue, tx)

func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx, i int) []*WrappedTx {
// Make room for new value and add it
queue = append(queue, nil)
copy(queue[i+1:], queue[i:])
Expand All @@ -33,7 +30,7 @@ func binarySearch(queue []*WrappedTx, tx *WrappedTx) int {
low, high := 0, len(queue)
for low < high {
mid := low + (high-low)/2
if queue[mid].evmNonce < tx.evmNonce {
if queue[mid].evmNonce <= tx.evmNonce {
low = mid + 1
} else {
high = mid
Expand Down Expand Up @@ -117,12 +114,15 @@ func (pq *TxPriorityQueue) NumTxs() int {
func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
for i, t := range queue {
if t.evmNonce == tx.evmNonce {
if t.tx.Key() == tx.tx.Key() {
pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...)
if len(pq.evmQueue[tx.evmAddress]) == 0 {
delete(pq.evmQueue, tx.evmAddress)
} else {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
// only if removing the first item, then push next onto queue
if i == 0 {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
}
}
break
}
Expand Down Expand Up @@ -174,9 +174,67 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
heap.Push(pq, tx)
}

pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx)
pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx, binarySearch(queue, tx))

}

// These are available if we need to test the invariant checks
// these can be used to troubleshoot invariant violations
//func (pq *TxPriorityQueue) checkInvariants(msg string) {
//
// uniqHashes := make(map[string]bool)
// for _, tx := range pq.txs {
// if _, ok := uniqHashes[fmt.Sprintf("%x", tx.tx.Key())]; ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in heap", msg, tx.tx.Key()))
// }
// uniqHashes[fmt.Sprintf("%x", tx.tx.Key())] = true
// if tx.isEVM {
// if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
// if queue[0].tx.Key() != tx.tx.Key() {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not at front of evmQueue hash=%x", msg, tx.tx.Key()))
// }
// } else {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in evmQueue hash=%x", msg, tx.tx.Key()))
// }
// }
// }
//
// // each item in all queues should be unique nonce
// for _, queue := range pq.evmQueue {
// hashes := make(map[string]bool)
// for idx, tx := range queue {
// if idx == 0 {
// _, ok := pq.findTxIndexUnsafe(tx)
// if !ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): did not find tx[0] hash=%x nonce=%d in heap", msg, tx.tx.Key(), tx.evmNonce))
// }
// }
// if _, ok := hashes[fmt.Sprintf("%x", tx.tx.Key())]; ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in queue nonce=%d", msg, tx.tx.Key(), tx.evmNonce))
// }
// hashes[fmt.Sprintf("%x", tx.tx.Key())] = true
// }
// }
//}

// for debugging situations where invariant violations occur
//func (pq *TxPriorityQueue) print() {
// for _, tx := range pq.txs {
// fmt.Printf("DEBUG PRINT: heap: nonce=%d, hash=%x\n", tx.evmNonce, tx.tx.Key())
// }
//
// for _, queue := range pq.evmQueue {
// for idx, tx := range queue {
// fmt.Printf("DEBUG PRINT: evmQueue[%d]: nonce=%d, hash=%x\n", idx, tx.evmNonce, tx.tx.Key())
// }
// }
//}

// PushTx adds a valid transaction to the priority queue. It is thread safe.
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
pq.mtx.Lock()
Expand All @@ -185,6 +243,9 @@ func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
}

func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx {
if len(pq.txs) == 0 {
return nil
}
x := heap.Pop(pq)
if x == nil {
return nil
Expand All @@ -209,6 +270,31 @@ func (pq *TxPriorityQueue) PopTx() *WrappedTx {
}

// dequeue up to `max` transactions and reenqueue while locked
func (pq *TxPriorityQueue) ForEachTx(handler func(wtx *WrappedTx) bool) {
pq.mtx.Lock()
defer pq.mtx.Unlock()

numTxs := len(pq.txs) + pq.numQueuedUnsafe()

txs := make([]*WrappedTx, 0, numTxs)

defer func() {
for _, tx := range txs {
pq.pushTxUnsafe(tx)
}
}()

for i := 0; i < numTxs; i++ {
popped := pq.popTxUnsafe()
txs = append(txs, popped)
if !handler(popped) {
return
}
}
}

// dequeue up to `max` transactions and reenqueue while locked
// TODO: use ForEachTx instead
func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()
Expand Down
34 changes: 31 additions & 3 deletions internal/mempool/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ func TestTxPriorityQueue_ReapHalf(t *testing.T) {

func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
testCases := []TxTestCase{
{
name: "PriorityWithEVMAndNonEVMDuplicateNonce",
inputTxs: []*WrappedTx{
{sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10},
{sender: "3", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 9},
{sender: "2", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 7},
},
expectedOutput: []int64{1, 2, 3},
},
{
name: "PriorityWithEVMAndNonEVMDuplicateNonce",
inputTxs: []*WrappedTx{
{sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10},
{sender: "2", isEVM: false, priority: 9},
{sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
{sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8},
{sender: "6", isEVM: false, priority: 6},
{sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5},
},
expectedOutput: []int64{2, 4, 1, 3, 5, 5, 6, 7},
},
{
name: "PriorityWithEVMAndNonEVM",
inputTxs: []*WrappedTx{
Expand Down Expand Up @@ -107,6 +130,7 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
// Add input transactions to the queue and set timestamp to order inserted
for i, tx := range tc.inputTxs {
tx.timestamp = now.Add(time.Duration(i) * time.Second)
tx.tx = []byte(fmt.Sprintf("%d", time.Now().UnixNano()))
pq.PushTx(tx)
}

Expand All @@ -126,9 +150,9 @@ func TestTxPriorityQueue_SameAddressDifferentNonces(t *testing.T) {
address := "0x123"

// Insert transactions with the same address but different nonces and priorities
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10, tx: []byte("tx1")})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5, tx: []byte("tx2")})
pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15, tx: []byte("tx3")})

// Pop transactions and verify they are in the correct order of nonce
tx1 := pq.PopTx()
Expand All @@ -154,6 +178,7 @@ func TestTxPriorityQueue(t *testing.T) {
pq.PushTx(&WrappedTx{
priority: int64(i),
timestamp: time.Now(),
tx: []byte(fmt.Sprintf("%d", i)),
})

wg.Done()
Expand Down Expand Up @@ -278,12 +303,14 @@ func TestTxPriorityQueue_RemoveTxEvm(t *testing.T) {
isEVM: true,
evmAddress: "0xabc",
evmNonce: 1,
tx: []byte("tx1"),
}
tx2 := &WrappedTx{
priority: 1,
isEVM: true,
evmAddress: "0xabc",
evmNonce: 2,
tx: []byte("tx2"),
}

pq.PushTx(tx1)
Expand All @@ -306,6 +333,7 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) {
x := rng.Intn(100000)
pq.PushTx(&WrappedTx{
priority: int64(x),
tx: []byte(fmt.Sprintf("%d", i)),
})

values[i] = x
Expand Down
3 changes: 3 additions & 0 deletions internal/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque
skipCount := validateSkipCount(page, perPage)

txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount))
if skipCount > len(txs) {
skipCount = len(txs)
}
result := txs[skipCount:]

return &coretypes.ResultUnconfirmedTxs{
Expand Down
2 changes: 1 addition & 1 deletion types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (t TxRecordSet) Validate(maxSizeBytes int64, otxs Txs) error {
for i, cur := range allCopy {
// allCopy is sorted, so any duplicated data will be adjacent.
if i+1 < len(allCopy) && bytes.Equal(cur, allCopy[i+1]) {
return fmt.Errorf("found duplicate transaction with hash: %x", cur.Hash())
return fmt.Errorf("found duplicate transaction with hash: %x", cur.Key())
}
}

Expand Down

0 comments on commit 830d8ab

Please sign in to comment.