Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVM transaction replacement #206

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
887 changes: 608 additions & 279 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

45 changes: 34 additions & 11 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
txmp.removeTx(wtx, false, true)
txmp.removeTx(wtx, false, true, true)

Check warning on line 367 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L367

Added line #L367 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(once we have so many flags, it might make sense to split these out as named methods)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah makes sense. Or group them into a RemoveTxOptions struct that has named fields?

return nil
}

Expand Down Expand Up @@ -403,7 +403,7 @@
txmp.timestampIndex.Reset()

for _, wtx := range txmp.txStore.GetAllTxs() {
txmp.removeTx(wtx, false, false)
txmp.removeTx(wtx, false, false, true)
}

atomic.SwapInt64(&txmp.sizeBytes, 0)
Expand Down Expand Up @@ -515,7 +515,17 @@

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
txmp.removeTx(wtx, false, false)
txmp.removeTx(wtx, false, false, true)
}
if execTxResult[i].EvmTxInfo != nil {
// remove any tx that has the same nonce (because the committed tx
// may be from block proposal and is never in the local mempool)
if wtx, _ := txmp.priorityIndex.GetTxWithSameNonce(&WrappedTx{
evmAddress: execTxResult[i].EvmTxInfo.SenderAddress,
evmNonce: execTxResult[i].EvmTxInfo.Nonce,
}); wtx != nil {
txmp.removeTx(wtx, false, false, true)
}

Check warning on line 528 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L521-L528

Added lines #L521 - L528 were not covered by tests
}
}

Expand Down Expand Up @@ -636,7 +646,7 @@
// - The transaction, toEvict, can be removed while a concurrent
// reCheckTx callback is being executed for the same transaction.
for _, toEvict := range evictTxs {
txmp.removeTx(toEvict, true, true)
txmp.removeTx(toEvict, true, true, true)
txmp.logger.Debug(
"evicted existing good transaction; mempool full",
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
Expand All @@ -655,11 +665,19 @@
txInfo.SenderID: {},
}

replaced, shouldDrop := txmp.priorityIndex.TryReplacement(wtx)
if shouldDrop {
return nil
}

Check warning on line 671 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L670-L671

Added lines #L670 - L671 were not covered by tests

txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))

if txmp.insertTx(wtx) {
if replaced != nil {
txmp.removeTx(replaced, true, false, false)
}

Check warning on line 679 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L678-L679

Added lines #L678 - L679 were not covered by tests
if txmp.insertTx(wtx, replaced == nil) {
txmp.logger.Debug(
"inserted good transaction",
"priority", wtx.priority,
Expand Down Expand Up @@ -747,7 +765,7 @@
panic("corrupted reCheckTx cursor")
}

txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true)
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true, true)

Check warning on line 768 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L768

Added line #L768 was not covered by tests
}
}

Expand Down Expand Up @@ -853,13 +871,15 @@
return nil
}

func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
func (txmp *TxMempool) insertTx(wtx *WrappedTx, updatePriorityIndex bool) bool {
if txmp.isInMempool(wtx.tx) {
return false
}

txmp.txStore.SetTx(wtx)
txmp.priorityIndex.PushTx(wtx)
if updatePriorityIndex {
txmp.priorityIndex.PushTx(wtx)
}
txmp.heightIndex.Insert(wtx)
txmp.timestampIndex.Insert(wtx)

Expand All @@ -873,13 +893,16 @@
return true
}

func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool) {
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool, updatePriorityIndex bool) {
if txmp.txStore.IsTxRemoved(wtx.hash) {
return
}

txmp.txStore.RemoveTx(wtx)
toBeReenqueued := txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
toBeReenqueued := []*WrappedTx{}
if updatePriorityIndex {
toBeReenqueued = txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
}
txmp.heightIndex.Remove(wtx)
txmp.timestampIndex.Remove(wtx)

Expand All @@ -894,7 +917,7 @@

if shouldReenqueue {
for _, reenqueue := range toBeReenqueued {
txmp.removeTx(reenqueue, removeFromCache, false)
txmp.removeTx(reenqueue, removeFromCache, false, true)
}
for _, reenqueue := range toBeReenqueued {
rtx := reenqueue.tx
Expand Down
2 changes: 1 addition & 1 deletion internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func TestTxMempool_EVMEviction(t *testing.T) {

require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 5, 1)), nil, TxInfo{SenderID: peerID}))
require.Equal(t, 2, txmp.priorityIndex.NumTxs())
txmp.removeTx(tx, true, false)
txmp.removeTx(tx, true, false, true)
// should not reenqueue
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
time.Sleep(1 * time.Second) // pendingTxs should still be one even after sleeping for a sec
Expand Down
52 changes: 50 additions & 2 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@

// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx sync.RWMutex
txs []*WrappedTx // priority heap
mtx sync.RWMutex
txs []*WrappedTx // priority heap
// invariant 1: no duplicate nonce in the same queue
// invariant 2: no nonce gap in the same queue
// invariant 3: head of the queue must be in heap
evmQueue map[string][]*WrappedTx // sorted by nonce
}

Expand Down Expand Up @@ -50,6 +53,51 @@
return pq
}

func (pq *TxPriorityQueue) GetTxWithSameNonce(tx *WrappedTx) (*WrappedTx, int) {
pq.mtx.RLock()
defer pq.mtx.RUnlock()
return pq.getTxWithSameNonceUnsafe(tx)

Check warning on line 59 in internal/mempool/priority_queue.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/priority_queue.go#L56-L59

Added lines #L56 - L59 were not covered by tests
}

func (pq *TxPriorityQueue) getTxWithSameNonceUnsafe(tx *WrappedTx) (*WrappedTx, int) {
queue, ok := pq.evmQueue[tx.evmAddress]
if !ok {
return nil, -1
}

Check warning on line 66 in internal/mempool/priority_queue.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/priority_queue.go#L65-L66

Added lines #L65 - L66 were not covered by tests
idx := binarySearch(queue, tx)
if idx < len(queue) && queue[idx].evmNonce == tx.evmNonce {
return queue[idx], idx
}
return nil, -1
}

func (pq *TxPriorityQueue) TryReplacement(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) {
if !tx.isEVM {
return nil, false
}
pq.mtx.Lock()
defer pq.mtx.Unlock()
queue, ok := pq.evmQueue[tx.evmAddress]
if ok && len(queue) > 0 {
existing, idx := pq.getTxWithSameNonceUnsafe(tx)
if existing != nil {
if tx.priority > existing.priority {
// should replace
// replace heap if applicable
Comment on lines +85 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit comment fix

if hi, ok := pq.findTxIndexUnsafe(existing); ok {
heap.Remove(pq, hi)
heap.Push(pq, tx) // need to be in the heap since it has the same nonce
}
pq.evmQueue[tx.evmAddress][idx] = tx // replace queue item in-place
return existing, false
}
// tx should be dropped since it's dominated by an existing tx
return nil, true
}
}
return nil, false
}

// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be
// evicted to make room for another *WrappedTx with higher priority. If no such
// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx
Expand Down
77 changes: 75 additions & 2 deletions internal/mempool/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
{sender: "2", isEVM: false, priority: 9},
{sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
{sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8},
{sender: "6", isEVM: false, priority: 6},
{sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5},
},
expectedOutput: []int64{2, 4, 1, 3, 5, 5, 6, 7},
expectedOutput: []int64{2, 4, 1, 3, 5, 6, 7},
},
{
name: "PriorityWithEVMAndNonEVM",
Expand Down Expand Up @@ -371,3 +370,77 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) {
})
require.Equal(t, numTxs-2, pq.NumTxs())
}

func TestTxPriorityQueue_TryReplacement(t *testing.T) {
for _, test := range []struct {
tx *WrappedTx
existing []*WrappedTx
expectedReplaced bool
expectedDropped bool
expectedQueue []*WrappedTx
expectedHeap []*WrappedTx
}{
{&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
{&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, false, false, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
},
},
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, false, true, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
},
},
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
}, true, false, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")},
},
},
{
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 99, tx: []byte("ghi")},
}, true, false, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")},
}, []*WrappedTx{
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
},
},
} {
pq := NewTxPriorityQueue()
for _, e := range test.existing {
pq.PushTx(e)
}
replaced, dropped := pq.TryReplacement(test.tx)
if test.expectedReplaced {
require.NotNil(t, replaced)
} else {
require.Nil(t, replaced)
}
require.Equal(t, test.expectedDropped, dropped)
for i, q := range pq.evmQueue[test.tx.evmAddress] {
require.Equal(t, test.expectedQueue[i].tx.Key(), q.tx.Key())
require.Equal(t, test.expectedQueue[i].priority, q.priority)
require.Equal(t, test.expectedQueue[i].evmNonce, q.evmNonce)
}
for i, q := range pq.txs {
require.Equal(t, test.expectedHeap[i].tx.Key(), q.tx.Key())
require.Equal(t, test.expectedHeap[i].priority, q.priority)
require.Equal(t, test.expectedHeap[i].evmNonce, q.evmNonce)
}
}
}
4 changes: 2 additions & 2 deletions internal/mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
secondaryReactor.observePanic = observePanic

firstTx := &WrappedTx{}
primaryMempool.insertTx(firstTx)
primaryMempool.insertTx(firstTx, true)

// run the router
rts.start(ctx, t)
Expand All @@ -180,7 +180,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
primaryMempool.insertTx(next)
primaryMempool.insertTx(next, true)
}()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/mempool/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type WrappedTx struct {
// IsBefore returns true if the WrappedTx is before the given WrappedTx
// this applies to EVM transactions only
func (wtx *WrappedTx) IsBefore(tx *WrappedTx) bool {
return wtx.evmNonce < tx.evmNonce || (wtx.evmNonce == tx.evmNonce && wtx.timestamp.Before(tx.timestamp))
return wtx.evmNonce < tx.evmNonce
}

func (wtx *WrappedTx) Size() int {
Expand Down
7 changes: 7 additions & 0 deletions proto/tendermint/abci/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ message ResponseDeliverTx {
repeated Event events = 7
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; // nondeterministic
string codespace = 8;
EvmTxInfo evm_tx_info = 9;
}

message ResponseEndBlock {
Expand Down Expand Up @@ -458,6 +459,7 @@ message ExecTxResult {
repeated Event events = 7
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; // nondeterministic
string codespace = 8;
EvmTxInfo evm_tx_info = 9;
}

// TxResult contains results of executing the transaction.
Expand Down Expand Up @@ -564,6 +566,11 @@ message Evidence {
int64 total_voting_power = 5;
}

message EvmTxInfo {
string senderAddress = 1;
uint64 nonce = 2;
}

//----------------------------------------
// State Sync Types

Expand Down
1 change: 0 additions & 1 deletion proto/tendermint/crypto/keys.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion proto/tendermint/mempool/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion proto/tendermint/p2p/conn.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion proto/tendermint/types/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading