Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable access to pending txs from filters + subscriptions #118

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ var (
utils.RollupHistoricalRPCTimeoutFlag,
utils.RollupDisableTxPoolGossipFlag,
utils.RollupComputePendingBlock,
utils.RollupAllowPendingTxFilters,
configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags)

Expand Down
9 changes: 8 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,11 @@ var (
Usage: "By default the pending block equals the latest block to save resources and not leak txs from the tx-pool, this flag enables computing of the pending block from the tx-pool instead.",
Category: flags.RollupCategory,
}
RollupAllowPendingTxFilters = &cli.BoolFlag{
Name: "rollup.allowpendingtxfilters",
Usage: "By default 'eth_subscribe' with 'NewPendingTransaction' and 'eth_newPendingTransactionFilter' are disabled to prevent leaking txs from the tx-pool.",
Category: flags.RollupCategory,
}

// Metrics flags
MetricsEnabledFlag = &cli.BoolFlag{
Expand Down Expand Up @@ -1833,6 +1838,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.RollupHistoricalRPCTimeout = ctx.Duration(RollupHistoricalRPCTimeoutFlag.Name)
}
cfg.RollupDisableTxPoolGossip = ctx.Bool(RollupDisableTxPoolGossipFlag.Name)
cfg.RollupAllowPendingTxFilters = ctx.Bool(RollupAllowPendingTxFilters.Name)
// Override any default configs for hard coded networks.
switch {
case ctx.Bool(MainnetFlag.Name):
Expand Down Expand Up @@ -2025,7 +2031,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
LogCacheSize: ethcfg.FilterLogCacheSize,
AllowPendingTxs: ethcfg.RollupAllowPendingTxFilters,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Expand Down
9 changes: 5 additions & 4 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ type Config struct {
OverrideOptimismRegolith *uint64 `toml:",omitempty"`
OverrideOptimism *bool

RollupSequencerHTTP string
RollupHistoricalRPC string
RollupHistoricalRPCTimeout time.Duration
RollupDisableTxPoolGossip bool
RollupSequencerHTTP string
RollupHistoricalRPC string
RollupHistoricalRPCTimeout time.Duration
RollupDisableTxPoolGossip bool
RollupAllowPendingTxFilters bool
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down
14 changes: 12 additions & 2 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
var (
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
// errPendingDisabled is returned from NewPendingTransaction* when access to the mempool is not allowed
errPendingDisabled = errors.New("pending tx filters are disabled")
)

// filter is a helper struct that holds meta information over the filter type
Expand Down Expand Up @@ -109,7 +111,11 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) (rpc.ID, error) {
trianglesphere marked this conversation as resolved.
Show resolved Hide resolved
if !api.sys.cfg.AllowPendingTxs {
return "", errPendingDisabled
}

var (
pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
Expand Down Expand Up @@ -137,13 +143,17 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
}
}()

return pendingTxSub.ID
return pendingTxSub.ID, nil
}

// NewPendingTransactions creates a subscription that is triggered each time a
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
if !api.sys.cfg.AllowPendingTxs {
return &rpc.Subscription{}, errPendingDisabled
}

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand Down
2 changes: 2 additions & 0 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
type Config struct {
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
// allow filtering or subscriptions to new pending txs:
AllowPendingTxs bool
}

func (cfg Config) withDefaults() Config {
Expand Down
16 changes: 13 additions & 3 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc

func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) {
backend := &testBackend{db: db}
cfg.AllowPendingTxs = true
sys := NewFilterSystem(backend, cfg)
return backend, sys
}
Expand Down Expand Up @@ -263,7 +264,10 @@ func TestPendingTxFilter(t *testing.T) {
hashes []common.Hash
)

fid0 := api.NewPendingTransactionFilter(nil)
fid0, err := api.NewPendingTransactionFilter(nil)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
Expand Down Expand Up @@ -320,7 +324,10 @@ func TestPendingTxFilterFullTx(t *testing.T) {
)

fullTx := true
fid0 := api.NewPendingTransactionFilter(&fullTx)
fid0, err := api.NewPendingTransactionFilter(&fullTx)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}

time.Sleep(1 * time.Second)
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
Expand Down Expand Up @@ -915,7 +922,10 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
// timeout either in 100ms or 200ms
fids := make([]rpc.ID, 20)
for i := 0; i < len(fids); i++ {
fid := api.NewPendingTransactionFilter(nil)
fid, err := api.NewPendingTransactionFilter(nil)
if err != nil {
t.Fatalf("Unable to create filter: %v", err)
}
fids[i] = fid
// Wait for at least one tx to arrive in filter
for {
Expand Down