From da59b8d6893be7a1c65e17c764e394438babbc16 Mon Sep 17 00:00:00 2001 From: Yiming Zang <50607998+yzang2019@users.noreply.github.com> Date: Fri, 12 Jan 2024 08:50:21 -0800 Subject: [PATCH 01/16] Standardize lag status response format (#187) * Standardize lag status response format * Fix flaky unit test --- internal/consensus/state_test.go | 2 +- rpc/jsonrpc/server/http_server.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 200d792c4..d82718b77 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -2483,7 +2483,7 @@ func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { round++ // moving to the next round ensureNewRound(t, newRoundCh, height, round) rs := cs1.GetRoundState() - assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires + assert.Equal(t, true, rs.Step == cstypes.RoundStepPropose || rs.Step == cstypes.RoundStepNewRound) ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) diff --git a/rpc/jsonrpc/server/http_server.go b/rpc/jsonrpc/server/http_server.go index 1a3dd4449..67808e59d 100644 --- a/rpc/jsonrpc/server/http_server.go +++ b/rpc/jsonrpc/server/http_server.go @@ -146,8 +146,9 @@ func writeHTTPResponse(w http.ResponseWriter, log log.Logger, rsp rpctypes.RPCRe return } statusCode := http.StatusOK - // If there's any error for lag is high, override the status code + // If there's any error for lag is high, override the status code and response body if rsp.Error != nil && rsp.Error.Code == int(rpctypes.CodeLagIsHighError) { + body = rsp.Result statusCode = http.StatusExpectationFailed } w.Header().Set("Content-Type", "application/json") From cefd40f9b150b8be6fd5f67965b1485b0efd5704 Mon Sep 17 00:00:00 2001 From: codchen Date: Fri, 17 Nov 2023 12:09:54 +0800 Subject: [PATCH 02/16] Make ReadMaxTxs atomic (#166) --- internal/mempool/mempool.go | 21 +++------------------ internal/mempool/priority_queue.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index ae7b3c7c5..b57603b50 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -15,7 +15,6 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/types" ) @@ -420,24 +419,10 @@ func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { txmp.mtx.RLock() defer txmp.mtx.RUnlock() - numTxs := txmp.priorityIndex.NumTxs() - if max < 0 { - max = numTxs - } - - cap := tmmath.MinInt(numTxs, max) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, cap) - txs := make([]types.Tx, 0, cap) - for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - } + wTxs := txmp.priorityIndex.PeekTxs(max) + txs := make([]types.Tx, 0, len(wTxs)) for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) + txs = append(txs, wtx.tx) } return txs } diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index e31997397..ad3a347a3 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -4,6 +4,8 @@ import ( "container/heap" "sort" "sync" + + tmmath "github.com/tendermint/tendermint/libs/math" ) var _ heap.Interface = (*TxPriorityQueue)(nil) @@ -106,6 +108,32 @@ func (pq *TxPriorityQueue) PopTx() *WrappedTx { return nil } +// dequeue up to `max` transactions and reenqueue while locked +func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { + pq.mtx.Lock() + defer pq.mtx.Unlock() + + numTxs := len(pq.txs) + if max < 0 { + max = numTxs + } + + cap := tmmath.MinInt(numTxs, max) + res := make([]*WrappedTx, 0, cap) + for i := 0; i < cap; i++ { + popped := heap.Pop(pq) + if popped == nil { + break + } + res = append(res, popped.(*WrappedTx)) + } + + for _, tx := range res { + heap.Push(pq, tx) + } + return res +} + // Push implements the Heap interface. // // NOTE: A caller should never call Push. Use PushTx instead. From 434a59378ffc315af378e55527f6949177bb1517 Mon Sep 17 00:00:00 2001 From: codchen Date: Sat, 16 Dec 2023 22:22:55 +0800 Subject: [PATCH 03/16] Support pending transaction in mempool (#169) --- abci/client/grpc_client.go | 5 ++- abci/client/mocks/client.go | 8 ++-- abci/client/socket_client.go | 4 +- abci/example/kvstore/kvstore.go | 4 +- abci/server/grpc_server.go | 8 ++++ abci/types/application.go | 6 +-- abci/types/messages.go | 4 +- abci/types/mocks/application.go | 8 ++-- abci/types/types.go | 18 ++++++++ internal/consensus/mempool_test.go | 8 ++-- internal/mempool/mempool.go | 45 +++++++++++++++---- internal/mempool/mempool_test.go | 16 +++---- internal/mempool/tx.go | 71 ++++++++++++++++++++++++++++-- internal/proxy/client.go | 2 +- internal/rpc/core/mempool.go | 2 +- internal/state/helpers_test.go | 4 +- rpc/client/mock/abci.go | 2 +- test/e2e/app/app.go | 10 +++-- 18 files changed, 174 insertions(+), 51 deletions(-) diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index f1be2c120..8eec13ef8 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -130,8 +130,9 @@ func (cli *grpcClient) Info(ctx context.Context, params *types.RequestInfo) (*ty return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true)) } -func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTx, error) { - return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true)) +func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { + resCheckTx, err := cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true)) + return &types.ResponseCheckTxV2{ResponseCheckTx: resCheckTx}, err } func (cli *grpcClient) Query(ctx context.Context, params *types.RequestQuery) (*types.ResponseQuery, error) { diff --git a/abci/client/mocks/client.go b/abci/client/mocks/client.go index 44a5eb00f..a0626cff0 100644 --- a/abci/client/mocks/client.go +++ b/abci/client/mocks/client.go @@ -38,15 +38,15 @@ func (_m *Client) ApplySnapshotChunk(_a0 context.Context, _a1 *types.RequestAppl } // CheckTx provides a mock function with given fields: _a0, _a1 -func (_m *Client) CheckTx(_a0 context.Context, _a1 *types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (_m *Client) CheckTx(_a0 context.Context, _a1 *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { ret := _m.Called(_a0, _a1) - var r0 *types.ResponseCheckTx - if rf, ok := ret.Get(0).(func(context.Context, *types.RequestCheckTx) *types.ResponseCheckTx); ok { + var r0 *types.ResponseCheckTxV2 + if rf, ok := ret.Get(0).(func(context.Context, *types.RequestCheckTx) *types.ResponseCheckTxV2); ok { r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.ResponseCheckTx) + r0 = ret.Get(0).(*types.ResponseCheckTxV2) } } diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 80f106333..35f244666 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -257,12 +257,12 @@ func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*typ return res.GetInfo(), nil } -func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { res, err := cli.doRequest(ctx, types.ToRequestCheckTx(req)) if err != nil { return nil, err } - return res.GetCheckTx(), nil + return &types.ResponseCheckTxV2{ResponseCheckTx: res.GetCheckTx()}, nil } func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) { diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index f33ff2be1..ac0d122b1 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -205,8 +205,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinal return &types.ResponseFinalizeBlock{TxResults: respTxs, ValidatorUpdates: app.ValUpdates, AppHash: appHash}, nil } -func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { - return &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil +func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { + return &types.ResponseCheckTxV2{ResponseCheckTx: &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}}, nil } func (app *Application) Commit(_ context.Context) (*types.ResponseCommit, error) { diff --git a/abci/server/grpc_server.go b/abci/server/grpc_server.go index 0dfee8169..6f7caf39c 100644 --- a/abci/server/grpc_server.go +++ b/abci/server/grpc_server.go @@ -81,3 +81,11 @@ func (app *gRPCApplication) Flush(_ context.Context, req *types.RequestFlush) (* func (app *gRPCApplication) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) { return app.Application.Commit(ctx) } + +func (app *gRPCApplication) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { + resV2, err := app.Application.CheckTx(ctx, req) + if err != nil { + return &types.ResponseCheckTx{}, err + } + return resV2.ResponseCheckTx, nil +} diff --git a/abci/types/application.go b/abci/types/application.go index 9915d16a1..5f98bdc9f 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -12,7 +12,7 @@ type Application interface { Query(context.Context, *RequestQuery) (*ResponseQuery, error) // Query for state // Mempool Connection - CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTx, error) // Validate a tx for the mempool + CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTxV2, error) // Validate a tx for the mempool // Consensus Connection InitChain(context.Context, *RequestInitChain) (*ResponseInitChain, error) // Initialize blockchain w validators/other info from TendermintCore @@ -51,8 +51,8 @@ func (BaseApplication) Info(_ context.Context, req *RequestInfo) (*ResponseInfo, return &ResponseInfo{}, nil } -func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) { - return &ResponseCheckTx{Code: CodeTypeOK}, nil +func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTxV2, error) { + return &ResponseCheckTxV2{ResponseCheckTx: &ResponseCheckTx{Code: CodeTypeOK}}, nil } func (BaseApplication) Commit(_ context.Context) (*ResponseCommit, error) { diff --git a/abci/types/messages.go b/abci/types/messages.go index 2ab6ac1df..f6a6c12d1 100644 --- a/abci/types/messages.go +++ b/abci/types/messages.go @@ -155,9 +155,9 @@ func ToResponseInfo(res *ResponseInfo) *Response { } } -func ToResponseCheckTx(res *ResponseCheckTx) *Response { +func ToResponseCheckTx(res *ResponseCheckTxV2) *Response { return &Response{ - Value: &Response_CheckTx{res}, + Value: &Response_CheckTx{res.ResponseCheckTx}, } } diff --git a/abci/types/mocks/application.go b/abci/types/mocks/application.go index c6eb29219..08ef9ca5c 100644 --- a/abci/types/mocks/application.go +++ b/abci/types/mocks/application.go @@ -38,15 +38,15 @@ func (_m *Application) ApplySnapshotChunk(_a0 context.Context, _a1 *types.Reques } // CheckTx provides a mock function with given fields: _a0, _a1 -func (_m *Application) CheckTx(_a0 context.Context, _a1 *types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (_m *Application) CheckTx(_a0 context.Context, _a1 *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { ret := _m.Called(_a0, _a1) - var r0 *types.ResponseCheckTx - if rf, ok := ret.Get(0).(func(context.Context, *types.RequestCheckTx) *types.ResponseCheckTx); ok { + var r0 *types.ResponseCheckTxV2 + if rf, ok := ret.Get(0).(func(context.Context, *types.RequestCheckTx) *types.ResponseCheckTxV2); ok { r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.ResponseCheckTx) + r0 = ret.Get(0).(*types.ResponseCheckTxV2) } } diff --git a/abci/types/types.go b/abci/types/types.go index 121e72159..4df6d4751 100644 --- a/abci/types/types.go +++ b/abci/types/types.go @@ -237,3 +237,21 @@ func MarshalTxResults(r []*ExecTxResult) ([][]byte, error) { } return s, nil } + +type PendingTxCheckerResponse int + +const ( + Accepted PendingTxCheckerResponse = iota + Rejected + Pending +) + +type PendingTxChecker func() PendingTxCheckerResponse + +// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able +// to utilize the new fields in V2 type (but still be backwards-compatible) +type ResponseCheckTxV2 struct { + *ResponseCheckTx + IsPendingTransaction bool + Checker PendingTxChecker // must not be nil if IsPendingTransaction is true +} diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index e634d4b22..84badbeeb 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -308,18 +308,18 @@ func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.Reques return res, nil } -func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { +func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) { app.mu.Lock() defer app.mu.Unlock() txValue := txAsUint64(req.Tx) if txValue != uint64(app.mempoolTxCount) { - return &abci.ResponseCheckTx{ + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Code: code.CodeTypeBadNonce, - }, nil + }}, nil } app.mempoolTxCount++ - return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeOK}}, nil } func txAsUint64(tx []byte) uint64 { diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index b57603b50..d2b84910c 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -81,6 +81,10 @@ type TxMempool struct { // index. i.e. older transactions are first. timestampIndex *WrappedTxList + // pendingTxs stores transactions that are not valid yet but might become valid + // if its checker returns Accepted + pendingTxs *PendingTxs + // A read/write lock is used to safe guard updates, insertions and deletions // from the mempool. A read-lock is implicitly acquired when executing CheckTx, // however, a caller must explicitly grab a write-lock via Lock when updating @@ -120,6 +124,7 @@ func NewTxMempool( timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp) }), + pendingTxs: NewPendingTxs(), failedCheckTxCounts: map[types.NodeID]uint64{}, peerManager: peerManager, } @@ -286,17 +291,25 @@ func (txmp *TxMempool) CheckTx( height: txmp.height, } - // only add new transaction if checkTx passes if err == nil { - err = txmp.addNewTransaction(wtx, res, txInfo) + // only add new transaction if checkTx passes and is not pending + if !res.IsPendingTransaction { + err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo) - if err != nil { - return err + if err != nil { + return err + } + } else { + // otherwise add to pending txs store + if res.Checker == nil { + return errors.New("no checker available for pending transaction") + } + txmp.pendingTxs.Insert(wtx, res, txInfo) } } if cb != nil { - cb(res) + cb(res.ResponseCheckTx) } return nil @@ -470,6 +483,7 @@ func (txmp *TxMempool) Update( } } + txmp.handlePendingTransactions() txmp.purgeExpiredTxs(blockHeight) // If there any uncommitted transactions left in the mempool, we either @@ -633,7 +647,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck // // This method is NOT executed for the initial CheckTx on a new transaction; // that case is handled by addNewTransaction instead. -func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTx) { +func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTxV2) { if txmp.recheckCursor == nil { return } @@ -676,10 +690,11 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT if !txmp.txStore.IsTxRemoved(wtx.hash) { var err error if txmp.postCheck != nil { - err = txmp.postCheck(tx, res) + err = txmp.postCheck(tx, res.ResponseCheckTx) } - if res.Code == abci.CodeTypeOK && err == nil { + // we will treat a transaction that turns pending in a recheck as invalid and evict it + if res.Code == abci.CodeTypeOK && err == nil && !res.IsPendingTransaction { wtx.priority = res.Priority } else { txmp.logger.Debug( @@ -904,3 +919,17 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string jsonData, _ := json.Marshal(logs) return string(jsonData) } + +func (txmp *TxMempool) handlePendingTransactions() { + accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions() + for _, tx := range accepted { + if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil { + txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err)) + } + } + if !txmp.config.KeepInvalidTxsInCache { + for _, tx := range rejected { + txmp.cache.Remove(tx.tx.tx) + } + } +} diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index d86ba8f7e..336357aa2 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -37,7 +37,7 @@ type testTx struct { priority int64 } -func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { +func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) { var ( priority int64 sender string @@ -48,29 +48,29 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a if len(parts) == 3 { v, err := strconv.ParseInt(string(parts[2]), 10, 64) if err != nil { - return &abci.ResponseCheckTx{ + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Code: 100, GasWanted: 1, - }, nil + }}, nil } priority = v sender = string(parts[0]) } else { - return &abci.ResponseCheckTx{ + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Code: 101, GasWanted: 1, - }, nil + }}, nil } - return &abci.ResponseCheckTx{ + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Sender: sender, Code: code.CodeTypeOK, GasWanted: 1, - }, nil + }}, nil } func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool { @@ -727,4 +727,4 @@ func TestAppendCheckTxErr(t *testing.T) { require.NoError(t, err) require.Equal(t, len(data), 1) require.Equal(t, data[0]["log"], "sample error msg") -} \ No newline at end of file +} diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index c7113c951..9570935eb 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -5,6 +5,7 @@ import ( "sync" "time" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/types" ) @@ -72,9 +73,9 @@ func (wtx *WrappedTx) Size() int { // TxStore implements a thread-safe mapping of valid transaction(s). // // NOTE: -// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative -// access is not allowed. Regardless, it is not expected for the mempool to -// need mutative access. +// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative +// access is not allowed. Regardless, it is not expected for the mempool to +// need mutative access. type TxStore struct { mtx sync.RWMutex hashTxs map[types.TxKey]*WrappedTx // primary index @@ -291,3 +292,67 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { i++ } } + +type PendingTxs struct { + mtx *sync.Mutex + txs []PendingTxInfo +} + +type PendingTxInfo struct { + tx *WrappedTx + checkTxResponse *abci.ResponseCheckTxV2 + txInfo TxInfo +} + +func NewPendingTxs() *PendingTxs { + return &PendingTxs{ + mtx: &sync.Mutex{}, + txs: []PendingTxInfo{}, + } +} + +func (p *PendingTxs) EvaluatePendingTransactions() ( + acceptedTxs []PendingTxInfo, + rejectedTxs []PendingTxInfo, +) { + poppedIndices := []int{} + p.mtx.Lock() + defer p.mtx.Unlock() + for i := 0; i < len(p.txs); i++ { + switch p.txs[i].checkTxResponse.Checker() { + case abci.Accepted: + acceptedTxs = append(acceptedTxs, p.txs[i]) + poppedIndices = append(poppedIndices, i) + case abci.Rejected: + rejectedTxs = append(rejectedTxs, p.txs[i]) + poppedIndices = append(poppedIndices, i) + } + } + p.popTxsAtIndices(poppedIndices) + return +} + +// assume mtx is already acquired +func (p *PendingTxs) popTxsAtIndices(indices []int) { + if len(indices) == 0 { + return + } + newTxs := []PendingTxInfo{} + start := 0 + for _, idx := range indices { + newTxs = append(newTxs, p.txs[start:idx]...) + start = idx + } + newTxs = append(newTxs, p.txs[indices[len(indices)-1]:]...) + p.txs = newTxs +} + +func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.txs = append(p.txs, PendingTxInfo{ + tx: tx, + checkTxResponse: resCheckTx, + txInfo: txInfo, + }) +} diff --git a/internal/proxy/client.go b/internal/proxy/client.go index fa1d8c66e..f3ab28f5c 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -169,7 +169,7 @@ func (app *proxyClient) Flush(ctx context.Context) error { return app.client.Flush(ctx) } -func (app *proxyClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (app *proxyClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() return app.client.CheckTx(ctx, req) } diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 3d6bc5673..1e8628641 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -184,7 +184,7 @@ func (env *Environment) CheckTx(ctx context.Context, req *coretypes.RequestCheck if err != nil { return nil, err } - return &coretypes.ResultCheckTx{ResponseCheckTx: *res}, nil + return &coretypes.ResultCheckTx{ResponseCheckTx: *res.ResponseCheckTx}, nil } func (env *Environment) RemoveTx(ctx context.Context, req *coretypes.RequestRemoveTx) error { diff --git a/internal/state/helpers_test.go b/internal/state/helpers_test.go index 4284bad68..1d0d59441 100644 --- a/internal/state/helpers_test.go +++ b/internal/state/helpers_test.go @@ -302,8 +302,8 @@ func (app *testApp) FinalizeBlock(_ context.Context, req *abci.RequestFinalizeBl }, nil } -func (app *testApp) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { - return &abci.ResponseCheckTx{}, nil +func (app *testApp) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) { + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{}}, nil } func (app *testApp) Commit(context.Context) (*abci.ResponseCommit, error) { diff --git a/rpc/client/mock/abci.go b/rpc/client/mock/abci.go index 142d64d19..27d779554 100644 --- a/rpc/client/mock/abci.go +++ b/rpc/client/mock/abci.go @@ -60,7 +60,7 @@ func (a ABCIApp) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*coretypes return nil, err } - res := &coretypes.ResultBroadcastTxCommit{CheckTx: *resp} + res := &coretypes.ResultBroadcastTxCommit{CheckTx: *resp.ResponseCheckTx} if res.CheckTx.IsErr() { return res, nil } diff --git a/test/e2e/app/app.go b/test/e2e/app/app.go index 885b49f39..b127fa744 100644 --- a/test/e2e/app/app.go +++ b/test/e2e/app/app.go @@ -163,14 +163,14 @@ func (app *Application) InitChain(_ context.Context, req *abci.RequestInitChain) } // CheckTx implements ABCI. -func (app *Application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { +func (app *Application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) { app.mu.Lock() defer app.mu.Unlock() _, _, err := parseTx(req.Tx) if err != nil { - return &abci.ResponseCheckTx{ - Code: code.CodeTypeEncodingError, + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeEncodingError}, }, nil } @@ -178,7 +178,9 @@ func (app *Application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a time.Sleep(time.Duration(app.cfg.CheckTxDelayMS) * time.Millisecond) } - return &abci.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, + }, nil } // FinalizeBlock implements ABCI. From a42a7ca7b8d98f11fcd16f1b590f4284f37c2fe8 Mon Sep 17 00:00:00 2001 From: codchen Date: Wed, 20 Dec 2023 21:43:55 +0800 Subject: [PATCH 04/16] fix unconfirmed tx to consider pending txs (#172) --- internal/mempool/mempool.go | 9 ++++++++- internal/mempool/tx.go | 20 ++++++++++++++++++-- internal/rpc/core/mempool.go | 2 ++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index d2b84910c..d38fe9e5c 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -177,7 +177,7 @@ func (txmp *TxMempool) Unlock() { // Size returns the number of valid transactions in the mempool. It is // thread-safe. func (txmp *TxMempool) Size() int { - return txmp.txStore.Size() + return txmp.txStore.Size() + txmp.pendingTxs.Size() } // SizeBytes return the total sum in bytes of all the valid transactions in the @@ -437,6 +437,13 @@ func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { for _, wtx := range wTxs { txs = append(txs, wtx.tx) } + if len(txs) < max { + // retrieve more from pending txs + pending := txmp.pendingTxs.Peek(max - len(txs)) + for _, ptx := range pending { + txs = append(txs, ptx.tx.tx) + } + } return txs } diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 9570935eb..e5fd7b16f 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -294,7 +294,7 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { } type PendingTxs struct { - mtx *sync.Mutex + mtx *sync.RWMutex txs []PendingTxInfo } @@ -306,7 +306,7 @@ type PendingTxInfo struct { func NewPendingTxs() *PendingTxs { return &PendingTxs{ - mtx: &sync.Mutex{}, + mtx: &sync.RWMutex{}, txs: []PendingTxInfo{}, } } @@ -356,3 +356,19 @@ func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, t txInfo: txInfo, }) } + +func (p *PendingTxs) Peek(max int) []PendingTxInfo { + p.mtx.RLock() + defer p.mtx.RUnlock() + // priority is fifo + if max > len(p.txs) { + return p.txs + } + return p.txs[:max] +} + +func (p *PendingTxs) Size() int { + p.mtx.RLock() + defer p.mtx.RUnlock() + return len(p.txs) +} diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 1e8628641..b5a1592d9 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -148,6 +148,7 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, req *coretypes.Re // More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.RequestUnconfirmedTxs) (*coretypes.ResultUnconfirmedTxs, error) { totalCount := env.Mempool.Size() + fmt.Printf("EVMTEST mempool size: %d\n", totalCount) perPage := env.validatePerPage(req.PerPage.IntPtr()) page, err := validatePage(req.Page.IntPtr(), perPage, totalCount) if err != nil { @@ -159,6 +160,7 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount)) result := txs[skipCount:] + fmt.Printf("EVMTEST unconfirmed tx result count: %d, skipping %d\n", len(result), skipCount) return &coretypes.ResultUnconfirmedTxs{ Count: len(result), Total: totalCount, From 8177f9854d6c2e61f9658dad79be920955e84aa5 Mon Sep 17 00:00:00 2001 From: codchen Date: Wed, 20 Dec 2023 22:25:47 +0800 Subject: [PATCH 05/16] fix pending pop (#173) --- internal/mempool/mempool.go | 4 +++- internal/mempool/tx.go | 2 +- internal/rpc/core/mempool.go | 2 -- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index d38fe9e5c..41f5b587c 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -177,7 +177,9 @@ func (txmp *TxMempool) Unlock() { // Size returns the number of valid transactions in the mempool. It is // thread-safe. func (txmp *TxMempool) Size() int { - return txmp.txStore.Size() + txmp.pendingTxs.Size() + txSize := txmp.txStore.Size() + pendingSize := txmp.pendingTxs.Size() + return txSize + pendingSize } // SizeBytes return the total sum in bytes of all the valid transactions in the diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index e5fd7b16f..c74872054 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -343,7 +343,7 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) { newTxs = append(newTxs, p.txs[start:idx]...) start = idx } - newTxs = append(newTxs, p.txs[indices[len(indices)-1]:]...) + newTxs = append(newTxs, p.txs[start+1:]...) p.txs = newTxs } diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index b5a1592d9..1e8628641 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -148,7 +148,6 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, req *coretypes.Re // More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.RequestUnconfirmedTxs) (*coretypes.ResultUnconfirmedTxs, error) { totalCount := env.Mempool.Size() - fmt.Printf("EVMTEST mempool size: %d\n", totalCount) perPage := env.validatePerPage(req.PerPage.IntPtr()) page, err := validatePage(req.Page.IntPtr(), perPage, totalCount) if err != nil { @@ -160,7 +159,6 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount)) result := txs[skipCount:] - fmt.Printf("EVMTEST unconfirmed tx result count: %d, skipping %d\n", len(result), skipCount) return &coretypes.ResultUnconfirmedTxs{ Count: len(result), Total: totalCount, From 524631508b24c281eb38cecbed3ef9c4c17c9744 Mon Sep 17 00:00:00 2001 From: codchen Date: Tue, 2 Jan 2024 23:45:44 +0800 Subject: [PATCH 06/16] add TTL for pending txs (#174) --- internal/mempool/mempool.go | 6 ++++-- internal/mempool/tx.go | 38 +++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 41f5b587c..56d0cb6eb 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -492,8 +492,8 @@ func (txmp *TxMempool) Update( } } - txmp.handlePendingTransactions() txmp.purgeExpiredTxs(blockHeight) + txmp.handlePendingTransactions() // If there any uncommitted transactions left in the mempool, we either // initiate re-CheckTx per remaining transaction or notify that remaining @@ -844,7 +844,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { // purgeExpiredTxs removes all transactions that have exceeded their respective // height- and/or time-based TTLs from their respective indexes. Every expired -// transaction will be removed from the mempool, but preserved in the cache. +// transaction will be removed from the mempool, but preserved in the cache (except for pending txs). // // NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which // the caller has a write-lock on the mempool and so we can safely iterate over @@ -890,6 +890,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { for _, wtx := range expiredTxs { txmp.removeTx(wtx, false) } + + txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, txmp.cache.Remove) } func (txmp *TxMempool) notifyTxsAvailable() { diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index c74872054..2f3ec7b8f 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -372,3 +372,41 @@ func (p *PendingTxs) Size() int { defer p.mtx.RUnlock() return len(p.txs) } + +func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(types.Tx)) { + p.mtx.Lock() + defer p.mtx.Unlock() + + if len(p.txs) == 0 { + return + } + + // txs retains the ordering of insertion + if ttlNumBlock > 0 { + idxFirstNotExpiredTx := len(p.txs) + for i, ptx := range p.txs { + if (blockHeight - ptx.tx.height) <= ttlNumBlock { + idxFirstNotExpiredTx = i + } else { + cb(ptx.tx.tx) + } + } + p.txs = p.txs[idxFirstNotExpiredTx:] + } + + if len(p.txs) == 0 { + return + } + + if ttlDuration > 0 { + idxFirstNotExpiredTx := len(p.txs) + for i, ptx := range p.txs { + if now.Sub(ptx.tx.timestamp) <= ttlDuration { + idxFirstNotExpiredTx = i + } else { + cb(ptx.tx.tx) + } + } + p.txs = p.txs[idxFirstNotExpiredTx:] + } +} From 9f8d38065e5ff733affa4cb01ac98e5217959432 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 4 Jan 2024 09:22:26 -0500 Subject: [PATCH 07/16] [EVM] Fix evm pending nonce (#179) * Perf: Increase buffer size for pubsub server to boost performance (#167) * Increase buffer size for pubsub server * Add more timeout for test failure * Add more timeout * Fix test split scripts * Fix test split * Fix unit test * Unit test * Unit test * [P2P] Optimize block pool requester retry and peer pick up logic (#170) * P2P Improvements: Fix block sync reactor and block pool retry logic * Revert "Add event data to result event (#165)" (#176) This reverts commit 72bb29caf821ed03900bcd74f4ddb439df85f5f1. * Fix block sync auto restart not working as expected (#175) * Fix edge case for blocksync (#178) * fix evm pending nonce * fix test * deflake a test * de-flake test * Revert "merge main" This reverts commit 58b94248ac561583b918775c0b0e220d4225fbc0, reversing changes made to 02d14782edc6c58d59db2658a4190dbdc5c7280f. * consider keep-in-cache logic when removing from cache * undo test tweaks --------- Co-authored-by: Yiming Zang <50607998+yzang2019@users.noreply.github.com> Co-authored-by: Jeremy Wei --- abci/types/types.go | 4 +++- internal/mempool/mempool.go | 20 ++++++++++++++------ internal/mempool/tx.go | 30 ++++++++++++++++++------------ 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/abci/types/types.go b/abci/types/types.go index 4df6d4751..b40865524 100644 --- a/abci/types/types.go +++ b/abci/types/types.go @@ -247,11 +247,13 @@ const ( ) type PendingTxChecker func() PendingTxCheckerResponse +type ExpireTxHandler func() -// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able +// ResponseCheckTxV2 response type contains non-protobuf fields, so non-local ABCI clients will not be able // to utilize the new fields in V2 type (but still be backwards-compatible) type ResponseCheckTxV2 struct { *ResponseCheckTx IsPendingTransaction bool Checker PendingTxChecker // must not be nil if IsPendingTransaction is true + ExpireTxHandler ExpireTxHandler } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 56d0cb6eb..084313cec 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -291,13 +291,20 @@ func (txmp *TxMempool) CheckTx( hash: txHash, timestamp: time.Now().UTC(), height: txmp.height, + expiredCallback: func(removeFromCache bool) { + if removeFromCache { + txmp.cache.Remove(tx) + } + if res.ExpireTxHandler != nil { + res.ExpireTxHandler() + } + }, } if err == nil { // only add new transaction if checkTx passes and is not pending if !res.IsPendingTransaction { err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo) - if err != nil { return err } @@ -837,9 +844,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - if removeFromCache { - txmp.cache.Remove(wtx.tx) - } + wtx.expiredCallback(removeFromCache) } // purgeExpiredTxs removes all transactions that have exceeded their respective @@ -888,10 +893,13 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } for _, wtx := range expiredTxs { - txmp.removeTx(wtx, false) + txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) } - txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, txmp.cache.Remove) + // remove pending txs that have expired + txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) { + wtx.expiredCallback(!txmp.config.KeepInvalidTxsInCache) + }) } func (txmp *TxMempool) notifyTxsAvailable() { diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 2f3ec7b8f..4d5762be5 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -64,6 +64,9 @@ type WrappedTx struct { // transaction in the mempool can be evicted when it is simultaneously having // a reCheckTx callback executed. removed bool + + // this is the callback that can be called when a transaction expires + expiredCallback func(removeFromCache bool) } func (wtx *WrappedTx) Size() int { @@ -295,10 +298,10 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { type PendingTxs struct { mtx *sync.RWMutex - txs []PendingTxInfo + txs []TxWithResponse } -type PendingTxInfo struct { +type TxWithResponse struct { tx *WrappedTx checkTxResponse *abci.ResponseCheckTxV2 txInfo TxInfo @@ -307,13 +310,12 @@ type PendingTxInfo struct { func NewPendingTxs() *PendingTxs { return &PendingTxs{ mtx: &sync.RWMutex{}, - txs: []PendingTxInfo{}, + txs: []TxWithResponse{}, } } - func (p *PendingTxs) EvaluatePendingTransactions() ( - acceptedTxs []PendingTxInfo, - rejectedTxs []PendingTxInfo, + acceptedTxs []TxWithResponse, + rejectedTxs []TxWithResponse, ) { poppedIndices := []int{} p.mtx.Lock() @@ -337,7 +339,7 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) { if len(indices) == 0 { return } - newTxs := []PendingTxInfo{} + newTxs := []TxWithResponse{} start := 0 for _, idx := range indices { newTxs = append(newTxs, p.txs[start:idx]...) @@ -350,14 +352,14 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) { func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) { p.mtx.Lock() defer p.mtx.Unlock() - p.txs = append(p.txs, PendingTxInfo{ + p.txs = append(p.txs, TxWithResponse{ tx: tx, checkTxResponse: resCheckTx, txInfo: txInfo, }) } -func (p *PendingTxs) Peek(max int) []PendingTxInfo { +func (p *PendingTxs) Peek(max int) []TxWithResponse { p.mtx.RLock() defer p.mtx.RUnlock() // priority is fifo @@ -373,7 +375,7 @@ func (p *PendingTxs) Size() int { return len(p.txs) } -func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(types.Tx)) { +func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(wtx *WrappedTx)) { p.mtx.Lock() defer p.mtx.Unlock() @@ -385,10 +387,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat if ttlNumBlock > 0 { idxFirstNotExpiredTx := len(p.txs) for i, ptx := range p.txs { + // once found, we can break because these are ordered if (blockHeight - ptx.tx.height) <= ttlNumBlock { idxFirstNotExpiredTx = i + break } else { - cb(ptx.tx.tx) + cb(ptx.tx) } } p.txs = p.txs[idxFirstNotExpiredTx:] @@ -401,10 +405,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat if ttlDuration > 0 { idxFirstNotExpiredTx := len(p.txs) for i, ptx := range p.txs { + // once found, we can break because these are ordered if now.Sub(ptx.tx.timestamp) <= ttlDuration { idxFirstNotExpiredTx = i + break } else { - cb(ptx.tx.tx) + cb(ptx.tx) } } p.txs = p.txs[idxFirstNotExpiredTx:] From b3ed777328bf28341ced704231f5b703774442d5 Mon Sep 17 00:00:00 2001 From: codchen Date: Mon, 15 Jan 2024 21:11:47 +0800 Subject: [PATCH 08/16] Fix bug when popping pending TXs (#188) --- internal/mempool/tx.go | 10 ++++-- internal/mempool/tx_test.go | 72 +++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 4d5762be5..be1d9290f 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -342,10 +342,16 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) { newTxs := []TxWithResponse{} start := 0 for _, idx := range indices { + if idx <= start-1 { + panic("indices popped from pending tx store should be sorted without duplicate") + } + if idx >= len(p.txs) { + panic("indices popped from pending tx store out of range") + } newTxs = append(newTxs, p.txs[start:idx]...) - start = idx + start = idx + 1 } - newTxs = append(newTxs, p.txs[start+1:]...) + newTxs = append(newTxs, p.txs[start:]...) p.txs = newTxs } diff --git a/internal/mempool/tx_test.go b/internal/mempool/tx_test.go index c6d494b04..77a49c276 100644 --- a/internal/mempool/tx_test.go +++ b/internal/mempool/tx_test.go @@ -229,3 +229,75 @@ func TestWrappedTxList_Remove(t *testing.T) { sort.Ints(expected) require.Equal(t, expected, got) } + +func TestPendingTxsPopTxsGood(t *testing.T) { + pendingTxs := NewPendingTxs() + for _, test := range []struct { + origLen int + popIndices []int + expected []int + }{ + { + origLen: 1, + popIndices: []int{}, + expected: []int{0}, + }, { + origLen: 1, + popIndices: []int{0}, + expected: []int{}, + }, { + origLen: 2, + popIndices: []int{0}, + expected: []int{1}, + }, { + origLen: 2, + popIndices: []int{1}, + expected: []int{0}, + }, { + origLen: 2, + popIndices: []int{0, 1}, + expected: []int{}, + }, { + origLen: 3, + popIndices: []int{1}, + expected: []int{0, 2}, + }, { + origLen: 3, + popIndices: []int{0, 2}, + expected: []int{1}, + }, { + origLen: 3, + popIndices: []int{0, 1, 2}, + expected: []int{}, + }, { + origLen: 5, + popIndices: []int{0, 1, 4}, + expected: []int{2, 3}, + }, { + origLen: 5, + popIndices: []int{1, 3}, + expected: []int{0, 2, 4}, + }, + } { + pendingTxs.txs = []TxWithResponse{} + for i := 0; i < test.origLen; i++ { + pendingTxs.txs = append(pendingTxs.txs, TxWithResponse{txInfo: TxInfo{SenderID: uint16(i)}}) + } + pendingTxs.popTxsAtIndices(test.popIndices) + require.Equal(t, len(test.expected), len(pendingTxs.txs)) + for i, e := range test.expected { + require.Equal(t, e, int(pendingTxs.txs[i].txInfo.SenderID)) + } + } +} + +func TestPendingTxsPopTxsBad(t *testing.T) { + pendingTxs := NewPendingTxs() + // out of range + require.Panics(t, func() { pendingTxs.popTxsAtIndices([]int{0}) }) + // out of order + pendingTxs.txs = []TxWithResponse{{}, {}, {}} + require.Panics(t, func() { pendingTxs.popTxsAtIndices([]int{1, 0}) }) + // duplicate + require.Panics(t, func() { pendingTxs.popTxsAtIndices([]int{2, 2}) }) +} From 00db6dc26c709b0b869050f8f28de139df616b47 Mon Sep 17 00:00:00 2001 From: Yiming Zang <50607998+yzang2019@users.noreply.github.com> Date: Wed, 17 Jan 2024 09:42:08 -0800 Subject: [PATCH 09/16] Add mempool metrics for number of pending tx and expired txs (#189) * Add metrics for mempool pending transaction size * Add expired tx count metrics --- internal/mempool/mempool.go | 9 +++++++++ internal/mempool/mempool_test.go | 1 + internal/mempool/metrics.gen.go | 14 ++++++++++++++ internal/mempool/metrics.go | 9 +++++++++ 4 files changed, 33 insertions(+) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 084313cec..b76e1f007 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -182,6 +182,11 @@ func (txmp *TxMempool) Size() int { return txSize + pendingSize } +// PendingSize returns the number of pending transactions in the mempool. +func (txmp *TxMempool) PendingSize() int { + return txmp.pendingTxs.Size() +} + // SizeBytes return the total sum in bytes of all the valid transactions in the // mempool. It is thread-safe. func (txmp *TxMempool) SizeBytes() int64 { @@ -292,6 +297,7 @@ func (txmp *TxMempool) CheckTx( timestamp: time.Now().UTC(), height: txmp.height, expiredCallback: func(removeFromCache bool) { + txmp.metrics.ExpiredTxs.Add(1) if removeFromCache { txmp.cache.Remove(tx) } @@ -519,6 +525,7 @@ func (txmp *TxMempool) Update( } txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) return nil } @@ -638,6 +645,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) txmp.insertTx(wtx) txmp.logger.Debug( @@ -745,6 +753,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT } txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) } // updateReCheckTxs updates the recheck cursors using the gossipIndex. For diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 336357aa2..d90a12277 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -215,6 +215,7 @@ func TestTxMempool_Size(t *testing.T) { txmp := setup(t, client, 0) txs := checkTxs(ctx, t, txmp, 100, 0) require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, 0, txmp.PendingSize()) require.Equal(t, int64(5690), txmp.SizeBytes()) rawTxs := make([]types.Tx, len(txs)) diff --git a/internal/mempool/metrics.gen.go b/internal/mempool/metrics.gen.go index 100c5e71c..c5a44f029 100644 --- a/internal/mempool/metrics.gen.go +++ b/internal/mempool/metrics.gen.go @@ -20,6 +20,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "size", Help: "Number of uncommitted transactions in the mempool.", }, labels).With(labelsAndValues...), + PendingSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "pending_size", + Help: "Number of pending transactions in mempool", + }, labels).With(labelsAndValues...), TxSizeBytes: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -46,6 +52,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "evicted_txs", Help: "Number of evicted transactions.", }, labels).With(labelsAndValues...), + ExpiredTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "expired_txs", + Help: "Number of expired transactions.", + }, labels).With(labelsAndValues...), RecheckTimes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -58,10 +70,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { func NopMetrics() *Metrics { return &Metrics{ Size: discard.NewGauge(), + PendingSize: discard.NewGauge(), TxSizeBytes: discard.NewHistogram(), FailedTxs: discard.NewCounter(), RejectedTxs: discard.NewCounter(), EvictedTxs: discard.NewCounter(), + ExpiredTxs: discard.NewCounter(), RecheckTimes: discard.NewCounter(), } } diff --git a/internal/mempool/metrics.go b/internal/mempool/metrics.go index 532307635..eb296182d 100644 --- a/internal/mempool/metrics.go +++ b/internal/mempool/metrics.go @@ -18,6 +18,9 @@ type Metrics struct { // Number of uncommitted transactions in the mempool. Size metrics.Gauge + // Number of pending transactions in mempool + PendingSize metrics.Gauge + // Histogram of transaction sizes in bytes. TxSizeBytes metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1,3,7"` @@ -38,6 +41,12 @@ type Metrics struct { //metrics:Number of evicted transactions. EvictedTxs metrics.Counter + // ExpiredTxs defines the number of expired transactions. These are valid + // transactions that passed CheckTx and existed in the mempool but were not + // get picked up in time and eventually got expired and removed from mempool + //metrics:Number of expired transactions. + ExpiredTxs metrics.Counter + // Number of times transactions are rechecked in the mempool. RecheckTimes metrics.Counter } From 2d39aad86e324fd8f22bc660e3b5d290bd88410b Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Fri, 19 Jan 2024 10:24:27 -0500 Subject: [PATCH 10/16] [EVM] Allow multiple txs from same account in a block (#190) * add mempool prioritization with evm nonce * fix priority stability * index fixes * replace with binary search insert * impl binary search --- abci/types/types.go | 5 + internal/mempool/mempool.go | 11 +- internal/mempool/mempool_test.go | 94 +++++++++++++++- internal/mempool/priority_queue.go | 138 ++++++++++++++++++++---- internal/mempool/priority_queue_test.go | 122 +++++++++++++++++++++ internal/mempool/tx.go | 5 + 6 files changed, 352 insertions(+), 23 deletions(-) diff --git a/abci/types/types.go b/abci/types/types.go index b40865524..1ecb0a9f3 100644 --- a/abci/types/types.go +++ b/abci/types/types.go @@ -256,4 +256,9 @@ type ResponseCheckTxV2 struct { IsPendingTransaction bool Checker PendingTxChecker // must not be nil if IsPendingTransaction is true ExpireTxHandler ExpireTxHandler + + // helper properties for prioritization in mempool + EVMNonce uint64 + EVMSenderAddress string + IsEVM bool } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index b76e1f007..7bccb5479 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -292,10 +292,13 @@ func (txmp *TxMempool) CheckTx( } wtx := &WrappedTx{ - tx: tx, - hash: txHash, - timestamp: time.Now().UTC(), - height: txmp.height, + tx: tx, + hash: txHash, + timestamp: time.Now().UTC(), + height: txmp.height, + evmNonce: res.EVMNonce, + evmAddress: res.EVMSenderAddress, + isEVM: res.IsEVM, expiredCallback: func(removeFromCache bool) { txmp.metrics.ExpiredTxs.Add(1) if removeFromCache { diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index d90a12277..dd4874417 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -43,6 +43,42 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a sender string ) + if strings.HasPrefix(string(req.Tx), "evm") { + // format is evm-sender-0=account=priority=nonce + // split into respective vars + parts := bytes.Split(req.Tx, []byte("=")) + sender = string(parts[0]) + account := string(parts[1]) + v, err := strconv.ParseInt(string(parts[2]), 10, 64) + if err != nil { + // could not parse + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: priority, + Code: 100, + GasWanted: 1, + }}, nil + } + nonce, err := strconv.ParseInt(string(parts[3]), 10, 64) + if err != nil { + // could not parse + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: priority, + Code: 101, + GasWanted: 1, + }}, nil + } + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: v, + Code: code.CodeTypeOK, + GasWanted: 1, + }, + EVMNonce: uint64(nonce), + EVMSenderAddress: account, + IsEVM: true, + }, nil + } + // infer the priority from the raw transaction value (sender=key=value) parts := bytes.Split(req.Tx, []byte("=")) if len(parts) == 3 { @@ -64,7 +100,6 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a GasWanted: 1, }}, nil } - return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Sender: sender, @@ -412,6 +447,63 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { require.NoError(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: 0})) } +func TestTxMempool_Prioritization(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(t, client, 100) + peerID := uint16(1) + + address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA" + address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA" + + // Generate transactions with different priorities + // there are two formats to comply with the above mocked CheckTX + // EVM: evm-sender=account=priority=nonce + // Non-EVM: sender=peer=priority + txs := [][]byte{ + []byte(fmt.Sprintf("sender-0-1=peer=%d", 9)), + []byte(fmt.Sprintf("sender-1-1=peer=%d", 8)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)), + []byte(fmt.Sprintf("sender-2-1=peer=%d", 5)), + []byte(fmt.Sprintf("sender-3-1=peer=%d", 4)), + } + + // copy the slice of txs and shuffle the order randomly + txsCopy := make([][]byte, len(txs)) + copy(txsCopy, txs) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + rng.Shuffle(len(txsCopy), func(i, j int) { + txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i] + }) + + for i := range txsCopy { + require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID})) + } + + // Reap the transactions + reapedTxs := txmp.ReapMaxTxs(len(txs)) + // Check if the reaped transactions are in the correct order of their priorities + for _, tx := range txs { + fmt.Printf("expected: %s\n", string(tx)) + } + fmt.Println("**************") + for _, reapedTx := range reapedTxs { + fmt.Printf("received: %s\n", string(reapedTx)) + } + for i, reapedTx := range reapedTxs { + require.Equal(t, txs[i], []byte(reapedTx)) + } +} + func TestTxMempool_CheckTxSamePeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index ad3a347a3..3886da0cf 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -12,13 +12,40 @@ var _ heap.Interface = (*TxPriorityQueue)(nil) // TxPriorityQueue defines a thread-safe priority queue for valid transactions. type TxPriorityQueue struct { - mtx sync.RWMutex - txs []*WrappedTx + mtx sync.RWMutex + txs []*WrappedTx + evmQueue map[string][]*WrappedTx +} + +func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx { + // Using BinarySearch to find the appropriate index to insert tx + i := binarySearch(queue, tx) + + // Make room for new value and add it + queue = append(queue, nil) + copy(queue[i+1:], queue[i:]) + queue[i] = tx + return queue +} + +// binarySearch finds the index at which tx should be inserted in queue +func binarySearch(queue []*WrappedTx, tx *WrappedTx) int { + low, high := 0, len(queue) + for low < high { + mid := low + (high-low)/2 + if queue[mid].evmNonce < tx.evmNonce { + low = mid + 1 + } else { + high = mid + } + } + return low } func NewTxPriorityQueue() *TxPriorityQueue { pq := &TxPriorityQueue{ - txs: make([]*WrappedTx, 0), + txs: make([]*WrappedTx, 0), + evmQueue: make(map[string][]*WrappedTx), } heap.Init(pq) @@ -68,13 +95,46 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6 return nil } +// requires read lock +func (pq *TxPriorityQueue) numQueuedUnsafe() int { + var result int + for _, queue := range pq.evmQueue { + result += len(queue) + } + // first items in queue are also in heap, subtract one + return result - len(pq.evmQueue) +} + // NumTxs returns the number of transactions in the priority queue. It is // thread safe. func (pq *TxPriorityQueue) NumTxs() int { pq.mtx.RLock() defer pq.mtx.RUnlock() - return len(pq.txs) + return len(pq.txs) + pq.numQueuedUnsafe() +} + +func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { + if queue, ok := pq.evmQueue[tx.evmAddress]; ok { + for i, t := range queue { + if t.evmNonce == tx.evmNonce { + pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...) + if len(pq.evmQueue[tx.evmAddress]) == 0 { + delete(pq.evmQueue, tx.evmAddress) + } + break + } + } + } +} + +func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) { + for i, t := range pq.txs { + if t == tx { + return i, true + } + } + return 0, false } // RemoveTx removes a specific transaction from the priority queue. @@ -82,30 +142,71 @@ func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { pq.mtx.Lock() defer pq.mtx.Unlock() - if tx.heapIndex < len(pq.txs) { - heap.Remove(pq, tx.heapIndex) + if idx, ok := pq.findTxIndexUnsafe(tx); ok { + heap.Remove(pq, idx) + } + + if tx.isEVM { + pq.removeQueuedEvmTxUnsafe(tx) } } +func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { + if !tx.isEVM { + heap.Push(pq, tx) + return + } + + queue, exists := pq.evmQueue[tx.evmAddress] + if !exists { + pq.evmQueue[tx.evmAddress] = []*WrappedTx{tx} + heap.Push(pq, tx) + return + } + + first := queue[0] + if tx.evmNonce < first.evmNonce { + if idx, ok := pq.findTxIndexUnsafe(first); ok { + heap.Remove(pq, idx) + } + heap.Push(pq, tx) + } + + pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx) +} + // PushTx adds a valid transaction to the priority queue. It is thread safe. func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { pq.mtx.Lock() defer pq.mtx.Unlock() + pq.pushTxUnsafe(tx) +} + +func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { + x := heap.Pop(pq) + if x == nil { + return nil + } - heap.Push(pq, tx) + tx := x.(*WrappedTx) + + if !tx.isEVM { + return tx + } + + pq.removeQueuedEvmTxUnsafe(tx) + if len(pq.evmQueue[tx.evmAddress]) > 0 { + heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + } + + return tx } // PopTx removes the top priority transaction from the queue. It is thread safe. func (pq *TxPriorityQueue) PopTx() *WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() - - x := heap.Pop(pq) - if x != nil { - return x.(*WrappedTx) - } - - return nil + return pq.popTxUnsafe() } // dequeue up to `max` transactions and reenqueue while locked @@ -113,7 +214,7 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() - numTxs := len(pq.txs) + numTxs := len(pq.txs) + pq.numQueuedUnsafe() if max < 0 { max = numTxs } @@ -121,15 +222,16 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { cap := tmmath.MinInt(numTxs, max) res := make([]*WrappedTx, 0, cap) for i := 0; i < cap; i++ { - popped := heap.Pop(pq) + popped := pq.popTxUnsafe() if popped == nil { break } - res = append(res, popped.(*WrappedTx)) + + res = append(res, popped) } for _, tx := range res { - heap.Push(pq, tx) + pq.pushTxUnsafe(tx) } return res } diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index ddc84806d..bd7dda126 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -1,6 +1,7 @@ package mempool import ( + "fmt" "math/rand" "sort" "sync" @@ -10,6 +11,127 @@ import ( "github.com/stretchr/testify/require" ) +// TxTestCase represents a single test case for the TxPriorityQueue +type TxTestCase struct { + name string + inputTxs []*WrappedTx // Input transactions + expectedOutput []int64 // Expected order of transaction IDs +} + +func TestTxPriorityQueue_ReapHalf(t *testing.T) { + pq := NewTxPriorityQueue() + + // Generate transactions with different priorities and nonces + txs := make([]*WrappedTx, 100) + for i := range txs { + txs[i] = &WrappedTx{ + tx: []byte(fmt.Sprintf("tx-%d", i)), + priority: int64(i), + } + + // Push the transaction + pq.PushTx(txs[i]) + } + + //reverse sort txs by priority + sort.Slice(txs, func(i, j int) bool { + return txs[i].priority > txs[j].priority + }) + + // Reap half of the transactions + reapedTxs := pq.PeekTxs(len(txs) / 2) + + // Check if the reaped transactions are in the correct order of their priorities and nonces + for i, reapedTx := range reapedTxs { + require.Equal(t, txs[i].priority, reapedTx.priority) + } +} + +func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { + testCases := []TxTestCase{ + { + name: "PriorityWithEVMAndNonEVM", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "2", isEVM: false, priority: 9}, + {sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce + {sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7}, + {sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8}, + {sender: "6", isEVM: false, priority: 6}, + {sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5}, + }, + expectedOutput: []int64{2, 4, 1, 3, 5, 6, 7}, + }, + { + name: "IdenticalPrioritiesAndNoncesDifferentAddresses", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 2, priority: 5}, + {sender: "2", isEVM: true, evmAddress: "0xdef", evmNonce: 2, priority: 5}, + {sender: "3", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5}, + }, + expectedOutput: []int64{1, 2, 3}, + }, + { + name: "InterleavedEVAndNonEVMTransactions", + inputTxs: []*WrappedTx{ + {sender: "7", isEVM: false, priority: 15}, + {sender: "8", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 20}, + {sender: "9", isEVM: false, priority: 10}, + {sender: "10", isEVM: true, evmAddress: "0xdef", evmNonce: 2, priority: 20}, + }, + expectedOutput: []int64{8, 10, 7, 9}, + }, + { + name: "SameAddressPriorityDifferentNonces", + inputTxs: []*WrappedTx{ + {sender: "11", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 10}, + {sender: "12", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "13", isEVM: true, evmAddress: "0xabc", evmNonce: 2, priority: 10}, + }, + expectedOutput: []int64{12, 13, 11}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pq := NewTxPriorityQueue() + now := time.Now() + + // Add input transactions to the queue and set timestamp to order inserted + for i, tx := range tc.inputTxs { + tx.timestamp = now.Add(time.Duration(i) * time.Second) + pq.PushTx(tx) + } + + results := pq.PeekTxs(len(tc.inputTxs)) + // Validate the order of transactions + require.Len(t, results, len(tc.expectedOutput)) + for i, expectedTxID := range tc.expectedOutput { + tx := results[i] + require.Equal(t, fmt.Sprintf("%d", expectedTxID), tx.sender) + } + }) + } +} + +func TestTxPriorityQueue_SameAddressDifferentNonces(t *testing.T) { + pq := NewTxPriorityQueue() + address := "0x123" + + // Insert transactions with the same address but different nonces and priorities + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15}) + + // Pop transactions and verify they are in the correct order of nonce + tx1 := pq.PopTx() + require.Equal(t, uint64(1), tx1.evmNonce) + tx2 := pq.PopTx() + require.Equal(t, uint64(2), tx2.evmNonce) + tx3 := pq.PopTx() + require.Equal(t, uint64(3), tx3.evmNonce) +} + func TestTxPriorityQueue(t *testing.T) { pq := NewTxPriorityQueue() numTxs := 1000 diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index be1d9290f..411065db8 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -67,6 +67,11 @@ type WrappedTx struct { // this is the callback that can be called when a transaction expires expiredCallback func(removeFromCache bool) + + // evm properties that aid in prioritization + evmAddress string + evmNonce uint64 + isEVM bool } func (wtx *WrappedTx) Size() int { From 0328de1855c2638f8459ce90b1aaf4950b3fb491 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Fri, 19 Jan 2024 14:31:54 -0500 Subject: [PATCH 11/16] fix removeTx to push next queued evm tx (#191) --- internal/mempool/priority_queue.go | 5 ++-- internal/mempool/priority_queue_test.go | 32 +++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 3886da0cf..6906a9717 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -121,6 +121,8 @@ func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...) if len(pq.evmQueue[tx.evmAddress]) == 0 { delete(pq.evmQueue, tx.evmAddress) + } else { + heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) } break } @@ -195,9 +197,6 @@ func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { } pq.removeQueuedEvmTxUnsafe(tx) - if len(pq.evmQueue[tx.evmAddress]) > 0 { - heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) - } return tx } diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index bd7dda126..c3bc90853 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -90,6 +90,13 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { }, expectedOutput: []int64{12, 13, 11}, }, + { + name: "OneItem", + inputTxs: []*WrappedTx{ + {sender: "14", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + }, + expectedOutput: []int64{14}, + }, } for _, tc := range testCases { @@ -263,6 +270,31 @@ func TestTxPriorityQueue_GetEvictableTxs(t *testing.T) { } } +func TestTxPriorityQueue_RemoveTxEvm(t *testing.T) { + pq := NewTxPriorityQueue() + + tx1 := &WrappedTx{ + priority: 1, + isEVM: true, + evmAddress: "0xabc", + evmNonce: 1, + } + tx2 := &WrappedTx{ + priority: 1, + isEVM: true, + evmAddress: "0xabc", + evmNonce: 2, + } + + pq.PushTx(tx1) + pq.PushTx(tx2) + + pq.RemoveTx(tx1) + + result := pq.PopTx() + require.Equal(t, tx2, result) +} + func TestTxPriorityQueue_RemoveTx(t *testing.T) { pq := NewTxPriorityQueue() rng := rand.New(rand.NewSource(time.Now().UnixNano())) From 05c867862fe0385ee88fdc652649295f674c3362 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Wed, 24 Jan 2024 13:51:56 -0500 Subject: [PATCH 12/16] fix expire metric (#193) --- internal/mempool/mempool.go | 9 +++++---- internal/mempool/tx.go | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 7bccb5479..a00883b05 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -299,8 +299,7 @@ func (txmp *TxMempool) CheckTx( evmNonce: res.EVMNonce, evmAddress: res.EVMSenderAddress, isEVM: res.IsEVM, - expiredCallback: func(removeFromCache bool) { - txmp.metrics.ExpiredTxs.Add(1) + removeHandler: func(removeFromCache bool) { if removeFromCache { txmp.cache.Remove(tx) } @@ -856,7 +855,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - wtx.expiredCallback(removeFromCache) + wtx.removeHandler(removeFromCache) } // purgeExpiredTxs removes all transactions that have exceeded their respective @@ -905,12 +904,14 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } for _, wtx := range expiredTxs { + txmp.metrics.ExpiredTxs.Add(1) txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) } // remove pending txs that have expired txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) { - wtx.expiredCallback(!txmp.config.KeepInvalidTxsInCache) + txmp.metrics.ExpiredTxs.Add(1) + wtx.removeHandler(!txmp.config.KeepInvalidTxsInCache) }) } diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 411065db8..025cfda73 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -65,8 +65,8 @@ type WrappedTx struct { // a reCheckTx callback executed. removed bool - // this is the callback that can be called when a transaction expires - expiredCallback func(removeFromCache bool) + // this is the callback that can be called when a transaction is removed + removeHandler func(removeFromCache bool) // evm properties that aid in prioritization evmAddress string From 394a86d3b26c25f5bad96964c68cdc2d494fff96 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 25 Jan 2024 11:36:08 -0500 Subject: [PATCH 13/16] [EVM] Fix duplicate evm txs from priority queue (#195) * debug duplicate evm tx * add more logs * add some \ns * more logs * fix swap check * add-lockable-reap-by-gas * add invariant checks * fix invariant parenthesis * fix log * remove invalid invariant * fix nonce ordering pain * handle ordering of insert * fix remove * cleanup * fix imports * cleanup * avoid getTransactionByHash(hash) panic due to index * use Key() to compare instead of pointer --- internal/mempool/mempool.go | 30 ++----- internal/mempool/priority_queue.go | 106 +++++++++++++++++++++--- internal/mempool/priority_queue_test.go | 34 +++++++- internal/rpc/core/mempool.go | 3 + types/tx.go | 2 +- 5 files changed, 139 insertions(+), 36 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index a00883b05..9bed03a8a 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -399,42 +399,28 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { totalSize int64 ) - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs()) - defer func() { - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) - } - }() - - txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs()) + var txs []types.Tx if uint64(txmp.Size()) < txmp.config.TxNotifyThreshold { // do not reap anything if threshold is not met return txs } - for txmp.priorityIndex.NumTxs() > 0 { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) + txmp.priorityIndex.ForEachTx(func(wtx *WrappedTx) bool { size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx}) - // Ensure we have capacity for the transaction with respect to the - // transaction size. if maxBytes > -1 && totalSize+size > maxBytes { - return txs[:len(txs)-1] + return false } - totalSize += size - - // ensure we have capacity for the transaction with respect to total gas gas := totalGas + wtx.gasWanted if maxGas > -1 && gas > maxGas { - return txs[:len(txs)-1] + return false } totalGas = gas - } + + txs = append(txs, wtx.tx) + return true + }) return txs } diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 6906a9717..6dbbfe9b2 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -13,14 +13,11 @@ var _ heap.Interface = (*TxPriorityQueue)(nil) // TxPriorityQueue defines a thread-safe priority queue for valid transactions. type TxPriorityQueue struct { mtx sync.RWMutex - txs []*WrappedTx - evmQueue map[string][]*WrappedTx + txs []*WrappedTx // priority heap + evmQueue map[string][]*WrappedTx // sorted by nonce } -func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx { - // Using BinarySearch to find the appropriate index to insert tx - i := binarySearch(queue, tx) - +func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx, i int) []*WrappedTx { // Make room for new value and add it queue = append(queue, nil) copy(queue[i+1:], queue[i:]) @@ -33,7 +30,7 @@ func binarySearch(queue []*WrappedTx, tx *WrappedTx) int { low, high := 0, len(queue) for low < high { mid := low + (high-low)/2 - if queue[mid].evmNonce < tx.evmNonce { + if queue[mid].evmNonce <= tx.evmNonce { low = mid + 1 } else { high = mid @@ -117,12 +114,15 @@ func (pq *TxPriorityQueue) NumTxs() int { func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { if queue, ok := pq.evmQueue[tx.evmAddress]; ok { for i, t := range queue { - if t.evmNonce == tx.evmNonce { + if t.tx.Key() == tx.tx.Key() { pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...) if len(pq.evmQueue[tx.evmAddress]) == 0 { delete(pq.evmQueue, tx.evmAddress) } else { - heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + // only if removing the first item, then push next onto queue + if i == 0 { + heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + } } break } @@ -174,9 +174,67 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { heap.Push(pq, tx) } - pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx) + pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx, binarySearch(queue, tx)) + } +// These are available if we need to test the invariant checks +// these can be used to troubleshoot invariant violations +//func (pq *TxPriorityQueue) checkInvariants(msg string) { +// +// uniqHashes := make(map[string]bool) +// for _, tx := range pq.txs { +// if _, ok := uniqHashes[fmt.Sprintf("%x", tx.tx.Key())]; ok { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in heap", msg, tx.tx.Key())) +// } +// uniqHashes[fmt.Sprintf("%x", tx.tx.Key())] = true +// if tx.isEVM { +// if queue, ok := pq.evmQueue[tx.evmAddress]; ok { +// if queue[0].tx.Key() != tx.tx.Key() { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not at front of evmQueue hash=%x", msg, tx.tx.Key())) +// } +// } else { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in evmQueue hash=%x", msg, tx.tx.Key())) +// } +// } +// } +// +// // each item in all queues should be unique nonce +// for _, queue := range pq.evmQueue { +// hashes := make(map[string]bool) +// for idx, tx := range queue { +// if idx == 0 { +// _, ok := pq.findTxIndexUnsafe(tx) +// if !ok { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): did not find tx[0] hash=%x nonce=%d in heap", msg, tx.tx.Key(), tx.evmNonce)) +// } +// } +// if _, ok := hashes[fmt.Sprintf("%x", tx.tx.Key())]; ok { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in queue nonce=%d", msg, tx.tx.Key(), tx.evmNonce)) +// } +// hashes[fmt.Sprintf("%x", tx.tx.Key())] = true +// } +// } +//} + +// for debugging situations where invariant violations occur +//func (pq *TxPriorityQueue) print() { +// for _, tx := range pq.txs { +// fmt.Printf("DEBUG PRINT: heap: nonce=%d, hash=%x\n", tx.evmNonce, tx.tx.Key()) +// } +// +// for _, queue := range pq.evmQueue { +// for idx, tx := range queue { +// fmt.Printf("DEBUG PRINT: evmQueue[%d]: nonce=%d, hash=%x\n", idx, tx.evmNonce, tx.tx.Key()) +// } +// } +//} + // PushTx adds a valid transaction to the priority queue. It is thread safe. func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { pq.mtx.Lock() @@ -185,6 +243,9 @@ func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { } func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { + if len(pq.txs) == 0 { + return nil + } x := heap.Pop(pq) if x == nil { return nil @@ -209,6 +270,31 @@ func (pq *TxPriorityQueue) PopTx() *WrappedTx { } // dequeue up to `max` transactions and reenqueue while locked +func (pq *TxPriorityQueue) ForEachTx(handler func(wtx *WrappedTx) bool) { + pq.mtx.Lock() + defer pq.mtx.Unlock() + + numTxs := len(pq.txs) + pq.numQueuedUnsafe() + + txs := make([]*WrappedTx, 0, numTxs) + + defer func() { + for _, tx := range txs { + pq.pushTxUnsafe(tx) + } + }() + + for i := 0; i < numTxs; i++ { + popped := pq.popTxUnsafe() + txs = append(txs, popped) + if !handler(popped) { + return + } + } +} + +// dequeue up to `max` transactions and reenqueue while locked +// TODO: use ForEachTx instead func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index c3bc90853..c1e17d278 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -49,6 +49,29 @@ func TestTxPriorityQueue_ReapHalf(t *testing.T) { func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { testCases := []TxTestCase{ + { + name: "PriorityWithEVMAndNonEVMDuplicateNonce", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "3", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 9}, + {sender: "2", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 7}, + }, + expectedOutput: []int64{1, 2, 3}, + }, + { + name: "PriorityWithEVMAndNonEVMDuplicateNonce", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "2", isEVM: false, priority: 9}, + {sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce + {sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7}, + {sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7}, + {sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8}, + {sender: "6", isEVM: false, priority: 6}, + {sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5}, + }, + expectedOutput: []int64{2, 4, 1, 3, 5, 5, 6, 7}, + }, { name: "PriorityWithEVMAndNonEVM", inputTxs: []*WrappedTx{ @@ -107,6 +130,7 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { // Add input transactions to the queue and set timestamp to order inserted for i, tx := range tc.inputTxs { tx.timestamp = now.Add(time.Duration(i) * time.Second) + tx.tx = []byte(fmt.Sprintf("%d", time.Now().UnixNano())) pq.PushTx(tx) } @@ -126,9 +150,9 @@ func TestTxPriorityQueue_SameAddressDifferentNonces(t *testing.T) { address := "0x123" // Insert transactions with the same address but different nonces and priorities - pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10}) - pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5}) - pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10, tx: []byte("tx1")}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5, tx: []byte("tx2")}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15, tx: []byte("tx3")}) // Pop transactions and verify they are in the correct order of nonce tx1 := pq.PopTx() @@ -154,6 +178,7 @@ func TestTxPriorityQueue(t *testing.T) { pq.PushTx(&WrappedTx{ priority: int64(i), timestamp: time.Now(), + tx: []byte(fmt.Sprintf("%d", i)), }) wg.Done() @@ -278,12 +303,14 @@ func TestTxPriorityQueue_RemoveTxEvm(t *testing.T) { isEVM: true, evmAddress: "0xabc", evmNonce: 1, + tx: []byte("tx1"), } tx2 := &WrappedTx{ priority: 1, isEVM: true, evmAddress: "0xabc", evmNonce: 2, + tx: []byte("tx2"), } pq.PushTx(tx1) @@ -306,6 +333,7 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) { x := rng.Intn(100000) pq.PushTx(&WrappedTx{ priority: int64(x), + tx: []byte(fmt.Sprintf("%d", i)), }) values[i] = x diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 1e8628641..eaad97b4f 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -157,6 +157,9 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque skipCount := validateSkipCount(page, perPage) txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount)) + if skipCount > len(txs) { + skipCount = len(txs) + } result := txs[skipCount:] return &coretypes.ResultUnconfirmedTxs{ diff --git a/types/tx.go b/types/tx.go index 0a37d1e9d..5b7c2377e 100644 --- a/types/tx.go +++ b/types/tx.go @@ -184,7 +184,7 @@ func (t TxRecordSet) Validate(maxSizeBytes int64, otxs Txs) error { for i, cur := range allCopy { // allCopy is sorted, so any duplicated data will be adjacent. if i+1 < len(allCopy) && bytes.Equal(cur, allCopy[i+1]) { - return fmt.Errorf("found duplicate transaction with hash: %x", cur.Hash()) + return fmt.Errorf("found duplicate transaction with hash: %x", cur.Key()) } } From f92b843786a63515fd960a42e24edb168dcd3f2c Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 29 Jan 2024 02:29:17 -0500 Subject: [PATCH 14/16] [EVM] prevent duplicate txs from getting inserted (#196) * prevent duplicates in mempool * use timestamp in priority queue --- internal/consensus/mempool_test.go | 5 +- internal/mempool/mempool.go | 32 ++++++--- internal/mempool/priority_queue.go | 90 +++++++++++++++++++------ internal/mempool/priority_queue_test.go | 1 + internal/mempool/tx.go | 6 ++ 5 files changed, 102 insertions(+), 32 deletions(-) diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index 84badbeeb..6ef3849ad 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -139,7 +139,7 @@ func checkTxsRange(ctx context.Context, t *testing.T, cs *State, start, end int) var rCode uint32 err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.ResponseCheckTx) { rCode = r.Code }, mempool.TxInfo{}) require.NoError(t, err, "error after checkTx") - require.Equal(t, code.CodeTypeOK, rCode, "checkTx code is error, txBytes %X", txBytes) + require.Equal(t, code.CodeTypeOK, rCode, "checkTx code is error, txBytes %X, index=%d", txBytes, i) } } @@ -166,7 +166,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { require.NoError(t, err) newBlockHeaderCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlockHeader) - const numTxs int64 = 100 + const numTxs int64 = 50 go checkTxsRange(ctx, t, cs, 0, int(numTxs)) startTestRound(ctx, cs, cs.roundState.Height(), cs.roundState.Round()) @@ -331,7 +331,6 @@ func txAsUint64(tx []byte) uint64 { func (app *CounterApplication) Commit(context.Context) (*abci.ResponseCommit, error) { app.mu.Lock() defer app.mu.Unlock() - app.mempoolTxCount = app.txCount return &abci.ResponseCommit{}, nil } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 9bed03a8a..612a0bc48 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -332,6 +332,11 @@ func (txmp *TxMempool) CheckTx( return nil } +func (txmp *TxMempool) isInMempool(tx types.Tx) bool { + existingTx := txmp.txStore.GetTxByHash(tx.Key()) + return existingTx != nil && !existingTx.removed +} + func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { txmp.Lock() defer txmp.Unlock() @@ -635,15 +640,17 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck txmp.metrics.Size.Set(float64(txmp.Size())) txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) - txmp.insertTx(wtx) - txmp.logger.Debug( - "inserted good transaction", - "priority", wtx.priority, - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "height", txmp.height, - "num_txs", txmp.Size(), - ) - txmp.notifyTxsAvailable() + if txmp.insertTx(wtx) { + txmp.logger.Debug( + "inserted good transaction", + "priority", wtx.priority, + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "height", txmp.height, + "num_txs", txmp.Size(), + ) + txmp.notifyTxsAvailable() + } + return nil } @@ -809,7 +816,11 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { return nil } -func (txmp *TxMempool) insertTx(wtx *WrappedTx) { +func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool { + if txmp.isInMempool(wtx.tx) { + return false + } + txmp.txStore.SetTx(wtx) txmp.priorityIndex.PushTx(wtx) txmp.heightIndex.Insert(wtx) @@ -822,6 +833,7 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) { wtx.gossipEl = gossipEl atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size())) + return true } func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 6dbbfe9b2..3de4e810f 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -30,7 +30,7 @@ func binarySearch(queue []*WrappedTx, tx *WrappedTx) int { low, high := 0, len(queue) for low < high { mid := low + (high-low)/2 - if queue[mid].evmNonce <= tx.evmNonce { + if queue[mid].IsBefore(tx) { low = mid + 1 } else { high = mid @@ -118,11 +118,6 @@ func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...) if len(pq.evmQueue[tx.evmAddress]) == 0 { delete(pq.evmQueue, tx.evmAddress) - } else { - // only if removing the first item, then push next onto queue - if i == 0 { - heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) - } } break } @@ -132,7 +127,7 @@ func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) { for i, t := range pq.txs { - if t == tx { + if t.tx.Key() == tx.tx.Key() { return i, true } } @@ -146,9 +141,13 @@ func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { if idx, ok := pq.findTxIndexUnsafe(tx); ok { heap.Remove(pq, idx) - } - - if tx.isEVM { + if tx.isEVM { + pq.removeQueuedEvmTxUnsafe(tx) + if len(pq.evmQueue[tx.evmAddress]) > 0 { + heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + } + } + } else if tx.isEVM { pq.removeQueuedEvmTxUnsafe(tx) } } @@ -159,6 +158,7 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { return } + // if there aren't other waiting txs, init and return queue, exists := pq.evmQueue[tx.evmAddress] if !exists { pq.evmQueue[tx.evmAddress] = []*WrappedTx{tx} @@ -166,29 +166,45 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { return } + // this item is on the heap at the moment first := queue[0] - if tx.evmNonce < first.evmNonce { + + // the queue's first item (and ONLY the first item) must be on the heap + // if this tx is before the first item, then we need to remove the first + // item from the heap + if tx.IsBefore(first) { if idx, ok := pq.findTxIndexUnsafe(first); ok { heap.Remove(pq, idx) } heap.Push(pq, tx) } - pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx, binarySearch(queue, tx)) - } // These are available if we need to test the invariant checks // these can be used to troubleshoot invariant violations //func (pq *TxPriorityQueue) checkInvariants(msg string) { -// // uniqHashes := make(map[string]bool) -// for _, tx := range pq.txs { +// for idx, tx := range pq.txs { +// if tx == nil { +// pq.print() +// panic(fmt.Sprintf("DEBUG PRINT: found nil item on heap: idx=%d\n", idx)) +// } +// if tx.tx == nil { +// pq.print() +// panic(fmt.Sprintf("DEBUG PRINT: found nil tx.tx on heap: idx=%d\n", idx)) +// } // if _, ok := uniqHashes[fmt.Sprintf("%x", tx.tx.Key())]; ok { // pq.print() // panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in heap", msg, tx.tx.Key())) // } // uniqHashes[fmt.Sprintf("%x", tx.tx.Key())] = true +// +// //if _, ok := pq.keys[tx.tx.Key()]; !ok { +// // pq.print() +// // panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in keys hash=%x", msg, tx.tx.Key())) +// //} +// // if tx.isEVM { // if queue, ok := pq.evmQueue[tx.evmAddress]; ok { // if queue[0].tx.Key() != tx.tx.Key() { @@ -213,6 +229,10 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { // panic(fmt.Sprintf("INVARIANT (%s): did not find tx[0] hash=%x nonce=%d in heap", msg, tx.tx.Key(), tx.evmNonce)) // } // } +// //if _, ok := pq.keys[tx.tx.Key()]; !ok { +// // pq.print() +// // panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in keys hash=%x", msg, tx.tx.Key())) +// //} // if _, ok := hashes[fmt.Sprintf("%x", tx.tx.Key())]; ok { // pq.print() // panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in queue nonce=%d", msg, tx.tx.Key(), tx.evmNonce)) @@ -224,13 +244,31 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { // for debugging situations where invariant violations occur //func (pq *TxPriorityQueue) print() { +// fmt.Println("PRINT PRIORITY QUEUE ****************** ") // for _, tx := range pq.txs { -// fmt.Printf("DEBUG PRINT: heap: nonce=%d, hash=%x\n", tx.evmNonce, tx.tx.Key()) +// if tx == nil { +// fmt.Printf("DEBUG PRINT: heap (nil): nonce=?, hash=?\n") +// continue +// } +// if tx.tx == nil { +// fmt.Printf("DEBUG PRINT: heap (%s): nonce=%d, tx.tx is nil \n", tx.evmAddress, tx.evmNonce) +// continue +// } +// fmt.Printf("DEBUG PRINT: heap (%s): nonce=%d, hash=%x, time=%d\n", tx.evmAddress, tx.evmNonce, tx.tx.Key(), tx.timestamp.UnixNano()) // } // -// for _, queue := range pq.evmQueue { +// for addr, queue := range pq.evmQueue { // for idx, tx := range queue { -// fmt.Printf("DEBUG PRINT: evmQueue[%d]: nonce=%d, hash=%x\n", idx, tx.evmNonce, tx.tx.Key()) +// if tx == nil { +// fmt.Printf("DEBUG PRINT: found nil item on evmQueue(%s): idx=%d\n", addr, idx) +// continue +// } +// if tx.tx == nil { +// fmt.Printf("DEBUG PRINT: found nil tx.tx on evmQueue(%s): idx=%d\n", addr, idx) +// continue +// } +// +// fmt.Printf("DEBUG PRINT: evmQueue(%s)[%d]: nonce=%d, hash=%x, time=%d\n", tx.evmAddress, idx, tx.evmNonce, tx.tx.Key(), tx.timestamp.UnixNano()) // } // } //} @@ -239,6 +277,7 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { pq.mtx.Lock() defer pq.mtx.Unlock() + pq.pushTxUnsafe(tx) } @@ -246,19 +285,31 @@ func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { if len(pq.txs) == 0 { return nil } + + // remove the first item from the heap x := heap.Pop(pq) if x == nil { return nil } - tx := x.(*WrappedTx) + // non-evm transactions do not have txs waiting on a nonce if !tx.isEVM { return tx } + // evm transactions can have txs waiting on this nonce + // if there are any, we should replace the heap with the next nonce + // for the address + + // remove the first item from the evmQueue pq.removeQueuedEvmTxUnsafe(tx) + // if there is a next item, now it can be added to the heap + if len(pq.evmQueue[tx.evmAddress]) > 0 { + heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + } + return tx } @@ -266,6 +317,7 @@ func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { func (pq *TxPriorityQueue) PopTx() *WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() + return pq.popTxUnsafe() } diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index c1e17d278..8cb6d2e1a 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -196,6 +196,7 @@ func TestTxPriorityQueue(t *testing.T) { pq.PushTx(&WrappedTx{ priority: 1000, timestamp: now, + tx: []byte(fmt.Sprintf("%d", time.Now().UnixNano())), }) require.Equal(t, 1001, pq.NumTxs()) diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 025cfda73..13ddb0a12 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -74,6 +74,12 @@ type WrappedTx struct { isEVM bool } +// IsBefore returns true if the WrappedTx is before the given WrappedTx +// this applies to EVM transactions only +func (wtx *WrappedTx) IsBefore(tx *WrappedTx) bool { + return wtx.evmNonce < tx.evmNonce || (wtx.evmNonce == tx.evmNonce && wtx.timestamp.Before(tx.timestamp)) +} + func (wtx *WrappedTx) Size() int { return len(wtx.tx) } From 9967b08cd9b179bbe1ec4adb31cf7ca5aed0c21e Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 29 Jan 2024 17:33:31 -0500 Subject: [PATCH 15/16] [EVM] Add logging for expiration (#198) * add logging for expired txs * cleanup --- internal/mempool/mempool.go | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 612a0bc48..df30dc7c0 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -856,6 +856,32 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { wtx.removeHandler(removeFromCache) } +func (txmp *TxMempool) expire(blockHeight int64, wtx *WrappedTx) { + txmp.metrics.ExpiredTxs.Add(1) + txmp.logExpiredTx(blockHeight, wtx) + wtx.removeHandler(!txmp.config.KeepInvalidTxsInCache) +} + +func (txmp *TxMempool) logExpiredTx(blockHeight int64, wtx *WrappedTx) { + // defensive check + if wtx == nil { + return + } + + txmp.logger.Info( + "transaction expired", + "priority", wtx.priority, + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "address", wtx.evmAddress, + "evm", wtx.isEVM, + "nonce", wtx.evmNonce, + "height", blockHeight, + "tx_height", wtx.height, + "tx_timestamp", wtx.timestamp, + "age", time.Since(wtx.timestamp), + ) +} + // purgeExpiredTxs removes all transactions that have exceeded their respective // height- and/or time-based TTLs from their respective indexes. Every expired // transaction will be removed from the mempool, but preserved in the cache (except for pending txs). @@ -902,14 +928,12 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } for _, wtx := range expiredTxs { - txmp.metrics.ExpiredTxs.Add(1) - txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) + txmp.expire(blockHeight, wtx) } // remove pending txs that have expired txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) { - txmp.metrics.ExpiredTxs.Add(1) - wtx.removeHandler(!txmp.config.KeepInvalidTxsInCache) + txmp.expire(blockHeight, wtx) }) } From 5b7c3b0c1881005b9081f176de2ff31ebda25ba5 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 29 Jan 2024 18:07:03 -0500 Subject: [PATCH 16/16] [EVM] Avoid returning nil transactions on ForEach (#197) * remove heapIndex to avoid nil scenario * avoid returning nil in loop (mimic Peek) --- internal/mempool/priority_queue.go | 15 +++++++++------ internal/mempool/priority_queue_test.go | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 3de4e810f..abcbfe6a7 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -293,6 +293,11 @@ func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { } tx := x.(*WrappedTx) + // this situation is primarily for a test case that inserts nils + if tx == nil { + return nil + } + // non-evm transactions do not have txs waiting on a nonce if !tx.isEVM { return tx @@ -338,6 +343,9 @@ func (pq *TxPriorityQueue) ForEachTx(handler func(wtx *WrappedTx) bool) { for i := 0; i < numTxs; i++ { popped := pq.popTxUnsafe() + if popped == nil { + break + } txs = append(txs, popped) if !handler(popped) { return @@ -377,9 +385,7 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { // // NOTE: A caller should never call Push. Use PushTx instead. func (pq *TxPriorityQueue) Push(x interface{}) { - n := len(pq.txs) item := x.(*WrappedTx) - item.heapIndex = n pq.txs = append(pq.txs, item) } @@ -390,8 +396,7 @@ func (pq *TxPriorityQueue) Pop() interface{} { old := pq.txs n := len(old) item := old[n-1] - old[n-1] = nil // avoid memory leak - item.heapIndex = -1 // for safety + old[n-1] = nil // avoid memory leak pq.txs = old[0 : n-1] return item } @@ -420,6 +425,4 @@ func (pq *TxPriorityQueue) Less(i, j int) bool { // Swap implements the Heap interface. It swaps two transactions in the queue. func (pq *TxPriorityQueue) Swap(i, j int) { pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i] - pq.txs[i].heapIndex = i - pq.txs[j].heapIndex = j } diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index 8cb6d2e1a..eb340b170 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -47,6 +47,20 @@ func TestTxPriorityQueue_ReapHalf(t *testing.T) { } } +func TestAvoidPanicIfTransactionIsNil(t *testing.T) { + pq := NewTxPriorityQueue() + pq.Push(&WrappedTx{sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}) + pq.txs = append(pq.txs, nil) + + var count int + pq.ForEachTx(func(tx *WrappedTx) bool { + count++ + return true + }) + + require.Equal(t, 1, count) +} + func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { testCases := []TxTestCase{ {