Skip to content

Commit

Permalink
feat(config): allow configuration of check TX timeout for rpc and p2p…
Browse files Browse the repository at this point in the history
… tx broadcast (#750)

* feat(rpc): timeout-broadcast-tx config setting

* feat(mempool): add timeout-check-tx setting to mempool

* test(mempool): fix nill pointer in test

* chore(config): typo
  • Loading branch information
lklimek authored Mar 12, 2024
1 parent a1497ac commit 66a3ca9
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 18 deletions.
31 changes: 25 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,13 @@ type RPCConfig struct {
// See https://github.com/tendermint/tendermint/issues/3435
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout-broadcast-tx-commit"`

// Timeout of transaction broadcast to mempool; 0 to disable.
//
// This setting affects timeout of CheckTX operations used before
// adding transaction to the mempool. If the operation takes longer,
// the transaction is rejected with an error.
TimeoutBroadcastTx time.Duration `mapstructure:"timeout-broadcast-tx"`

// Maximum size of request body, in bytes
MaxBodyBytes int64 `mapstructure:"max-body-bytes"`

Expand Down Expand Up @@ -564,6 +571,7 @@ func DefaultRPCConfig() *RPCConfig {
EventLogMaxItems: 0,

TimeoutBroadcastTxCommit: 10 * time.Second,
TimeoutBroadcastTx: 0,

MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
Expand Down Expand Up @@ -602,6 +610,9 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout-broadcast-tx-commit can't be negative")
}
if cfg.TimeoutBroadcastTx < 0 {
return errors.New("timeout-broadcast-tx can't be negative")
}
if cfg.MaxBodyBytes < 0 {
return errors.New("max-body-bytes can't be negative")
}
Expand Down Expand Up @@ -819,6 +830,10 @@ type MempoolConfig struct {
// has existed in the mempool at least TTLNumBlocks number of blocks or if
// it's insertion time into the mempool is beyond TTLDuration.
TTLNumBlocks int64 `mapstructure:"ttl-num-blocks"`

// Timeout of check TX operations received from other nodes.
// Use 0 to disable.
TimeoutCheckTx time.Duration `mapstructure:"timeout-check-tx"`
}

// DefaultMempoolConfig returns a default configuration for the Tendermint mempool.
Expand All @@ -827,12 +842,13 @@ func DefaultMempoolConfig() *MempoolConfig {
Broadcast: true,
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
TTLDuration: 0 * time.Second,
TTLNumBlocks: 0,
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
TTLDuration: 0 * time.Second,
TTLNumBlocks: 0,
TimeoutCheckTx: 0,
}
}

Expand Down Expand Up @@ -864,6 +880,9 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.TTLNumBlocks < 0 {
return errors.New("ttl-num-blocks can't be negative")
}
if cfg.TimeoutCheckTx < 0 {
return errors.New("timeout-check-tx can't be negative")
}
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,13 @@ event-log-max-items = {{ .RPC.EventLogMaxItems }}
# See https://github.com/tendermint/tendermint/issues/3435
timeout-broadcast-tx-commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
# Timeout of transaction broadcast to mempool; 0 to disable.
#
# This setting affects timeout of CheckTX operations used before
# adding transaction to the mempool. If the operation takes longer,
# the transaction is rejected with an error.
timeout-broadcast-tx = "{{ .RPC.TimeoutBroadcastTx }}"
# Maximum size of request body, in bytes
max-body-bytes = {{ .RPC.MaxBodyBytes }}
Expand Down Expand Up @@ -426,6 +433,10 @@ ttl-duration = "{{ .Mempool.TTLDuration }}"
# it's insertion time into the mempool is beyond ttl-duration.
ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
# Timeout of check TX operations received from other nodes, using p2p protocol.
# Use 0 to disable.
timeout-check-tx = "{{ .Mempool.TimeoutCheckTx }}"
#######################################################
### State Sync Configuration Options ###
#######################################################
Expand Down
23 changes: 14 additions & 9 deletions internal/mempool/p2p_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,32 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/dashpay/tenderdash/config"
"github.com/dashpay/tenderdash/internal/p2p"
"github.com/dashpay/tenderdash/internal/p2p/client"
"github.com/dashpay/tenderdash/libs/log"
protomem "github.com/dashpay/tenderdash/proto/tendermint/mempool"
"github.com/dashpay/tenderdash/types"
)

const (
// CheckTxTimeout is the maximum time we wait for CheckTx to return.
// TODO: Change to config option
CheckTxTimeout = 1 * time.Second
)

type (
mempoolP2PMessageHandler struct {
logger log.Logger
config *config.MempoolConfig
checker TxChecker
ids *IDs
}
)

func consumerHandler(logger log.Logger, checker TxChecker, ids *IDs) client.ConsumerParams {
func consumerHandler(logger log.Logger, config *config.MempoolConfig, checker TxChecker, ids *IDs) client.ConsumerParams {
chanIDs := []p2p.ChannelID{p2p.MempoolChannel}
return client.ConsumerParams{
ReadChannels: chanIDs,
Handler: client.HandlerWithMiddlewares(
&mempoolP2PMessageHandler{
logger: logger,
config: config,
checker: checker,
ids: ids,
},
Expand All @@ -60,7 +56,16 @@ func (h *mempoolP2PMessageHandler) Handle(ctx context.Context, _ *client.Client,
SenderNodeID: envelope.From,
}
for _, tx := range protoTxs {
subCtx, subCtxCancel := context.WithTimeout(ctx, CheckTxTimeout)
var (
subCtx context.Context
subCtxCancel context.CancelFunc
)
if h.config.TimeoutCheckTx > 0 {
subCtx, subCtxCancel = context.WithTimeout(ctx, h.config.TimeoutCheckTx)
} else {
subCtx, subCtxCancel = context.WithCancel(ctx)
}

defer subCtxCancel()

if err := h.checker.CheckTx(subCtx, tx, nil, txInfo); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/mempool/p2p_msg_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/mock"

abcitypes "github.com/dashpay/tenderdash/abci/types"
"github.com/dashpay/tenderdash/config"
"github.com/dashpay/tenderdash/internal/p2p"
tmrequire "github.com/dashpay/tenderdash/internal/test/require"
"github.com/dashpay/tenderdash/libs/log"
Expand All @@ -18,6 +19,7 @@ import (
func TestMempoolP2PMessageHandler(t *testing.T) {
ctx := context.Background()
logger := log.NewTestingLogger(t)
cfg := config.DefaultMempoolConfig()
peerID1 := types.NodeID("peer1")
ids := NewMempoolIDs()
ids.ReserveForPeer(peerID1)
Expand Down Expand Up @@ -69,6 +71,7 @@ func TestMempoolP2PMessageHandler(t *testing.T) {
logger: logger,
checker: mockTxChecker,
ids: ids,
config: cfg,
}
err := hd.Handle(ctx, nil, &tc.envelope)
tmrequire.Error(t, tc.wantErr, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
r.logger.Info("tx broadcasting is disabled")
}
go func() {
err := r.p2pClient.Consume(ctx, consumerHandler(r.logger, r.mempool, r.ids))
err := r.p2pClient.Consume(ctx, consumerHandler(r.logger, r.mempool.config, r.mempool, r.ids))
if err != nil {
r.logger.Error("failed to consume p2p checker messages", "error", err)
}
Expand Down
18 changes: 16 additions & 2 deletions internal/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import (
// Deprecated and should be removed in 0.37
func (env *Environment) BroadcastTxAsync(_ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTx, error) {
go func() {
var (
ctx context.Context
cancel context.CancelFunc
)
// We need to create a new context here, because the original context
// may be canceled after parent function returns.
ctx, cancel := context.WithTimeout(context.Background(), mempool.CheckTxTimeout)
if env.Config.TimeoutBroadcastTx > 0 {
ctx, cancel = context.WithTimeout(context.Background(), env.Config.TimeoutBroadcastTx)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()

if res, err := env.BroadcastTx(ctx, req); err != nil || res.Code != abci.CodeTypeOK {
Expand All @@ -46,7 +54,13 @@ func (env *Environment) BroadcastTxSync(ctx context.Context, req *coretypes.Requ
// DeliverTx result.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func (env *Environment) BroadcastTx(ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTx, error) {
ctx, cancel := context.WithTimeout(ctx, mempool.CheckTxTimeout)
var cancel context.CancelFunc

if env.Config.TimeoutBroadcastTx > 0 {
ctx, cancel = context.WithTimeout(ctx, env.Config.TimeoutBroadcastTx)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()

resCh := make(chan *abci.ResponseCheckTx, 1)
Expand Down

0 comments on commit 66a3ca9

Please sign in to comment.