From 8df065b61161705ede0a339cd04fc83eb5890428 Mon Sep 17 00:00:00 2001 From: Facundo Medica <14063057+facundomedica@users.noreply.github.com> Date: Mon, 18 Sep 2023 15:52:54 +0200 Subject: [PATCH] feat: Optimistic Execution (#16581) Co-authored-by: Aleksandr Bezobchuk --- CHANGELOG.md | 1 + baseapp/abci.go | 95 ++++++++++++-- baseapp/abci_test.go | 41 +++++++ baseapp/baseapp.go | 6 + baseapp/oe/optimistic_execution.go | 157 ++++++++++++++++++++++++ baseapp/oe/optimistic_execution_test.go | 34 +++++ baseapp/options.go | 8 ++ simapp/app.go | 7 +- simapp/app_v2.go | 2 +- types/mempool/priority_nonce.go | 12 +- types/mempool/sender_nonce.go | 12 +- 11 files changed, 357 insertions(+), 18 deletions(-) create mode 100644 baseapp/oe/optimistic_execution.go create mode 100644 baseapp/oe/optimistic_execution_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 761f8b1ae538..4fc1a5c648d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Features +* (baseapp) [#16581](https://github.com/cosmos/cosmos-sdk/pull/16581) Implement Optimistic Execution as an experimental feature (not enabled by default). * (client/keys) [#17639](https://github.com/cosmos/cosmos-sdk/pull/17639) Allows using and saving public keys encoded as base64 * (client) [#17513](https://github.com/cosmos/cosmos-sdk/pull/17513) Allow overwritting `client.toml`. Use `client.CreateClientConfig` in place of `client.ReadFromClientConfig` and provide a custom template and a custom config. * (x/bank) [#14224](https://github.com/cosmos/cosmos-sdk/pull/14224) Allow injection of restrictions on transfers using `AppendSendRestriction` or `PrependSendRestriction`. diff --git a/baseapp/abci.go b/baseapp/abci.go index 4ea0f2cbeb8f..a7747cb0ef49 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -511,6 +511,8 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc // processed the first block, as we want to avoid overwriting the finalizeState // after state changes during InitChain. if req.Height > app.initialHeight { + // abort any running OE + app.optimisticExec.Abort() app.setState(execModeFinalize, header) } @@ -557,6 +559,19 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil } + // Only execute optimistic execution if the proposal is accepted, OE is + // enabled and the block height is greater than the initial height. During + // the first block we'll be carrying state from InitChain, so it would be + // impossible for us to easily revert. + // After the first block has been processed, the next blocks will get executed + // optimistically, so that when the ABCI client calls `FinalizeBlock` the app + // can have a response ready. + if resp.Status == abci.ResponseProcessProposal_ACCEPT && + app.optimisticExec.Enabled() && + req.Height > app.initialHeight { + app.optimisticExec.Execute(req) + } + return resp, nil } @@ -668,17 +683,11 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r return resp, err } -// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock. -// Specifically, it will execute an application's BeginBlock (if defined), followed -// by the transactions in the proposal, finally followed by the application's -// EndBlock (if defined). -// -// For each raw transaction, i.e. a byte slice, BaseApp will only execute it if -// it adheres to the sdk.Tx interface. Otherwise, the raw transaction will be -// skipped. This is to support compatibility with proposers injecting vote -// extensions into the proposal, which should not themselves be executed in cases -// where they adhere to the sdk.Tx interface. -func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { +// internalFinalizeBlock executes the block, called by the Optimistic +// Execution flow or by the FinalizeBlock ABCI method. The context received is +// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx +// must be used. +func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { var events []abci.Event if err := app.checkHalt(req.Height, req.Time); err != nil { @@ -751,6 +760,15 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons return nil, err } + // First check for an abort signal after beginBlock, as it's the first place + // we spend any significant amount of time. + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + // continue + } + events = append(events, beginBlock.Events...) // Iterate over all raw transactions in the proposal and attempt to execute @@ -777,6 +795,14 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons ) } + // check after every tx if we should abort + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + // continue + } + txResults = append(txResults, response) } @@ -789,6 +815,14 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons return nil, err } + // check after endBlock if we should abort, to avoid propagating the result + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + // continue + } + events = append(events, endBlock.Events...) cp := app.GetConsensusParams(app.finalizeBlockState.ctx) @@ -797,10 +831,47 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons TxResults: txResults, ValidatorUpdates: endBlock.ValidatorUpdates, ConsensusParamUpdates: &cp, - AppHash: app.workingHash(), }, nil } +// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock. +// Specifically, it will execute an application's BeginBlock (if defined), followed +// by the transactions in the proposal, finally followed by the application's +// EndBlock (if defined). +// +// For each raw transaction, i.e. a byte slice, BaseApp will only execute it if +// it adheres to the sdk.Tx interface. Otherwise, the raw transaction will be +// skipped. This is to support compatibility with proposers injecting vote +// extensions into the proposal, which should not themselves be executed in cases +// where they adhere to the sdk.Tx interface. +func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { + if app.optimisticExec.Initialized() { + // check if the hash we got is the same as the one we are executing + aborted := app.optimisticExec.AbortIfNeeded(req.Hash) + // Wait for the OE to finish, regardless of whether it was aborted or not + res, err := app.optimisticExec.WaitResult() + + // only return if we are not aborting + if !aborted { + if res != nil { + res.AppHash = app.workingHash() + } + return res, err + } + + // if it was aborted, we need to reset the state + app.finalizeBlockState = nil + app.optimisticExec.Reset() + } + + // if no OE is running, just run the block (this is either a block replay or a OE that got aborted) + res, err := app.internalFinalizeBlock(context.Background(), req) + if res != nil { + res.AppHash = app.workingHash() + } + return res, err +} + // checkHalt checkes if height or time exceeds halt-height or halt-time respectively. func (app *BaseApp) checkHalt(height int64, time time.Time) error { var halt bool diff --git a/baseapp/abci_test.go b/baseapp/abci_test.go index b46481af981a..3db990dd17f6 100644 --- a/baseapp/abci_test.go +++ b/baseapp/abci_test.go @@ -2230,3 +2230,44 @@ func TestBaseApp_VoteExtensions(t *testing.T) { committedAvgPrice := suite.baseApp.NewContext(true).KVStore(capKey1).Get([]byte("avgPrice")) require.Equal(t, avgPrice, committedAvgPrice) } + +func TestOptimisticExecution(t *testing.T) { + suite := NewBaseAppSuite(t, baseapp.SetOptimisticExecution()) + + _, err := suite.baseApp.InitChain(&abci.RequestInitChain{ + ConsensusParams: &cmtproto.ConsensusParams{}, + }) + require.NoError(t, err) + + // run 50 blocks + for i := 0; i < 50; i++ { + tx := newTxCounter(t, suite.txConfig, 0, 1) + txBytes, err := suite.txConfig.TxEncoder()(tx) + require.NoError(t, err) + + reqProcProp := abci.RequestProcessProposal{ + Txs: [][]byte{txBytes}, + Height: suite.baseApp.LastBlockHeight() + 1, + Hash: []byte("some-hash" + strconv.FormatInt(suite.baseApp.LastBlockHeight()+1, 10)), + } + + respProcProp, err := suite.baseApp.ProcessProposal(&reqProcProp) + require.Equal(t, abci.ResponseProcessProposal_ACCEPT, respProcProp.Status) + require.NoError(t, err) + + reqFinalizeBlock := abci.RequestFinalizeBlock{ + Height: reqProcProp.Height, + Txs: reqProcProp.Txs, + Hash: reqProcProp.Hash, + } + + respFinalizeBlock, err := suite.baseApp.FinalizeBlock(&reqFinalizeBlock) + require.NoError(t, err) + require.Len(t, respFinalizeBlock.TxResults, 1) + + _, err = suite.baseApp.Commit() + require.NoError(t, err) + } + + require.Equal(t, int64(50), suite.baseApp.LastBlockHeight()) +} diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 9b11f21a770b..2fba770458bf 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -23,6 +23,7 @@ import ( "cosmossdk.io/store/snapshots" storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/baseapp/oe" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" servertypes "github.com/cosmos/cosmos-sdk/server/types" @@ -174,6 +175,11 @@ type BaseApp struct { chainID string cdc codec.Codec + + // optimisticExec contains the context required for Optimistic Execution, + // including the goroutine handling.This is experimental and must be enabled + // by developers. + optimisticExec *oe.OptimisticExecution } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a diff --git a/baseapp/oe/optimistic_execution.go b/baseapp/oe/optimistic_execution.go new file mode 100644 index 000000000000..2a6d34770955 --- /dev/null +++ b/baseapp/oe/optimistic_execution.go @@ -0,0 +1,157 @@ +package oe + +import ( + "bytes" + "context" + "encoding/hex" + "math/rand" + "sync" + "time" + + abci "github.com/cometbft/cometbft/abci/types" + + "cosmossdk.io/log" +) + +// FinalizeBlockFunc is the function that is called by the OE to finalize the +// block. It is the same as the one in the ABCI app. +type FinalizeBlockFunc func(context.Context, *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) + +// OptimisticExecution is a struct that contains the OE context. It is used to +// run the FinalizeBlock function in a goroutine, and to abort it if needed. +type OptimisticExecution struct { + finalizeBlockFunc FinalizeBlockFunc // ABCI FinalizeBlock function with a context + logger log.Logger + + mtx sync.Mutex + stopCh chan struct{} + request *abci.RequestFinalizeBlock + response *abci.ResponseFinalizeBlock + err error + cancelFunc func() // cancel function for the context + initialized bool // A boolean value indicating whether the struct has been initialized + + // debugging/testing options + abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted +} + +// NewOptimisticExecution initializes the Optimistic Execution context but does not start it. +func NewOptimisticExecution(logger log.Logger, fn FinalizeBlockFunc, opts ...func(*OptimisticExecution)) *OptimisticExecution { + logger = logger.With(log.ModuleKey, "oe") + oe := &OptimisticExecution{logger: logger, finalizeBlockFunc: fn} + for _, opt := range opts { + opt(oe) + } + return oe +} + +// WithAbortRate sets the abort rate for the OE. The abort rate is a number from +// 0 to 100 that determines the percentage of OE that should be aborted. +// This is for testing purposes only and must not be used in production. +func WithAbortRate(rate int) func(*OptimisticExecution) { + return func(oe *OptimisticExecution) { + oe.abortRate = rate + } +} + +// Reset resets the OE context. Must be called whenever we want to invalidate +// the current OE. +func (oe *OptimisticExecution) Reset() { + oe.mtx.Lock() + defer oe.mtx.Unlock() + oe.request = nil + oe.response = nil + oe.err = nil + oe.initialized = false +} + +func (oe *OptimisticExecution) Enabled() bool { + return oe != nil +} + +// Initialized returns true if the OE was initialized, meaning that it contains +// a request and it was run or it is running. +func (oe *OptimisticExecution) Initialized() bool { + if oe == nil { + return false + } + oe.mtx.Lock() + defer oe.mtx.Unlock() + + return oe.initialized +} + +// Execute initializes the OE and starts it in a goroutine. +func (oe *OptimisticExecution) Execute(req *abci.RequestProcessProposal) { + oe.mtx.Lock() + defer oe.mtx.Unlock() + + oe.stopCh = make(chan struct{}) + oe.request = &abci.RequestFinalizeBlock{ + Txs: req.Txs, + DecidedLastCommit: req.ProposedLastCommit, + Misbehavior: req.Misbehavior, + Hash: req.Hash, + Height: req.Height, + Time: req.Time, + NextValidatorsHash: req.NextValidatorsHash, + ProposerAddress: req.ProposerAddress, + } + + oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String()) + ctx, cancel := context.WithCancel(context.Background()) + oe.cancelFunc = cancel + oe.initialized = true + + go func() { + start := time.Now() + resp, err := oe.finalizeBlockFunc(ctx, oe.request) + oe.mtx.Lock() + executionTime := time.Since(start) + oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", req.Height, "hash", hex.EncodeToString(req.Hash)) + oe.response, oe.err = resp, err + close(oe.stopCh) + oe.mtx.Unlock() + }() +} + +// AbortIfNeeded aborts the OE if the request hash is not the same as the one in +// the running OE. Returns true if the OE was aborted. +func (oe *OptimisticExecution) AbortIfNeeded(reqHash []byte) bool { + if oe == nil { + return false + } + + oe.mtx.Lock() + defer oe.mtx.Unlock() + + if !bytes.Equal(oe.request.Hash, reqHash) { + oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height) + oe.cancelFunc() + return true + } else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate { + // this is for test purposes only, we can emulate a certain percentage of + // OE needed to be aborted. + oe.cancelFunc() + oe.logger.Error("OE aborted due to test abort rate") + return true + } + + return false +} + +// Abort aborts the OE unconditionally and waits for it to finish. +func (oe *OptimisticExecution) Abort() { + if oe == nil || oe.cancelFunc == nil { + return + } + + oe.cancelFunc() + <-oe.stopCh +} + +// WaitResult waits for the OE to finish and returns the result. +func (oe *OptimisticExecution) WaitResult() (*abci.ResponseFinalizeBlock, error) { + <-oe.stopCh + return oe.response, oe.err +} diff --git a/baseapp/oe/optimistic_execution_test.go b/baseapp/oe/optimistic_execution_test.go new file mode 100644 index 000000000000..0b92244783cd --- /dev/null +++ b/baseapp/oe/optimistic_execution_test.go @@ -0,0 +1,34 @@ +package oe + +import ( + "context" + "errors" + "testing" + + abci "github.com/cometbft/cometbft/abci/types" + "github.com/stretchr/testify/assert" + + "cosmossdk.io/log" +) + +func testFinalizeBlock(_ context.Context, _ *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { + return nil, errors.New("test error") +} + +func TestOptimisticExecution(t *testing.T) { + oe := NewOptimisticExecution(log.NewNopLogger(), testFinalizeBlock) + assert.True(t, oe.Enabled()) + oe.Execute(&abci.RequestProcessProposal{ + Hash: []byte("test"), + }) + assert.True(t, oe.Initialized()) + + resp, err := oe.WaitResult() + assert.Nil(t, resp) + assert.EqualError(t, err, "test error") + + assert.False(t, oe.AbortIfNeeded([]byte("test"))) + assert.True(t, oe.AbortIfNeeded([]byte("wrong_hash"))) + + oe.Reset() +} diff --git a/baseapp/options.go b/baseapp/options.go index 1046ad3a42f3..68d2704b3610 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -15,6 +15,7 @@ import ( snapshottypes "cosmossdk.io/store/snapshots/types" storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/baseapp/oe" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" @@ -106,6 +107,13 @@ func SetChainID(chainID string) func(*BaseApp) { return func(app *BaseApp) { app.chainID = chainID } } +// SetOptimisticExecution enables optimistic execution. +func SetOptimisticExecution(opts ...func(*oe.OptimisticExecution)) func(*BaseApp) { + return func(app *BaseApp) { + app.optimisticExec = oe.NewOptimisticExecution(app.logger, app.internalFinalizeBlock, opts...) + } +} + func (app *BaseApp) SetName(name string) { if app.sealed { panic("SetName() on sealed BaseApp") diff --git a/simapp/app.go b/simapp/app.go index 0c24f4e1cc29..d87d9adaee41 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -33,6 +33,9 @@ import ( upgradetypes "cosmossdk.io/x/upgrade/types" abci "github.com/cometbft/cometbft/abci/types" dbm "github.com/cosmos/cosmos-db" + "github.com/cosmos/gogoproto/proto" + "github.com/spf13/cast" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" @@ -103,8 +106,6 @@ import ( "github.com/cosmos/cosmos-sdk/x/staking" stakingkeeper "github.com/cosmos/cosmos-sdk/x/staking/keeper" stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" - "github.com/cosmos/gogoproto/proto" - "github.com/spf13/cast" ) const appName = "SimApp" @@ -241,7 +242,7 @@ func NewSimApp( voteExtHandler := NewVoteExtensionHandler() voteExtHandler.SetHandlers(bApp) } - baseAppOptions = append(baseAppOptions, voteExtOp) + baseAppOptions = append(baseAppOptions, voteExtOp, baseapp.SetOptimisticExecution()) bApp := baseapp.NewBaseApp(appName, logger, db, txConfig.TxDecoder(), baseAppOptions...) bApp.SetCommitMultiStoreTracer(traceStore) diff --git a/simapp/app_v2.go b/simapp/app_v2.go index 50bb5cb3687d..ac916bc03788 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -220,7 +220,7 @@ func NewSimApp( voteExtHandler := NewVoteExtensionHandler() voteExtHandler.SetHandlers(bApp) } - baseAppOptions = append(baseAppOptions, voteExtOp) + baseAppOptions = append(baseAppOptions, voteExtOp, baseapp.SetOptimisticExecution()) app.App = appBuilder.Build(db, traceStore, baseAppOptions...) diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go index 4073409bf0e1..6f344a1c8326 100644 --- a/types/mempool/priority_nonce.go +++ b/types/mempool/priority_nonce.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sync" "github.com/huandu/skiplist" @@ -49,6 +50,7 @@ type ( // priority to other sender txs and must be partially ordered by both sender-nonce // and priority. PriorityNonceMempool[C comparable] struct { + mtx sync.Mutex priorityIndex *skiplist.SkipList priorityCounts map[C]int senderIndices map[string]*skiplist.SkipList @@ -194,7 +196,9 @@ func (mp *PriorityNonceMempool[C]) NextSenderTx(sender string) sdk.Tx { // Inserting a duplicate tx with a different priority overwrites the existing tx, // changing the total order of the mempool. func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error { - if mp.cfg.MaxTx > 0 && mp.CountTx() >= mp.cfg.MaxTx { + mp.mtx.Lock() + defer mp.mtx.Unlock() + if mp.cfg.MaxTx > 0 && mp.priorityIndex.Len() >= mp.cfg.MaxTx { return ErrMempoolTxMaxCapacity } else if mp.cfg.MaxTx < 0 { return nil @@ -341,6 +345,8 @@ func (i *PriorityNonceIterator[C]) Tx() sdk.Tx { // NOTE: It is not safe to use this iterator while removing transactions from // the underlying mempool. func (mp *PriorityNonceMempool[C]) Select(_ context.Context, _ [][]byte) Iterator { + mp.mtx.Lock() + defer mp.mtx.Unlock() if mp.priorityIndex.Len() == 0 { return nil } @@ -409,12 +415,16 @@ func senderWeight[C comparable](txPriority TxPriority[C], senderCursor *skiplist // CountTx returns the number of transactions in the mempool. func (mp *PriorityNonceMempool[C]) CountTx() int { + mp.mtx.Lock() + defer mp.mtx.Unlock() return mp.priorityIndex.Len() } // Remove removes a transaction from the mempool in O(log n) time, returning an // error if unsuccessful. func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error { + mp.mtx.Lock() + defer mp.mtx.Unlock() sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() if err != nil { return err diff --git a/types/mempool/sender_nonce.go b/types/mempool/sender_nonce.go index c013072dfecd..7645bda33993 100644 --- a/types/mempool/sender_nonce.go +++ b/types/mempool/sender_nonce.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "fmt" "math/rand" // #nosec // math/rand is used for random selection and seeded from crypto/rand + "sync" "github.com/huandu/skiplist" @@ -31,6 +32,7 @@ var DefaultMaxTx = 0 // Note that PrepareProposal could choose to stop iteration before reaching the // end if maxBytes is reached. type SenderNonceMempool struct { + mtx sync.Mutex senders map[string]*skiplist.SkipList rnd *rand.Rand maxTx int @@ -116,7 +118,9 @@ func (snm *SenderNonceMempool) NextSenderTx(sender string) sdk.Tx { // Insert adds a tx to the mempool. It returns an error if the tx does not have // at least one signer. Note, priority is ignored. func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { - if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx { + snm.mtx.Lock() + defer snm.mtx.Unlock() + if snm.maxTx > 0 && len(snm.existingTx) >= snm.maxTx { return ErrMempoolTxMaxCapacity } if snm.maxTx < 0 { @@ -155,6 +159,8 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { // NOTE: It is not safe to use this iterator while removing transactions from // the underlying mempool. func (snm *SenderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { + snm.mtx.Lock() + defer snm.mtx.Unlock() var senders []string senderCursors := make(map[string]*skiplist.Element) @@ -184,12 +190,16 @@ func (snm *SenderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { // CountTx returns the total count of txs in the mempool. func (snm *SenderNonceMempool) CountTx() int { + snm.mtx.Lock() + defer snm.mtx.Unlock() return len(snm.existingTx) } // Remove removes a tx from the mempool. It returns an error if the tx does not // have at least one signer or the tx was not found in the pool. func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error { + snm.mtx.Lock() + defer snm.mtx.Unlock() sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() if err != nil { return err