Skip to content

Commit

Permalink
Support pending transaction in mempool (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen authored and udpatil committed Mar 26, 2024
1 parent ce065b7 commit 12d68cb
Show file tree
Hide file tree
Showing 18 changed files with 173 additions and 50 deletions.
5 changes: 3 additions & 2 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func (cli *grpcClient) Info(ctx context.Context, params *types.RequestInfo) (*ty
return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
resCheckTx, err := cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
return &types.ResponseCheckTxV2{ResponseCheckTx: resCheckTx}, err
}

func (cli *grpcClient) Query(ctx context.Context, params *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
8 changes: 4 additions & 4 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,12 @@ func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*typ
return res.GetInfo(), nil
}

func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
res, err := cli.doRequest(ctx, types.ToRequestCheckTx(req))
if err != nil {
return nil, err
}
return res.GetCheckTx(), nil
return &types.ResponseCheckTxV2{ResponseCheckTx: res.GetCheckTx()}, nil
}

func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinal
return &types.ResponseFinalizeBlock{TxResults: respTxs, ValidatorUpdates: app.ValUpdates, AppHash: appHash}, nil
}

func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil
func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
return &types.ResponseCheckTxV2{ResponseCheckTx: &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}}, nil
}

func (app *Application) Commit(_ context.Context) (*types.ResponseCommit, error) {
Expand Down
8 changes: 8 additions & 0 deletions abci/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,11 @@ func (app *gRPCApplication) Flush(_ context.Context, req *types.RequestFlush) (*
func (app *gRPCApplication) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx)
}

func (app *gRPCApplication) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
resV2, err := app.Application.CheckTx(ctx, req)
if err != nil {
return &types.ResponseCheckTx{}, err
}
return resV2.ResponseCheckTx, nil
}
6 changes: 3 additions & 3 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Application interface {
Query(context.Context, *RequestQuery) (*ResponseQuery, error) // Query for state

// Mempool Connection
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTx, error) // Validate a tx for the mempool
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTxV2, error) // Validate a tx for the mempool

// Consensus Connection
InitChain(context.Context, *RequestInitChain) (*ResponseInitChain, error) // Initialize blockchain w validators/other info from TendermintCore
Expand Down Expand Up @@ -51,8 +51,8 @@ func (BaseApplication) Info(_ context.Context, req *RequestInfo) (*ResponseInfo,
return &ResponseInfo{}, nil
}

func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
return &ResponseCheckTx{Code: CodeTypeOK}, nil
func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTxV2, error) {
return &ResponseCheckTxV2{ResponseCheckTx: &ResponseCheckTx{Code: CodeTypeOK}}, nil
}

func (BaseApplication) Commit(_ context.Context) (*ResponseCommit, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func ToResponseInfo(res *ResponseInfo) *Response {
}
}

func ToResponseCheckTx(res *ResponseCheckTx) *Response {
func ToResponseCheckTx(res *ResponseCheckTxV2) *Response {
return &Response{
Value: &Response_CheckTx{res},
Value: &Response_CheckTx{res.ResponseCheckTx},
}
}

Expand Down
8 changes: 4 additions & 4 deletions abci/types/mocks/application.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,21 @@ func MarshalTxResults(r []*ExecTxResult) ([][]byte, error) {
}
return s, nil
}

type PendingTxCheckerResponse int

const (
Accepted PendingTxCheckerResponse = iota
Rejected
Pending
)

type PendingTxChecker func() PendingTxCheckerResponse

// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// to utilize the new fields in V2 type (but still be backwards-compatible)
type ResponseCheckTxV2 struct {
*ResponseCheckTx
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
}
8 changes: 4 additions & 4 deletions internal/consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,18 +308,18 @@ func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.Reques
return res, nil
}

func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
app.mu.Lock()
defer app.mu.Unlock()

txValue := txAsUint64(req.Tx)
if txValue != uint64(app.mempoolTxCount) {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
}, nil
}}, nil
}
app.mempoolTxCount++
return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeOK}}, nil
}

func txAsUint64(tx []byte) uint64 {
Expand Down
45 changes: 37 additions & 8 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type TxMempool struct {
// index. i.e. older transactions are first.
timestampIndex *WrappedTxList

// pendingTxs stores transactions that are not valid yet but might become valid
// if its checker returns Accepted
pendingTxs *PendingTxs

// A read/write lock is used to safe guard updates, insertions and deletions
// from the mempool. A read-lock is implicitly acquired when executing CheckTx,
// however, a caller must explicitly grab a write-lock via Lock when updating
Expand Down Expand Up @@ -120,6 +124,7 @@ func NewTxMempool(
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
}),
pendingTxs: NewPendingTxs(),
failedCheckTxCounts: map[types.NodeID]uint64{},
peerManager: peerManager,
}
Expand Down Expand Up @@ -286,17 +291,25 @@ func (txmp *TxMempool) CheckTx(
height: txmp.height,
}

// only add new transaction if checkTx passes
if err == nil {
err = txmp.addNewTransaction(wtx, res, txInfo)
// only add new transaction if checkTx passes and is not pending
if !res.IsPendingTransaction {
err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo)

if err != nil {
return err
if err != nil {
return err
}
} else {
// otherwise add to pending txs store
if res.Checker == nil {
return errors.New("no checker available for pending transaction")
}
txmp.pendingTxs.Insert(wtx, res, txInfo)
}
}

if cb != nil {
cb(res)
cb(res.ResponseCheckTx)
}

return nil
Expand Down Expand Up @@ -470,6 +483,7 @@ func (txmp *TxMempool) Update(
}
}

txmp.handlePendingTransactions()
txmp.purgeExpiredTxs(blockHeight)

// If there any uncommitted transactions left in the mempool, we either
Expand Down Expand Up @@ -633,7 +647,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
//
// This method is NOT executed for the initial CheckTx on a new transaction;
// that case is handled by addNewTransaction instead.
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTx) {
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTxV2) {
if txmp.recheckCursor == nil {
return
}
Expand Down Expand Up @@ -676,10 +690,11 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
if !txmp.txStore.IsTxRemoved(wtx.hash) {
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(tx, res)
err = txmp.postCheck(tx, res.ResponseCheckTx)
}

if res.Code == abci.CodeTypeOK && err == nil {
// we will treat a transaction that turns pending in a recheck as invalid and evict it
if res.Code == abci.CodeTypeOK && err == nil && !res.IsPendingTransaction {
wtx.priority = res.Priority
} else {
txmp.logger.Debug(
Expand Down Expand Up @@ -903,3 +918,17 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string

return builder.String()
}

func (txmp *TxMempool) handlePendingTransactions() {
accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions()
for _, tx := range accepted {
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil {
txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err))
}
}
if !txmp.config.KeepInvalidTxsInCache {
for _, tx := range rejected {
txmp.cache.Remove(tx.tx.tx)
}
}
}
14 changes: 7 additions & 7 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type testTx struct {
priority int64
}

func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
var (
priority int64
sender string
Expand All @@ -47,29 +47,29 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
if len(parts) == 3 {
v, err := strconv.ParseInt(string(parts[2]), 10, 64)
if err != nil {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 100,
GasWanted: 1,
}, nil
}}, nil
}

priority = v
sender = string(parts[0])
} else {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 101,
GasWanted: 1,
}, nil
}}, nil
}

return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Sender: sender,
Code: code.CodeTypeOK,
GasWanted: 1,
}, nil
}}, nil
}

func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool {
Expand Down
71 changes: 68 additions & 3 deletions internal/mempool/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/libs/clist"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -72,9 +73,9 @@ func (wtx *WrappedTx) Size() int {
// TxStore implements a thread-safe mapping of valid transaction(s).
//
// NOTE:
// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative
// access is not allowed. Regardless, it is not expected for the mempool to
// need mutative access.
// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative
// access is not allowed. Regardless, it is not expected for the mempool to
// need mutative access.
type TxStore struct {
mtx sync.RWMutex
hashTxs map[types.TxKey]*WrappedTx // primary index
Expand Down Expand Up @@ -291,3 +292,67 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) {
i++
}
}

type PendingTxs struct {
mtx *sync.Mutex
txs []PendingTxInfo
}

type PendingTxInfo struct {
tx *WrappedTx
checkTxResponse *abci.ResponseCheckTxV2
txInfo TxInfo
}

func NewPendingTxs() *PendingTxs {
return &PendingTxs{
mtx: &sync.Mutex{},
txs: []PendingTxInfo{},
}
}

func (p *PendingTxs) EvaluatePendingTransactions() (
acceptedTxs []PendingTxInfo,
rejectedTxs []PendingTxInfo,
) {
poppedIndices := []int{}
p.mtx.Lock()
defer p.mtx.Unlock()
for i := 0; i < len(p.txs); i++ {
switch p.txs[i].checkTxResponse.Checker() {
case abci.Accepted:
acceptedTxs = append(acceptedTxs, p.txs[i])
poppedIndices = append(poppedIndices, i)
case abci.Rejected:
rejectedTxs = append(rejectedTxs, p.txs[i])
poppedIndices = append(poppedIndices, i)
}
}
p.popTxsAtIndices(poppedIndices)
return
}

// assume mtx is already acquired
func (p *PendingTxs) popTxsAtIndices(indices []int) {
if len(indices) == 0 {
return
}
newTxs := []PendingTxInfo{}
start := 0
for _, idx := range indices {
newTxs = append(newTxs, p.txs[start:idx]...)
start = idx
}
newTxs = append(newTxs, p.txs[indices[len(indices)-1]:]...)
p.txs = newTxs
}

func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.txs = append(p.txs, PendingTxInfo{
tx: tx,
checkTxResponse: resCheckTx,
txInfo: txInfo,
})
}
Loading

0 comments on commit 12d68cb

Please sign in to comment.