diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 6cf2fd57ff..2df6c63b55 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -146,6 +146,7 @@ var ( utils.RollupHistoricalRPCTimeoutFlag, utils.RollupDisableTxPoolGossipFlag, utils.RollupComputePendingBlock, + utils.RollupAllowPendingTxFilters, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b8988eb323..a0752054b0 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -888,6 +888,11 @@ var ( Usage: "By default the pending block equals the latest block to save resources and not leak txs from the tx-pool, this flag enables computing of the pending block from the tx-pool instead.", Category: flags.RollupCategory, } + RollupAllowPendingTxFilters = &cli.BoolFlag{ + Name: "rollup.allowpendingtxfilters", + Usage: "By default 'eth_subscribe' with 'NewPendingTransaction' and 'eth_newPendingTransactionFilter' are disabled to prevent leaking txs from the tx-pool.", + Category: flags.RollupCategory, + } // Metrics flags MetricsEnabledFlag = &cli.BoolFlag{ @@ -1833,6 +1838,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.RollupHistoricalRPCTimeout = ctx.Duration(RollupHistoricalRPCTimeoutFlag.Name) } cfg.RollupDisableTxPoolGossip = ctx.Bool(RollupDisableTxPoolGossipFlag.Name) + cfg.RollupAllowPendingTxFilters = ctx.Bool(RollupAllowPendingTxFilters.Name) // Override any default configs for hard coded networks. switch { case ctx.Bool(MainnetFlag.Name): @@ -2025,7 +2031,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem { isLightClient := ethcfg.SyncMode == downloader.LightSync filterSystem := filters.NewFilterSystem(backend, filters.Config{ - LogCacheSize: ethcfg.FilterLogCacheSize, + LogCacheSize: ethcfg.FilterLogCacheSize, + AllowPendingTxs: ethcfg.RollupAllowPendingTxFilters, }) stack.RegisterAPIs([]rpc.API{{ Namespace: "eth", diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index fcf5eae4d5..6becf71d1f 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -168,10 +168,11 @@ type Config struct { OverrideOptimismRegolith *uint64 `toml:",omitempty"` OverrideOptimism *bool - RollupSequencerHTTP string - RollupHistoricalRPC string - RollupHistoricalRPCTimeout time.Duration - RollupDisableTxPoolGossip bool + RollupSequencerHTTP string + RollupHistoricalRPC string + RollupHistoricalRPCTimeout time.Duration + RollupDisableTxPoolGossip bool + RollupAllowPendingTxFilters bool } // CreateConsensusEngine creates a consensus engine for the given chain config. diff --git a/eth/filters/api.go b/eth/filters/api.go index cc08b442e8..c00c02273d 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -36,6 +36,8 @@ import ( var ( errInvalidTopic = errors.New("invalid topic(s)") errFilterNotFound = errors.New("filter not found") + // errPendingDisabled is returned from NewPendingTransaction* when access to the mempool is not allowed + errPendingDisabled = errors.New("pending tx filters are disabled") ) // filter is a helper struct that holds meta information over the filter type @@ -109,7 +111,11 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) { // // It is part of the filter package because this filter can be used through the // `eth_getFilterChanges` polling method that is also used for log filters. -func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { +func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) (rpc.ID, error) { + if !api.sys.cfg.AllowPendingTxs { + return "", errPendingDisabled + } + var ( pendingTxs = make(chan []*types.Transaction) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) @@ -137,13 +143,17 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { } }() - return pendingTxSub.ID + return pendingTxSub.ID, nil } // NewPendingTransactions creates a subscription that is triggered each time a // transaction enters the transaction pool. If fullTx is true the full tx is // sent to the client, otherwise the hash is sent. func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) { + if !api.sys.cfg.AllowPendingTxs { + return &rpc.Subscription{}, errPendingDisabled + } + notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 1768681c17..4e833e2556 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -44,6 +44,8 @@ import ( type Config struct { LogCacheSize int // maximum number of cached blocks (default: 32) Timeout time.Duration // how long filters stay active (default: 5min) + // allow filtering or subscriptions to new pending txs: + AllowPendingTxs bool } func (cfg Config) withDefaults() Config { diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index ae0c83eb29..97fd9752b6 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -180,6 +180,7 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) { backend := &testBackend{db: db} + cfg.AllowPendingTxs = true sys := NewFilterSystem(backend, cfg) return backend, sys } @@ -263,7 +264,10 @@ func TestPendingTxFilter(t *testing.T) { hashes []common.Hash ) - fid0 := api.NewPendingTransactionFilter(nil) + fid0, err := api.NewPendingTransactionFilter(nil) + if err != nil { + t.Fatalf("Unable to create filter: %v", err) + } time.Sleep(1 * time.Second) backend.txFeed.Send(core.NewTxsEvent{Txs: transactions}) @@ -320,7 +324,10 @@ func TestPendingTxFilterFullTx(t *testing.T) { ) fullTx := true - fid0 := api.NewPendingTransactionFilter(&fullTx) + fid0, err := api.NewPendingTransactionFilter(&fullTx) + if err != nil { + t.Fatalf("Unable to create filter: %v", err) + } time.Sleep(1 * time.Second) backend.txFeed.Send(core.NewTxsEvent{Txs: transactions}) @@ -915,7 +922,10 @@ func TestPendingTxFilterDeadlock(t *testing.T) { // timeout either in 100ms or 200ms fids := make([]rpc.ID, 20) for i := 0; i < len(fids); i++ { - fid := api.NewPendingTransactionFilter(nil) + fid, err := api.NewPendingTransactionFilter(nil) + if err != nil { + t.Fatalf("Unable to create filter: %v", err) + } fids[i] = fid // Wait for at least one tx to arrive in filter for {