Skip to content

Commit

Permalink
Problem: mempool don't respect gas wanted returned by ante handler (#507
Browse files Browse the repository at this point in the history
)

* Problem: mempool don't respect gas wanted returned by ante handler

Solution:
- support custom gas wanted in mempool

* Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

* cleanup

* cleanup

* fix priorityIndex

* fix process proposal

* fix lint

* fix lint

---------

Signed-off-by: yihuang <[email protected]>
  • Loading branch information
yihuang authored Jun 26, 2024
1 parent cf12e5a commit 36295f0
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (baseapp) [#20144](https://github.com/cosmos/cosmos-sdk/pull/20144) Remove txs from mempool when antehandler fails in recheck.
* (baseapp) [#20107](https://github.com/cosmos/cosmos-sdk/pull/20107) Avoid header height overwrite block height.
* (cli) [#20020](https://github.com/cosmos/cosmos-sdk/pull/20020) Make bootstrap-state command support both new and legacy genesis format.
* [#507](https://github.com/crypto-org-chain/cosmos-sdk/pull/507) mempool respect gas wanted returned by ante handler

### Bug Fixes

Expand Down
42 changes: 19 additions & 23 deletions baseapp/abci_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ type (
// to verify a transaction.
ProposalTxVerifier interface {
PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error)
ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error)
ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, uint64, error)
TxDecode(txBz []byte) (sdk.Tx, error)
TxEncode(tx sdk.Tx) ([]byte, error)
}
Expand Down Expand Up @@ -270,7 +270,14 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
return nil, err
}

stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, txBz)
var txGasLimit uint64
if gasTx, ok := tx.(interface {
GetGas() uint64
}); ok {
txGasLimit = gasTx.GetGas()
}

stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, txBz, txGasLimit)
if stop {
break
}
Expand All @@ -284,7 +291,7 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
var selectedTxsNums int
for iterator != nil {
memTx := iterator.Tx()
signerData, err := h.signerExtAdapter.GetSigners(memTx)
signerData, err := h.signerExtAdapter.GetSigners(memTx.Tx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -318,14 +325,14 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
// which calls mempool.Insert, in theory everything in the pool should be
// valid. But some mempool implementations may insert invalid txs, so we
// check again.
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx.Tx)
if err != nil {
err := h.mempool.Remove(memTx)
err := h.mempool.Remove(memTx.Tx)
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
return nil, err
}
} else {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, memTx.Tx, txBz, memTx.GasWanted)
if stop {
break
}
Expand Down Expand Up @@ -383,17 +390,13 @@ func (h *DefaultProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHan
}

for _, txBytes := range req.Txs {
tx, err := h.txVerifier.ProcessProposalVerifyTx(txBytes)
_, gasWanted, err := h.txVerifier.ProcessProposalVerifyTx(txBytes)
if err != nil {
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
}

if maxBlockGas > 0 {
gasTx, ok := tx.(GasTx)
if ok {
totalTxGas += gasTx.GetGas()
}

totalTxGas += gasWanted
if totalTxGas > uint64(maxBlockGas) {
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
}
Expand Down Expand Up @@ -451,7 +454,7 @@ type TxSelector interface {
// a proposal based on inclusion criteria defined by the TxSelector. It must
// return <true> if the caller should halt the transaction selection loop
// (typically over a mempool) or <false> otherwise.
SelectTxForProposal(ctx context.Context, maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool
SelectTxForProposal(ctx context.Context, maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte, gasWanted uint64) bool
}

type defaultTxSelector struct {
Expand All @@ -476,23 +479,16 @@ func (ts *defaultTxSelector) Clear() {
ts.selectedTxs = nil
}

func (ts *defaultTxSelector) SelectTxForProposal(_ context.Context, maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool {
func (ts *defaultTxSelector) SelectTxForProposal(_ context.Context, maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte, gasWanted uint64) bool {
txSize := uint64(len(txBz))

var txGasLimit uint64
if memTx != nil {
if gasTx, ok := memTx.(GasTx); ok {
txGasLimit = gasTx.GetGas()
}
}

// only add the transaction to the proposal if we have enough capacity
if (txSize + ts.totalTxBytes) <= maxTxBytes {
// If there is a max block gas limit, add the tx only if the limit has
// not been met.
if maxBlockGas > 0 {
if (txGasLimit + ts.totalTxGas) <= maxBlockGas {
ts.totalTxGas += txGasLimit
if (gasWanted + ts.totalTxGas) <= maxBlockGas {
ts.totalTxGas += gasWanted
ts.totalTxBytes += txSize
ts.selectedTxs = append(ts.selectedTxs, txBz)
}
Expand Down
12 changes: 6 additions & 6 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, txIndex i
}

if mode == execModeCheck {
err = app.mempool.Insert(ctx, tx)
err = app.mempool.InsertWithGasWanted(ctx, tx, gasWanted)
if err != nil {
return gInfo, nil, anteEvents, err
}
Expand Down Expand Up @@ -1148,18 +1148,18 @@ func (app *BaseApp) PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error) {
// ProcessProposal state internally will be discarded. <nil, err> will be
// returned if the transaction cannot be decoded. <Tx, nil> will be returned if
// the transaction is valid, otherwise <Tx, err> will be returned.
func (app *BaseApp) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) {
func (app *BaseApp) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, uint64, error) {
tx, err := app.txDecoder(txBz)
if err != nil {
return nil, err
return nil, 0, err
}

_, _, _, err = app.runTx(execModeProcessProposal, txBz)
gInfo, _, _, err := app.runTx(execModeProcessProposal, txBz)
if err != nil {
return nil, err
return nil, 0, err
}

return tx, nil
return tx, gInfo.GasWanted, nil
}

func (app *BaseApp) TxDecode(txBytes []byte) (sdk.Tx, error) {
Expand Down
6 changes: 4 additions & 2 deletions baseapp/testutil/mock/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion types/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,30 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

type Tx struct {
Tx sdk.Tx
GasWanted uint64
}

func NewMempoolTx(tx sdk.Tx, gasWanted uint64) Tx {
return Tx{
Tx: tx,
GasWanted: gasWanted,
}
}

type GasTx interface {
GetGas() uint64
}

type Mempool interface {
// Insert attempts to insert a Tx into the app-side mempool returning
// an error upon failure.
Insert(context.Context, sdk.Tx) error

// Insert with a custom gas wanted value
InsertWithGasWanted(context.Context, sdk.Tx, uint64) error

// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator must
// closed by the caller.
Expand All @@ -34,7 +53,7 @@ type Iterator interface {
Next() Iterator

// Tx returns the transaction at the current position of the iterator.
Tx() sdk.Tx
Tx() Tx
}

var (
Expand Down
2 changes: 1 addition & 1 deletion types/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func fetchTxs(iterator mempool.Iterator, maxBytes int64) []sdk.Tx {
if numBytes += txSize; numBytes > maxBytes {
break
}
txs = append(txs, iterator.Tx())
txs = append(txs, iterator.Tx().Tx)
i := iterator.Next()
iterator = i
}
Expand Down
9 changes: 5 additions & 4 deletions types/mempool/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ var _ Mempool = (*NoOpMempool)(nil)
// is FIFO-ordered by default.
type NoOpMempool struct{}

func (NoOpMempool) Insert(context.Context, sdk.Tx) error { return nil }
func (NoOpMempool) Select(context.Context, [][]byte) Iterator { return nil }
func (NoOpMempool) CountTx() int { return 0 }
func (NoOpMempool) Remove(sdk.Tx) error { return nil }
func (NoOpMempool) Insert(context.Context, sdk.Tx) error { return nil }
func (NoOpMempool) InsertWithGasWanted(context.Context, sdk.Tx, uint64) error { return nil }
func (NoOpMempool) Select(context.Context, [][]byte) Iterator { return nil }
func (NoOpMempool) CountTx() int { return 0 }
func (NoOpMempool) Remove(sdk.Tx) error { return nil }
25 changes: 18 additions & 7 deletions types/mempool/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (mp *PriorityNonceMempool[C]) NextSenderTx(sender string) sdk.Tx {
}

cursor := senderIndex.Front()
return cursor.Value.(sdk.Tx)
return cursor.Value.(Tx).Tx
}

// Insert attempts to insert a Tx into the app-side mempool in O(log n) time,
Expand All @@ -201,7 +201,7 @@ func (mp *PriorityNonceMempool[C]) NextSenderTx(sender string) sdk.Tx {
//
// Inserting a duplicate tx with a different priority overwrites the existing tx,
// changing the total order of the mempool.
func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
func (mp *PriorityNonceMempool[C]) InsertWithGasWanted(ctx context.Context, tx sdk.Tx, gasWanted uint64) error {
mp.mtx.Lock()
defer mp.mtx.Unlock()
if mp.cfg.MaxTx > 0 && mp.priorityIndex.Len() >= mp.cfg.MaxTx {
Expand All @@ -210,6 +210,8 @@ func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error
return nil
}

memTx := NewMempoolTx(tx, gasWanted)

sigs, err := mp.cfg.SignerExtractor.GetSigners(tx)
if err != nil {
return err
Expand Down Expand Up @@ -243,12 +245,12 @@ func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error
// changes.
sk := txMeta[C]{nonce: nonce, sender: sender}
if oldScore, txExists := mp.scores[sk]; txExists {
if mp.cfg.TxReplacement != nil && !mp.cfg.TxReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) {
if mp.cfg.TxReplacement != nil && !mp.cfg.TxReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(Tx).Tx, tx) {
return fmt.Errorf(
"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
oldScore.priority,
priority,
senderIndex.Get(key).Value.(sdk.Tx),
senderIndex.Get(key).Value.(Tx).Tx,
tx,
)
}
Expand All @@ -266,14 +268,23 @@ func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error

// Since senderIndex is scored by nonce, a changed priority will overwrite the
// existing key.
key.senderElement = senderIndex.Set(key, tx)
key.senderElement = senderIndex.Set(key, memTx)

mp.scores[sk] = txMeta[C]{priority: priority}
mp.priorityIndex.Set(key, tx)

return nil
}

func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
var gasLimit uint64
if gasTx, ok := tx.(GasTx); ok {
gasLimit = gasTx.GetGas()
}

return mp.InsertWithGasWanted(ctx, tx, gasLimit)
}

func (i *PriorityNonceIterator[C]) iteratePriority() Iterator {
// beginning of priority iteration
if i.priorityNode == nil {
Expand Down Expand Up @@ -337,8 +348,8 @@ func (i *PriorityNonceIterator[C]) Next() Iterator {
return i
}

func (i *PriorityNonceIterator[C]) Tx() sdk.Tx {
return i.senderCursors[i.sender].Value.(sdk.Tx)
func (i *PriorityNonceIterator[C]) Tx() Tx {
return i.senderCursors[i.sender].Value.(Tx)
}

// Select returns a set of transactions from the mempool, ordered by priority
Expand Down
6 changes: 3 additions & 3 deletions types/mempool/priority_nonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (s *MempoolTestSuite) TestIterator() {
// iterate through txs
iterator := pool.Select(ctx, nil)
for iterator != nil {
tx := iterator.Tx().(testTx)
tx := iterator.Tx().Tx.(testTx)
require.Equal(t, tt.txs[tx.id].p, int(tx.priority))
require.Equal(t, tt.txs[tx.id].n, int(tx.nonce))
require.Equal(t, tt.txs[tx.id].a, tx.address)
Expand Down Expand Up @@ -854,7 +854,7 @@ func TestNextSenderTx_TxReplacement(t *testing.T) {
require.Equal(t, 1, mp.CountTx())

iter := mp.Select(ctx, nil)
require.Equal(t, tx, iter.Tx())
require.Equal(t, tx, iter.Tx().Tx)
}

// test Priority with TxReplacement
Expand Down Expand Up @@ -889,5 +889,5 @@ func TestNextSenderTx_TxReplacement(t *testing.T) {
require.Equal(t, 1, mp.CountTx())

iter := mp.Select(ctx, nil)
require.Equal(t, txs[3], iter.Tx())
require.Equal(t, txs[3], iter.Tx().Tx)
}
21 changes: 16 additions & 5 deletions types/mempool/sender_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (snm *SenderNonceMempool) NextSenderTx(sender string) sdk.Tx {
}

cursor := senderIndex.Front()
return cursor.Value.(sdk.Tx)
return cursor.Value.(Tx).Tx
}

// Insert adds a tx to the mempool. It returns an error if the tx does not have
// at least one signer. Note, priority is ignored.
func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
func (snm *SenderNonceMempool) InsertWithGasWanted(_ context.Context, tx sdk.Tx, gasLimit uint64) error {
snm.mtx.Lock()
defer snm.mtx.Unlock()
if snm.maxTx > 0 && len(snm.existingTx) >= snm.maxTx {
Expand All @@ -127,6 +127,8 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
return nil
}

memTx := NewMempoolTx(tx, gasLimit)

sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
Expand All @@ -145,14 +147,23 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
snm.senders[sender] = senderTxs
}

senderTxs.Set(nonce, tx)
senderTxs.Set(nonce, memTx)

key := txKey{nonce: nonce, address: sender}
snm.existingTx[key] = true

return nil
}

func (snm *SenderNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
var gasLimit uint64
if gasTx, ok := tx.(GasTx); ok {
gasLimit = gasTx.GetGas()
}

return snm.InsertWithGasWanted(ctx, tx, gasLimit)
}

// Select returns an iterator ordering transactions the mempool with the lowest
// nonce of a random selected sender first.
//
Expand Down Expand Up @@ -268,8 +279,8 @@ func (i *senderNonceMempoolIterator) Next() Iterator {
return nil
}

func (i *senderNonceMempoolIterator) Tx() sdk.Tx {
return i.currentTx.Value.(sdk.Tx)
func (i *senderNonceMempoolIterator) Tx() Tx {
return i.currentTx.Value.(Tx)
}

func removeAtIndex[T any](slice []T, index int) []T {
Expand Down
2 changes: 1 addition & 1 deletion types/mempool/sender_nonce_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func fetchAllTxs(iterator mempool.Iterator) []testTx {
var txs []testTx
for iterator != nil {
tx := iterator.Tx()
txs = append(txs, tx.(testTx))
txs = append(txs, tx.Tx.(testTx))
i := iterator.Next()
iterator = i
}
Expand Down

0 comments on commit 36295f0

Please sign in to comment.