diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index f1be2c120..8eec13ef8 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -130,8 +130,9 @@ func (cli *grpcClient) Info(ctx context.Context, params *types.RequestInfo) (*ty return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true)) } -func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTx, error) { - return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true)) +func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { + resCheckTx, err := cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true)) + return &types.ResponseCheckTxV2{ResponseCheckTx: resCheckTx}, err } func (cli *grpcClient) Query(ctx context.Context, params *types.RequestQuery) (*types.ResponseQuery, error) { diff --git a/abci/client/mocks/client.go b/abci/client/mocks/client.go index 44a5eb00f..a0626cff0 100644 --- a/abci/client/mocks/client.go +++ b/abci/client/mocks/client.go @@ -38,15 +38,15 @@ func (_m *Client) ApplySnapshotChunk(_a0 context.Context, _a1 *types.RequestAppl } // CheckTx provides a mock function with given fields: _a0, _a1 -func (_m *Client) CheckTx(_a0 context.Context, _a1 *types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (_m *Client) CheckTx(_a0 context.Context, _a1 *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { ret := _m.Called(_a0, _a1) - var r0 *types.ResponseCheckTx - if rf, ok := ret.Get(0).(func(context.Context, *types.RequestCheckTx) *types.ResponseCheckTx); ok { + var r0 *types.ResponseCheckTxV2 + if rf, ok := ret.Get(0).(func(context.Context, *types.RequestCheckTx) *types.ResponseCheckTxV2); ok { r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.ResponseCheckTx) + r0 = ret.Get(0).(*types.ResponseCheckTxV2) } } diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 80f106333..35f244666 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -257,12 +257,12 @@ func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*typ return res.GetInfo(), nil } -func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { res, err := cli.doRequest(ctx, types.ToRequestCheckTx(req)) if err != nil { return nil, err } - return res.GetCheckTx(), nil + return &types.ResponseCheckTxV2{ResponseCheckTx: res.GetCheckTx()}, nil } func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) { diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index f33ff2be1..ac0d122b1 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -205,8 +205,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinal return &types.ResponseFinalizeBlock{TxResults: respTxs, ValidatorUpdates: app.ValUpdates, AppHash: appHash}, nil } -func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { - return &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil +func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { + return &types.ResponseCheckTxV2{ResponseCheckTx: &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}}, nil } func (app *Application) Commit(_ context.Context) (*types.ResponseCommit, error) { diff --git a/abci/server/grpc_server.go b/abci/server/grpc_server.go index 0dfee8169..6f7caf39c 100644 --- a/abci/server/grpc_server.go +++ b/abci/server/grpc_server.go @@ -81,3 +81,11 @@ func (app *gRPCApplication) Flush(_ context.Context, req *types.RequestFlush) (* func (app *gRPCApplication) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) { return app.Application.Commit(ctx) } + +func (app *gRPCApplication) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { + resV2, err := app.Application.CheckTx(ctx, req) + if err != nil { + return &types.ResponseCheckTx{}, err + } + return resV2.ResponseCheckTx, nil +} diff --git a/abci/types/application.go b/abci/types/application.go index 9915d16a1..5f98bdc9f 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -12,7 +12,7 @@ type Application interface { Query(context.Context, *RequestQuery) (*ResponseQuery, error) // Query for state // Mempool Connection - CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTx, error) // Validate a tx for the mempool + CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTxV2, error) // Validate a tx for the mempool // Consensus Connection InitChain(context.Context, *RequestInitChain) (*ResponseInitChain, error) // Initialize blockchain w validators/other info from TendermintCore @@ -51,8 +51,8 @@ func (BaseApplication) Info(_ context.Context, req *RequestInfo) (*ResponseInfo, return &ResponseInfo{}, nil } -func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) { - return &ResponseCheckTx{Code: CodeTypeOK}, nil +func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTxV2, error) { + return &ResponseCheckTxV2{ResponseCheckTx: &ResponseCheckTx{Code: CodeTypeOK}}, nil } func (BaseApplication) Commit(_ context.Context) (*ResponseCommit, error) { diff --git a/abci/types/messages.go b/abci/types/messages.go index 2ab6ac1df..f6a6c12d1 100644 --- a/abci/types/messages.go +++ b/abci/types/messages.go @@ -155,9 +155,9 @@ func ToResponseInfo(res *ResponseInfo) *Response { } } -func ToResponseCheckTx(res *ResponseCheckTx) *Response { +func ToResponseCheckTx(res *ResponseCheckTxV2) *Response { return &Response{ - Value: &Response_CheckTx{res}, + Value: &Response_CheckTx{res.ResponseCheckTx}, } } diff --git a/abci/types/mocks/application.go b/abci/types/mocks/application.go index c6eb29219..08ef9ca5c 100644 --- a/abci/types/mocks/application.go +++ b/abci/types/mocks/application.go @@ -38,15 +38,15 @@ func (_m *Application) ApplySnapshotChunk(_a0 context.Context, _a1 *types.Reques } // CheckTx provides a mock function with given fields: _a0, _a1 -func (_m *Application) CheckTx(_a0 context.Context, _a1 *types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (_m *Application) CheckTx(_a0 context.Context, _a1 *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { ret := _m.Called(_a0, _a1) - var r0 *types.ResponseCheckTx - if rf, ok := ret.Get(0).(func(context.Context, *types.RequestCheckTx) *types.ResponseCheckTx); ok { + var r0 *types.ResponseCheckTxV2 + if rf, ok := ret.Get(0).(func(context.Context, *types.RequestCheckTx) *types.ResponseCheckTxV2); ok { r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.ResponseCheckTx) + r0 = ret.Get(0).(*types.ResponseCheckTxV2) } } diff --git a/abci/types/types.go b/abci/types/types.go index 121e72159..4df6d4751 100644 --- a/abci/types/types.go +++ b/abci/types/types.go @@ -237,3 +237,21 @@ func MarshalTxResults(r []*ExecTxResult) ([][]byte, error) { } return s, nil } + +type PendingTxCheckerResponse int + +const ( + Accepted PendingTxCheckerResponse = iota + Rejected + Pending +) + +type PendingTxChecker func() PendingTxCheckerResponse + +// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able +// to utilize the new fields in V2 type (but still be backwards-compatible) +type ResponseCheckTxV2 struct { + *ResponseCheckTx + IsPendingTransaction bool + Checker PendingTxChecker // must not be nil if IsPendingTransaction is true +} diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index e634d4b22..84badbeeb 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -308,18 +308,18 @@ func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.Reques return res, nil } -func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { +func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) { app.mu.Lock() defer app.mu.Unlock() txValue := txAsUint64(req.Tx) if txValue != uint64(app.mempoolTxCount) { - return &abci.ResponseCheckTx{ + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Code: code.CodeTypeBadNonce, - }, nil + }}, nil } app.mempoolTxCount++ - return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeOK}}, nil } func txAsUint64(tx []byte) uint64 { diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index b57603b50..d2b84910c 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -81,6 +81,10 @@ type TxMempool struct { // index. i.e. older transactions are first. timestampIndex *WrappedTxList + // pendingTxs stores transactions that are not valid yet but might become valid + // if its checker returns Accepted + pendingTxs *PendingTxs + // A read/write lock is used to safe guard updates, insertions and deletions // from the mempool. A read-lock is implicitly acquired when executing CheckTx, // however, a caller must explicitly grab a write-lock via Lock when updating @@ -120,6 +124,7 @@ func NewTxMempool( timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp) }), + pendingTxs: NewPendingTxs(), failedCheckTxCounts: map[types.NodeID]uint64{}, peerManager: peerManager, } @@ -286,17 +291,25 @@ func (txmp *TxMempool) CheckTx( height: txmp.height, } - // only add new transaction if checkTx passes if err == nil { - err = txmp.addNewTransaction(wtx, res, txInfo) + // only add new transaction if checkTx passes and is not pending + if !res.IsPendingTransaction { + err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo) - if err != nil { - return err + if err != nil { + return err + } + } else { + // otherwise add to pending txs store + if res.Checker == nil { + return errors.New("no checker available for pending transaction") + } + txmp.pendingTxs.Insert(wtx, res, txInfo) } } if cb != nil { - cb(res) + cb(res.ResponseCheckTx) } return nil @@ -470,6 +483,7 @@ func (txmp *TxMempool) Update( } } + txmp.handlePendingTransactions() txmp.purgeExpiredTxs(blockHeight) // If there any uncommitted transactions left in the mempool, we either @@ -633,7 +647,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck // // This method is NOT executed for the initial CheckTx on a new transaction; // that case is handled by addNewTransaction instead. -func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTx) { +func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTxV2) { if txmp.recheckCursor == nil { return } @@ -676,10 +690,11 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT if !txmp.txStore.IsTxRemoved(wtx.hash) { var err error if txmp.postCheck != nil { - err = txmp.postCheck(tx, res) + err = txmp.postCheck(tx, res.ResponseCheckTx) } - if res.Code == abci.CodeTypeOK && err == nil { + // we will treat a transaction that turns pending in a recheck as invalid and evict it + if res.Code == abci.CodeTypeOK && err == nil && !res.IsPendingTransaction { wtx.priority = res.Priority } else { txmp.logger.Debug( @@ -904,3 +919,17 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string jsonData, _ := json.Marshal(logs) return string(jsonData) } + +func (txmp *TxMempool) handlePendingTransactions() { + accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions() + for _, tx := range accepted { + if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil { + txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err)) + } + } + if !txmp.config.KeepInvalidTxsInCache { + for _, tx := range rejected { + txmp.cache.Remove(tx.tx.tx) + } + } +} diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index d86ba8f7e..336357aa2 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -37,7 +37,7 @@ type testTx struct { priority int64 } -func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { +func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) { var ( priority int64 sender string @@ -48,29 +48,29 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a if len(parts) == 3 { v, err := strconv.ParseInt(string(parts[2]), 10, 64) if err != nil { - return &abci.ResponseCheckTx{ + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Code: 100, GasWanted: 1, - }, nil + }}, nil } priority = v sender = string(parts[0]) } else { - return &abci.ResponseCheckTx{ + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Code: 101, GasWanted: 1, - }, nil + }}, nil } - return &abci.ResponseCheckTx{ + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Sender: sender, Code: code.CodeTypeOK, GasWanted: 1, - }, nil + }}, nil } func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool { @@ -727,4 +727,4 @@ func TestAppendCheckTxErr(t *testing.T) { require.NoError(t, err) require.Equal(t, len(data), 1) require.Equal(t, data[0]["log"], "sample error msg") -} \ No newline at end of file +} diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index c7113c951..9570935eb 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -5,6 +5,7 @@ import ( "sync" "time" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/types" ) @@ -72,9 +73,9 @@ func (wtx *WrappedTx) Size() int { // TxStore implements a thread-safe mapping of valid transaction(s). // // NOTE: -// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative -// access is not allowed. Regardless, it is not expected for the mempool to -// need mutative access. +// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative +// access is not allowed. Regardless, it is not expected for the mempool to +// need mutative access. type TxStore struct { mtx sync.RWMutex hashTxs map[types.TxKey]*WrappedTx // primary index @@ -291,3 +292,67 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { i++ } } + +type PendingTxs struct { + mtx *sync.Mutex + txs []PendingTxInfo +} + +type PendingTxInfo struct { + tx *WrappedTx + checkTxResponse *abci.ResponseCheckTxV2 + txInfo TxInfo +} + +func NewPendingTxs() *PendingTxs { + return &PendingTxs{ + mtx: &sync.Mutex{}, + txs: []PendingTxInfo{}, + } +} + +func (p *PendingTxs) EvaluatePendingTransactions() ( + acceptedTxs []PendingTxInfo, + rejectedTxs []PendingTxInfo, +) { + poppedIndices := []int{} + p.mtx.Lock() + defer p.mtx.Unlock() + for i := 0; i < len(p.txs); i++ { + switch p.txs[i].checkTxResponse.Checker() { + case abci.Accepted: + acceptedTxs = append(acceptedTxs, p.txs[i]) + poppedIndices = append(poppedIndices, i) + case abci.Rejected: + rejectedTxs = append(rejectedTxs, p.txs[i]) + poppedIndices = append(poppedIndices, i) + } + } + p.popTxsAtIndices(poppedIndices) + return +} + +// assume mtx is already acquired +func (p *PendingTxs) popTxsAtIndices(indices []int) { + if len(indices) == 0 { + return + } + newTxs := []PendingTxInfo{} + start := 0 + for _, idx := range indices { + newTxs = append(newTxs, p.txs[start:idx]...) + start = idx + } + newTxs = append(newTxs, p.txs[indices[len(indices)-1]:]...) + p.txs = newTxs +} + +func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.txs = append(p.txs, PendingTxInfo{ + tx: tx, + checkTxResponse: resCheckTx, + txInfo: txInfo, + }) +} diff --git a/internal/proxy/client.go b/internal/proxy/client.go index fa1d8c66e..f3ab28f5c 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -169,7 +169,7 @@ func (app *proxyClient) Flush(ctx context.Context) error { return app.client.Flush(ctx) } -func (app *proxyClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (app *proxyClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() return app.client.CheckTx(ctx, req) } diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 3d6bc5673..1e8628641 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -184,7 +184,7 @@ func (env *Environment) CheckTx(ctx context.Context, req *coretypes.RequestCheck if err != nil { return nil, err } - return &coretypes.ResultCheckTx{ResponseCheckTx: *res}, nil + return &coretypes.ResultCheckTx{ResponseCheckTx: *res.ResponseCheckTx}, nil } func (env *Environment) RemoveTx(ctx context.Context, req *coretypes.RequestRemoveTx) error { diff --git a/internal/state/helpers_test.go b/internal/state/helpers_test.go index 4284bad68..1d0d59441 100644 --- a/internal/state/helpers_test.go +++ b/internal/state/helpers_test.go @@ -302,8 +302,8 @@ func (app *testApp) FinalizeBlock(_ context.Context, req *abci.RequestFinalizeBl }, nil } -func (app *testApp) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { - return &abci.ResponseCheckTx{}, nil +func (app *testApp) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) { + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{}}, nil } func (app *testApp) Commit(context.Context) (*abci.ResponseCommit, error) { diff --git a/rpc/client/mock/abci.go b/rpc/client/mock/abci.go index 142d64d19..27d779554 100644 --- a/rpc/client/mock/abci.go +++ b/rpc/client/mock/abci.go @@ -60,7 +60,7 @@ func (a ABCIApp) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*coretypes return nil, err } - res := &coretypes.ResultBroadcastTxCommit{CheckTx: *resp} + res := &coretypes.ResultBroadcastTxCommit{CheckTx: *resp.ResponseCheckTx} if res.CheckTx.IsErr() { return res, nil } diff --git a/test/e2e/app/app.go b/test/e2e/app/app.go index 885b49f39..b127fa744 100644 --- a/test/e2e/app/app.go +++ b/test/e2e/app/app.go @@ -163,14 +163,14 @@ func (app *Application) InitChain(_ context.Context, req *abci.RequestInitChain) } // CheckTx implements ABCI. -func (app *Application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { +func (app *Application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) { app.mu.Lock() defer app.mu.Unlock() _, _, err := parseTx(req.Tx) if err != nil { - return &abci.ResponseCheckTx{ - Code: code.CodeTypeEncodingError, + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeEncodingError}, }, nil } @@ -178,7 +178,9 @@ func (app *Application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a time.Sleep(time.Duration(app.cfg.CheckTxDelayMS) * time.Millisecond) } - return &abci.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, + }, nil } // FinalizeBlock implements ABCI.