Skip to content

Commit

Permalink
separate limit for pending tx
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen committed Feb 20, 2024
1 parent a45f7c1 commit b1df7a8
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 14 deletions.
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,16 @@ type MempoolConfig struct {
// blacklist the peer.
CheckTxErrorBlacklistEnabled bool `mapstructure:"check-tx-error-blacklist-enabled"`
CheckTxErrorThreshold int `mapstructure:"check-tx-error-threshold"`

// Maximum number of transactions in the pending set
PendingSize int `mapstructure:"pending-size"`

// Limit the total size of all txs in the pending set.
MaxPendingTxsBytes int64 `mapstructure:"max-pending-txs-bytes"`

PendingTTLDuration time.Duration `mapstructure:"pending-ttl-duration"`

PendingTTLNumBlocks int64 `mapstructure:"pending-ttl-num-blocks"`
}

// DefaultMempoolConfig returns a default configuration for the Tendermint mempool.
Expand All @@ -816,6 +826,10 @@ func DefaultMempoolConfig() *MempoolConfig {
TxNotifyThreshold: 0,
CheckTxErrorBlacklistEnabled: false,
CheckTxErrorThreshold: 0,
PendingSize: 5000,
MaxPendingTxsBytes: 1024 * 1024 * 1024, // 1GB
PendingTTLDuration: 0 * time.Second,
PendingTTLNumBlocks: 0,
}
}

Expand Down
8 changes: 8 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,14 @@ check-tx-error-blacklist-enabled = {{ .Mempool.CheckTxErrorBlacklistEnabled }}
check-tx-error-threshold = {{ .Mempool.CheckTxErrorThreshold }}
pending-size = {{ .Mempool.PendingSize }}
max-pending-txs-bytes = {{ .Mempool.MaxPendingTxsBytes }}
pending-ttl-duration = {{ .Mempool.PendingTTLDuration }}
pending-ttl-num-blocks = {{ .Mempool.PendingTTLNumBlocks }}
#######################################################
### State Sync Configuration Options ###
#######################################################
Expand Down
63 changes: 49 additions & 14 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type TxMempool struct {
// sizeBytes defines the total size of the mempool (sum of all tx bytes)
sizeBytes int64

// pendingSizeBytes defines the total size of the pending set (sum of all tx bytes)
pendingSizeBytes int64

// cache defines a fixed-size cache of already seen transactions as this
// reduces pressure on the proxyApp.
cache TxCache
Expand Down Expand Up @@ -177,9 +180,11 @@ func (txmp *TxMempool) Unlock() {
// Size returns the number of valid transactions in the mempool. It is
// thread-safe.
func (txmp *TxMempool) Size() int {
txSize := txmp.txStore.Size()
pendingSize := txmp.pendingTxs.Size()
return txSize + pendingSize
return txmp.SizeWithoutPending() + txmp.PendingSize()
}

func (txmp *TxMempool) SizeWithoutPending() int {
return txmp.txStore.Size()
}

// PendingSize returns the number of pending transactions in the mempool.
Expand All @@ -193,6 +198,10 @@ func (txmp *TxMempool) SizeBytes() int64 {
return atomic.LoadInt64(&txmp.sizeBytes)
}

func (txmp *TxMempool) PendingSizeBytes() int64 {
return atomic.LoadInt64(&txmp.pendingSizeBytes)

Check warning on line 202 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L201-L202

Added lines #L201 - L202 were not covered by tests
}

// FlushAppConn executes FlushSync on the mempool's proxyAppConn.
//
// NOTE: The caller must obtain a write-lock prior to execution.
Expand Down Expand Up @@ -326,6 +335,11 @@ func (txmp *TxMempool) CheckTx(
if res.Checker == nil {
return errors.New("no checker available for pending transaction")
}
if err := txmp.canAddPendingTx(wtx); err != nil {
// TODO: eviction strategy for pending transactions
return err
}
atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))

Check warning on line 342 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L338-L342

Added lines #L338 - L342 were not covered by tests
txmp.pendingTxs.Insert(wtx, res, txInfo)
}
}
Expand Down Expand Up @@ -410,7 +424,7 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
)

var txs []types.Tx
if uint64(txmp.Size()) < txmp.config.TxNotifyThreshold {
if uint64(txmp.SizeWithoutPending()) < txmp.config.TxNotifyThreshold {
// do not reap anything if threshold is not met
return txs
}
Expand Down Expand Up @@ -522,7 +536,7 @@ func (txmp *TxMempool) Update(
}
}

txmp.metrics.Size.Set(float64(txmp.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))
return nil
}
Expand Down Expand Up @@ -640,7 +654,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
}

txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
txmp.metrics.Size.Set(float64(txmp.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))

if txmp.insertTx(wtx) {
Expand All @@ -649,7 +663,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"height", txmp.height,
"num_txs", txmp.Size(),
"num_txs", txmp.SizeWithoutPending(),
)
txmp.notifyTxsAvailable()
}
Expand Down Expand Up @@ -745,12 +759,12 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
if txmp.recheckCursor == nil {
txmp.logger.Debug("finished rechecking transactions")

if txmp.Size() > 0 {
if txmp.SizeWithoutPending() > 0 {
txmp.notifyTxsAvailable()
}
}

txmp.metrics.Size.Set(float64(txmp.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))
}

Expand Down Expand Up @@ -803,7 +817,7 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
// the transaction can be inserted into the mempool.
func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
var (
numTxs = txmp.Size()
numTxs = txmp.SizeWithoutPending()
sizeBytes = txmp.SizeBytes()
)

Expand All @@ -819,6 +833,24 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
return nil
}

func (txmp *TxMempool) canAddPendingTx(wtx *WrappedTx) error {
var (
numTxs = txmp.PendingSize()
sizeBytes = txmp.PendingSizeBytes()
)

if numTxs >= txmp.config.PendingSize || int64(wtx.Size())+sizeBytes > txmp.config.MaxPendingTxsBytes {
return types.ErrMempoolPendingIsFull{
NumTxs: numTxs,
MaxTxs: txmp.config.PendingSize,
TxsBytes: sizeBytes,
MaxTxsBytes: txmp.config.MaxPendingTxsBytes,
}
}

Check warning on line 849 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L836-L849

Added lines #L836 - L849 were not covered by tests

return nil

Check warning on line 851 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L851

Added line #L851 was not covered by tests
}

func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
if txmp.isInMempool(wtx.tx) {
return false
Expand Down Expand Up @@ -935,13 +967,14 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}

// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) {
txmp.pendingTxs.PurgeExpired(txmp.config.PendingTTLNumBlocks, blockHeight, txmp.config.PendingTTLDuration, now, func(wtx *WrappedTx) {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size()))

Check warning on line 971 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L971

Added line #L971 was not covered by tests
txmp.expire(blockHeight, wtx)
})
}

func (txmp *TxMempool) notifyTxsAvailable() {
if txmp.Size() == 0 {
if txmp.SizeWithoutPending() == 0 {
return
}

Expand Down Expand Up @@ -980,12 +1013,14 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string
func (txmp *TxMempool) handlePendingTransactions() {
accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions()
for _, tx := range accepted {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size()))

Check warning on line 1016 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L1016

Added line #L1016 was not covered by tests
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil {
txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err))
}
}
if !txmp.config.KeepInvalidTxsInCache {
for _, tx := range rejected {
for _, tx := range rejected {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size()))
if !txmp.config.KeepInvalidTxsInCache {

Check warning on line 1023 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L1022-L1023

Added lines #L1022 - L1023 were not covered by tests
tx.tx.removeHandler(true)
}
}
Expand Down
20 changes: 20 additions & 0 deletions types/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"errors"
"fmt"

tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)

Expand Down Expand Up @@ -84,6 +85,25 @@ func (e ErrMempoolIsFull) Error() string {
)
}

// ErrMempoolPendingIsFull defines an error where there are too many pending transactions
// not processed yet
type ErrMempoolPendingIsFull struct {
NumTxs int
MaxTxs int
TxsBytes int64
MaxTxsBytes int64
}

func (e ErrMempoolPendingIsFull) Error() string {
return fmt.Sprintf(
"mempool pending set is full: number of txs %d (max: %d), total txs bytes %d (max: %d)",
e.NumTxs,
e.MaxTxs,
e.TxsBytes,
e.MaxTxsBytes,
)
}

// ErrPreCheck defines an error where a transaction fails a pre-check.
type ErrPreCheck struct {
Reason error
Expand Down

0 comments on commit b1df7a8

Please sign in to comment.