Skip to content

Commit

Permalink
rpc: add limit for batch request items and response size (#26681)
Browse files Browse the repository at this point in the history
This PR adds server-side limits for JSON-RPC batch requests. Before this change, batches
were limited only by processing time. The server would pick calls from the batch and
answer them until the response timeout occurred, then stop processing the remaining batch
items.

Here, we are adding two additional limits which can be configured:

- the 'item limit': batches can have at most N items
- the 'response size limit': batches can contain at most X response bytes

These limits are optional in package rpc. In Geth, we set a default limit of 1000 items
and 25MB response size.

When a batch goes over the limit, an error response is returned to the client. However,
doing this correctly isn't always possible. In JSON-RPC, only method calls with a valid
`id` can be responded to. Since batches may also contain non-call messages or
notifications, the best effort thing we can do to report an error with the batch itself is
reporting the limit violation as an error for the first method call in the batch. If a batch is
too large, but contains only notifications and responses, the error will be reported with
a null `id`.

The RPC client was also changed so it can deal with errors resulting from too large
batches. An older client connected to the server code in this PR could get stuck
until the request timeout occurred when the batch is too large. **Upgrading to a version
of the RPC client containing this change is strongly recommended to avoid timeout issues.**

For some weird reason, when writing the original client implementation, @fjl worked off of
the assumption that responses could be distributed across batches arbitrarily. So for a
batch request containing requests `[A B C]`, the server could respond with `[A B C]` but
also with `[A B] [C]` or even `[A] [B] [C]` and it wouldn't make a difference to the
client.

So in the implementation of BatchCallContext, the client waited for all requests in the
batch individually. If the server didn't respond to some of the requests in the batch, the
client would eventually just time out (if a context was used).

With the addition of batch limits into the server, we anticipate that people will hit this
kind of error way more often. To handle this properly, the client now waits for a single
response batch and expects it to contain all responses to the requests.

---------

Co-authored-by: Felix Lange <[email protected]>
Co-authored-by: Martin Holst Swende <[email protected]>
  • Loading branch information
3 people authored Jun 13, 2023
1 parent 5ac4da3 commit f3314bb
Show file tree
Hide file tree
Showing 22 changed files with 554 additions and 226 deletions.
1 change: 1 addition & 0 deletions cmd/clef/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ func signer(c *cli.Context) error {
cors := utils.SplitAndTrim(c.String(utils.HTTPCORSDomainFlag.Name))

srv := rpc.NewServer()
srv.SetBatchLimits(node.DefaultConfig.BatchRequestLimit, node.DefaultConfig.BatchResponseMaxSize)
err := node.RegisterApis(rpcAPI, []string{"account"}, srv)
if err != nil {
utils.Fatalf("Could not register API: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ var (
utils.RPCGlobalEVMTimeoutFlag,
utils.RPCGlobalTxFeeCapFlag,
utils.AllowUnprotectedTxs,
utils.BatchRequestLimit,
utils.BatchResponseMaxSize,
}

metricsFlags = []cli.Flag{
Expand Down
20 changes: 20 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,18 @@ var (
Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC",
Category: flags.APICategory,
}
BatchRequestLimit = &cli.IntFlag{
Name: "rpc.batch-request-limit",
Usage: "Maximum number of requests in a batch",
Value: node.DefaultConfig.BatchRequestLimit,
Category: flags.APICategory,
}
BatchResponseMaxSize = &cli.IntFlag{
Name: "rpc.batch-response-max-size",
Usage: "Maximum number of bytes returned from a batched call",
Value: node.DefaultConfig.BatchResponseMaxSize,
Category: flags.APICategory,
}
EnablePersonal = &cli.BoolFlag{
Name: "rpc.enabledeprecatedpersonal",
Usage: "Enables the (deprecated) personal namespace",
Expand Down Expand Up @@ -1130,6 +1142,14 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
if ctx.IsSet(AllowUnprotectedTxs.Name) {
cfg.AllowUnprotectedTxs = ctx.Bool(AllowUnprotectedTxs.Name)
}

if ctx.IsSet(BatchRequestLimit.Name) {
cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name)
}

if ctx.IsSet(BatchResponseMaxSize.Name) {
cfg.BatchResponseMaxSize = ctx.Int(BatchResponseMaxSize.Name)
}
}

// setGraphQL creates the GraphQL listener interface string from the set
Expand Down
8 changes: 8 additions & 0 deletions node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (api *adminAPI) StartHTTP(host *string, port *int, cors *string, apis *stri
CorsAllowedOrigins: api.node.config.HTTPCors,
Vhosts: api.node.config.HTTPVirtualHosts,
Modules: api.node.config.HTTPModules,
rpcEndpointConfig: rpcEndpointConfig{
batchItemLimit: api.node.config.BatchRequestLimit,
batchResponseSizeLimit: api.node.config.BatchResponseMaxSize,
},
}
if cors != nil {
config.CorsAllowedOrigins = nil
Expand Down Expand Up @@ -250,6 +254,10 @@ func (api *adminAPI) StartWS(host *string, port *int, allowedOrigins *string, ap
Modules: api.node.config.WSModules,
Origins: api.node.config.WSOrigins,
// ExposeAll: api.node.config.WSExposeAll,
rpcEndpointConfig: rpcEndpointConfig{
batchItemLimit: api.node.config.BatchRequestLimit,
batchResponseSizeLimit: api.node.config.BatchResponseMaxSize,
},
}
if apis != nil {
config.Modules = nil
Expand Down
6 changes: 6 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ type Config struct {
// AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC.
AllowUnprotectedTxs bool `toml:",omitempty"`

// BatchRequestLimit is the maximum number of requests in a batch.
BatchRequestLimit int `toml:",omitempty"`

// BatchResponseMaxSize is the maximum number of bytes returned from a batched rpc call.
BatchResponseMaxSize int `toml:",omitempty"`

// JWTSecret is the path to the hex-encoded jwt secret.
JWTSecret string `toml:",omitempty"`

Expand Down
24 changes: 13 additions & 11 deletions node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,19 @@ var (

// DefaultConfig contains reasonable default settings.
var DefaultConfig = Config{
DataDir: DefaultDataDir(),
HTTPPort: DefaultHTTPPort,
AuthAddr: DefaultAuthHost,
AuthPort: DefaultAuthPort,
AuthVirtualHosts: DefaultAuthVhosts,
HTTPModules: []string{"net", "web3"},
HTTPVirtualHosts: []string{"localhost"},
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
GraphQLVirtualHosts: []string{"localhost"},
DataDir: DefaultDataDir(),
HTTPPort: DefaultHTTPPort,
AuthAddr: DefaultAuthHost,
AuthPort: DefaultAuthPort,
AuthVirtualHosts: DefaultAuthVhosts,
HTTPModules: []string{"net", "web3"},
HTTPVirtualHosts: []string{"localhost"},
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
BatchRequestLimit: 1000,
BatchResponseMaxSize: 25 * 1000 * 1000,
GraphQLVirtualHosts: []string{"localhost"},
P2P: p2p.Config{
ListenAddr: ":30303",
MaxPeers: 50,
Expand Down
31 changes: 21 additions & 10 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ func New(conf *Config) (*Node, error) {
if strings.HasSuffix(conf.Name, ".ipc") {
return nil, errors.New(`Config.Name cannot end in ".ipc"`)
}

server := rpc.NewServer()
server.SetBatchLimits(conf.BatchRequestLimit, conf.BatchResponseMaxSize)
node := &Node{
config: conf,
inprocHandler: rpc.NewServer(),
inprocHandler: server,
eventmux: new(event.TypeMux),
log: conf.Logger,
stop: make(chan struct{}),
Expand Down Expand Up @@ -403,6 +404,11 @@ func (n *Node) startRPC() error {
openAPIs, allAPIs = n.getAPIs()
)

rpcConfig := rpcEndpointConfig{
batchItemLimit: n.config.BatchRequestLimit,
batchResponseSizeLimit: n.config.BatchResponseMaxSize,
}

initHttp := func(server *httpServer, port int) error {
if err := server.setListenAddr(n.config.HTTPHost, port); err != nil {
return err
Expand All @@ -412,6 +418,7 @@ func (n *Node) startRPC() error {
Vhosts: n.config.HTTPVirtualHosts,
Modules: n.config.HTTPModules,
prefix: n.config.HTTPPathPrefix,
rpcEndpointConfig: rpcConfig,
}); err != nil {
return err
}
Expand All @@ -425,9 +432,10 @@ func (n *Node) startRPC() error {
return err
}
if err := server.enableWS(openAPIs, wsConfig{
Modules: n.config.WSModules,
Origins: n.config.WSOrigins,
prefix: n.config.WSPathPrefix,
Modules: n.config.WSModules,
Origins: n.config.WSOrigins,
prefix: n.config.WSPathPrefix,
rpcEndpointConfig: rpcConfig,
}); err != nil {
return err
}
Expand All @@ -441,26 +449,29 @@ func (n *Node) startRPC() error {
if err := server.setListenAddr(n.config.AuthAddr, port); err != nil {
return err
}
sharedConfig := rpcConfig
sharedConfig.jwtSecret = secret
if err := server.enableRPC(allAPIs, httpConfig{
CorsAllowedOrigins: DefaultAuthCors,
Vhosts: n.config.AuthVirtualHosts,
Modules: DefaultAuthModules,
prefix: DefaultAuthPrefix,
jwtSecret: secret,
rpcEndpointConfig: sharedConfig,
}); err != nil {
return err
}
servers = append(servers, server)

// Enable auth via WS
server = n.wsServerForPort(port, true)
if err := server.setListenAddr(n.config.AuthAddr, port); err != nil {
return err
}
if err := server.enableWS(allAPIs, wsConfig{
Modules: DefaultAuthModules,
Origins: DefaultAuthOrigins,
prefix: DefaultAuthPrefix,
jwtSecret: secret,
Modules: DefaultAuthModules,
Origins: DefaultAuthOrigins,
prefix: DefaultAuthPrefix,
rpcEndpointConfig: sharedConfig,
}); err != nil {
return err
}
Expand Down
18 changes: 13 additions & 5 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,21 @@ type httpConfig struct {
CorsAllowedOrigins []string
Vhosts []string
prefix string // path prefix on which to mount http handler
jwtSecret []byte // optional JWT secret
rpcEndpointConfig
}

// wsConfig is the JSON-RPC/Websocket configuration
type wsConfig struct {
Origins []string
Modules []string
prefix string // path prefix on which to mount ws handler
jwtSecret []byte // optional JWT secret
Origins []string
Modules []string
prefix string // path prefix on which to mount ws handler
rpcEndpointConfig
}

type rpcEndpointConfig struct {
jwtSecret []byte // optional JWT secret
batchItemLimit int
batchResponseSizeLimit int
}

type rpcHandler struct {
Expand Down Expand Up @@ -297,6 +303,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {

// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit)
if err := RegisterApis(apis, config.Modules, srv); err != nil {
return err
}
Expand Down Expand Up @@ -328,6 +335,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error {
}
// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit)
if err := RegisterApis(apis, config.Modules, srv); err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions node/rpcstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,10 @@ func TestJWT(t *testing.T) {
ss, _ := jwt.NewWithClaims(method, testClaim(input)).SignedString(secret)
return ss
}
srv := createAndStartServer(t, &httpConfig{jwtSecret: []byte("secret")},
true, &wsConfig{Origins: []string{"*"}, jwtSecret: []byte("secret")}, nil)
cfg := rpcEndpointConfig{jwtSecret: []byte("secret")}
httpcfg := &httpConfig{rpcEndpointConfig: cfg}
wscfg := &wsConfig{Origins: []string{"*"}, rpcEndpointConfig: cfg}
srv := createAndStartServer(t, httpcfg, true, wscfg, nil)
wsUrl := fmt.Sprintf("ws://%v", srv.listenAddr())
htUrl := fmt.Sprintf("http://%v", srv.listenAddr())

Expand Down
Loading

0 comments on commit f3314bb

Please sign in to comment.