Skip to content

Commit

Permalink
Disable access to pending txs from filters + subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
mdehoog committed Aug 22, 2023
1 parent 628d300 commit 67352ce
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 9 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ var (
utils.RollupHistoricalRPCTimeoutFlag,
utils.RollupDisableTxPoolGossipFlag,
utils.RollupComputePendingBlock,
utils.RollupAllowPendingTxFilters,
configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags)

Expand Down
9 changes: 8 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,11 @@ var (
Usage: "By default the pending block equals the latest block to save resources and not leak txs from the tx-pool, this flag enables computing of the pending block from the tx-pool instead.",
Category: flags.RollupCategory,
}
RollupAllowPendingTxFilters = &cli.BoolFlag{
Name: "rollup.allowpendingtxfilters",
Usage: "By default 'eth_subscribe' with 'NewPendingTransaction' and 'eth_newPendingTransactionFilter' are disabled to prevent leaking txs from the tx-pool.",
Category: flags.RollupCategory,
}

// Metrics flags
MetricsEnabledFlag = &cli.BoolFlag{
Expand Down Expand Up @@ -1833,6 +1838,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.RollupHistoricalRPCTimeout = ctx.Duration(RollupHistoricalRPCTimeoutFlag.Name)
}
cfg.RollupDisableTxPoolGossip = ctx.Bool(RollupDisableTxPoolGossipFlag.Name)
cfg.RollupAllowPendingTxFilters = ctx.Bool(RollupAllowPendingTxFilters.Name)
// Override any default configs for hard coded networks.
switch {
case ctx.Bool(MainnetFlag.Name):
Expand Down Expand Up @@ -2025,7 +2031,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
LogCacheSize: ethcfg.FilterLogCacheSize,
AllowPendingTxs: ethcfg.RollupAllowPendingTxFilters,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Expand Down
9 changes: 5 additions & 4 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ type Config struct {
OverrideOptimismRegolith *uint64 `toml:",omitempty"`
OverrideOptimism *bool

RollupSequencerHTTP string
RollupHistoricalRPC string
RollupHistoricalRPCTimeout time.Duration
RollupDisableTxPoolGossip bool
RollupSequencerHTTP string
RollupHistoricalRPC string
RollupHistoricalRPCTimeout time.Duration
RollupDisableTxPoolGossip bool
RollupAllowPendingTxFilters bool
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down
14 changes: 12 additions & 2 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
var (
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
// errPendingDisabled is returned from NewPendingTransaction* when access to the mempool is not allowed
errPendingDisabled = errors.New("pending tx filters are disabled")
)

// filter is a helper struct that holds meta information over the filter type
Expand Down Expand Up @@ -109,7 +111,11 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) (rpc.ID, error) {
if !api.sys.cfg.AllowPendingTxs {
return "", errPendingDisabled
}

var (
pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
Expand Down Expand Up @@ -137,13 +143,17 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
}
}()

return pendingTxSub.ID
return pendingTxSub.ID, nil
}

// NewPendingTransactions creates a subscription that is triggered each time a
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
if !api.sys.cfg.AllowPendingTxs {
return &rpc.Subscription{}, errPendingDisabled
}

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand Down
5 changes: 3 additions & 2 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ import (

// Config represents the configuration of the filter system.
type Config struct {
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
AllowPendingTxs bool // allow filtering or subscriptions to new pending txs
}

func (cfg Config) withDefaults() Config {
Expand Down

0 comments on commit 67352ce

Please sign in to comment.