Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC: Enable back json streaming for non-batch and non-websocket cases #4647

Merged
merged 3 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"net"
"net/http"
"os"
Expand All @@ -15,6 +14,8 @@ import (
"strings"
"time"

"github.com/ledgerwatch/erigon/rpc/rpccfg"

"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
Expand Down Expand Up @@ -79,7 +80,8 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets")
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)")
rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, "rpc.accessList", "", "Specify granular (method-by-method) API allowlist")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, "rpc.batch.concurrency", 2, "Does limit amount of goroutines to process 1 batch request. Means 1 bach request can't overload server. 1 batch still can have unlimited amount of request")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, utils.RpcBatchConcurrencyFlag.Name, 2, utils.RpcBatchConcurrencyFlag.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.RpcStreamingDisable, utils.RpcStreamingDisableFlag.Name, false, utils.RpcStreamingDisableFlag.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.DBReadConcurrency, "db.read.concurrency", runtime.GOMAXPROCS(-1), "Does limit amount of parallel db reads")
rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines")
rootCmd.PersistentFlags().StringVar(&cfg.TxPoolApiAddr, "txpool.api.addr", "", "txpool api network address, for example: 127.0.0.1:9090 (default: use value of --private.api.addr)")
Expand Down Expand Up @@ -439,7 +441,7 @@ func StartRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API)
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)

fmt.Printf("TraceRequests = %t\n", cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable)

allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
Expand Down Expand Up @@ -613,7 +615,7 @@ func createHandler(cfg httpcfg.HttpCfg, apiList []rpc.API, httpHandler http.Hand
func createEngineListener(cfg httpcfg.HttpCfg, engineApi []rpc.API) (*http.Server, *rpc.Server, string, error) {
engineHttpEndpoint := fmt.Sprintf("%s:%d", cfg.EngineHTTPListenAddress, cfg.EnginePort)

engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests)
engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, true)

allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type HttpCfg struct {
WebsocketCompression bool
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
Expand Down
7 changes: 6 additions & 1 deletion cmd/rpcdaemon/commands/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,20 @@ func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash commo
// Retrieve the transaction and assemble its EVM context
blockNum, ok, err := api.txnLookup(ctx, tx, hash)
if err != nil {
stream.WriteNil()
return err
}
if !ok {
stream.WriteNil()
return nil
}
block, err := api.blockByNumberWithSenders(tx, blockNum)
if err != nil {
stream.WriteNil()
return err
}
if block == nil {
stream.WriteNil()
return nil
}
blockHash := block.Hash()
Expand All @@ -148,12 +152,13 @@ func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash commo
if txn == nil {
var borTx *types.Transaction
borTx, _, _, _, err = rawdb.ReadBorTransaction(tx, hash)

if err != nil {
stream.WriteNil()
return err
}

if borTx != nil {
stream.WriteNil()
return nil
}
stream.WriteNil()
Expand Down
7 changes: 4 additions & 3 deletions cmd/rpcdaemon22/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets")
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)")
rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, "rpc.accessList", "", "Specify granular (method-by-method) API allowlist")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, "rpc.batch.concurrency", 2, "Does limit amount of goroutines to process 1 batch request. Means 1 bach request can't overload server. 1 batch still can have unlimited amount of request")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, utils.RpcBatchConcurrencyFlag.Name, 2, utils.RpcBatchConcurrencyFlag.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.RpcStreamingDisable, utils.RpcStreamingDisableFlag.Name, false, utils.RpcStreamingDisableFlag.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.DBReadConcurrency, "db.read.concurrency", runtime.GOMAXPROCS(-1), "Does limit amount of parallel db reads")
rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines")
rootCmd.PersistentFlags().StringVar(&cfg.TxPoolApiAddr, "txpool.api.addr", "", "txpool api network address, for example: 127.0.0.1:9090 (default: use value of --private.api.addr)")
Expand Down Expand Up @@ -455,7 +456,7 @@ func StartRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API)
// register apis and create handler stack
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)

srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable)

allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
Expand Down Expand Up @@ -629,7 +630,7 @@ func createHandler(cfg httpcfg.HttpCfg, apiList []rpc.API, httpHandler http.Hand
func createEngineListener(cfg httpcfg.HttpCfg, engineApi []rpc.API) (*http.Server, *rpc.Server, string, error) {
engineHttpEndpoint := fmt.Sprintf("%s:%d", cfg.EngineHTTPListenAddress, cfg.EnginePort)

engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests)
engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, true)

allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/rpcdaemon22/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type HttpCfg struct {
WebsocketCompression bool
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
Expand Down
4 changes: 4 additions & 0 deletions cmd/rpcdaemon22/commands/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,20 @@ func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash commo
// Retrieve the transaction and assemble its EVM context
blockNum, ok, err := api.txnLookup(ctx, tx, hash)
if err != nil {
stream.WriteNil()
return err
}
if !ok {
stream.WriteNil()
return nil
}
block, err := api.blockByNumberWithSenders(tx, blockNum)
if err != nil {
stream.WriteNil()
return err
}
if block == nil {
stream.WriteNil()
return nil
}
blockHash := block.Hash()
Expand Down
21 changes: 13 additions & 8 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ package utils
import (
"crypto/ecdsa"
"fmt"
"io"
"math/big"
"path/filepath"
"runtime"
"strconv"
"strings"
"text/tabwriter"
"text/template"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/txpool"
Expand All @@ -30,14 +39,6 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/urfave/cli"
"io"
"math/big"
"path/filepath"
"runtime"
"strconv"
"strings"
"text/tabwriter"
"text/template"

"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/params/networkname"
Expand Down Expand Up @@ -363,6 +364,10 @@ var (
Usage: "Does limit amount of goroutines to process 1 batch request. Means 1 bach request can't overload server. 1 batch still can have unlimited amount of request",
Value: 2,
}
RpcStreamingDisableFlag = cli.BoolFlag{
Name: "rpc.streaming.disable",
Usage: "Erigon has enalbed json streamin for some heavy endpoints (like trace_*). It's treadoff: greatly reduce amount of RAM (in some cases from 30GB to 30mb), but it produce invalid json format if error happened in the middle of streaming (because json is not streaming-friendly format)",
}
HTTPTraceFlag = cli.BoolFlag{
Name: "http.trace",
Usage: "Trace HTTP requests with INFO level",
Expand Down
4 changes: 2 additions & 2 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, allowList rpc.
}

// Create RPC server and handler.
srv := rpc.NewServer(50, false /* traceRequests */)
srv := rpc.NewServer(50, false /* traceRequests */, true)
srv.SetAllowList(allowList)
if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
return err
Expand Down Expand Up @@ -298,7 +298,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, allowList rpc.All
}

// Create RPC server and handler.
srv := rpc.NewServer(50, false /* traceRequests */)
srv := rpc.NewServer(50, false /* traceRequests */, true)
srv.SetAllowList(allowList)
if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (c *Client) dispatch(codec ServerCodec) {
if op.batch {
conn.handler.handleBatch(op.msgs)
} else {
conn.handler.handleMsg(op.msgs[0])
conn.handler.handleMsg(op.msgs[0], nil)
}

case err := <-c.readErr:
Expand Down
17 changes: 12 additions & 5 deletions rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,26 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}

// handleMsg handles a single message.
func (h *handler) handleMsg(msg *jsonrpcMessage) {
func (h *handler) handleMsg(msg *jsonrpcMessage, stream *jsoniter.Stream) {
if ok := h.handleImmediate(msg); ok {
return
}
h.startCallProc(func(cp *callProc) {
stream := jsoniter.NewStream(jsoniter.ConfigDefault, nil, 4096)
needWriteStream := false
if stream == nil {
stream = jsoniter.NewStream(jsoniter.ConfigDefault, nil, 4096)
needWriteStream = true
}
answer := h.handleCallMsg(cp, msg, stream)
h.addSubscriptions(cp.notifiers)
if answer != nil {
h.conn.writeJSON(cp.ctx, answer)
} else {
_ = stream.Flush()
buffer, _ := json.Marshal(answer)
stream.Write(json.RawMessage(buffer))
}
if needWriteStream {
h.conn.writeJSON(cp.ctx, json.RawMessage(stream.Buffer()))
} else {
stream.Write([]byte("\n"))
}
for _, n := range cp.notifiers {
n.activate()
Expand Down
7 changes: 6 additions & 1 deletion rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/golang-jwt/jwt/v4"
jsoniter "github.com/json-iterator/go"
)

const (
Expand Down Expand Up @@ -222,7 +223,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
defer codec.close()
s.serveSingleRequest(ctx, codec)
var stream *jsoniter.Stream
if !s.disableStreaming {
stream = jsoniter.NewStream(jsoniter.ConfigDefault, w, 4096)
}
s.serveSingleRequest(ctx, codec, stream)
}

// validateRequest returns a non-zero response code and error message if the
Expand Down
2 changes: 1 addition & 1 deletion rpc/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestHTTPResponseWithEmptyGet(t *testing.T) {
func TestHTTPRespBodyUnlimited(t *testing.T) {
const respLength = maxRequestContentLength * 3

s := NewServer(50, false /* traceRequests */)
s := NewServer(50, false /* traceRequests */, true)
defer s.Stop()
if err := s.RegisterName("test", largeRespService{respLength}); err != nil {
t.Fatal(err)
Expand Down
10 changes: 6 additions & 4 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync/atomic"

mapset "github.com/deckarep/golang-set"
jsoniter "github.com/json-iterator/go"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -49,12 +50,13 @@ type Server struct {
codecs mapset.Set

batchConcurrency uint
disableStreaming bool
traceRequests bool // Whether to print requests at INFO level
}

// NewServer creates a new server instance with no registered handlers.
func NewServer(batchConcurrency uint, traceRequests bool) *Server {
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency, traceRequests: traceRequests}
func NewServer(batchConcurrency uint, traceRequests, disableStreaming bool) *Server {
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency, disableStreaming: disableStreaming, traceRequests: traceRequests}
// Register the default service providing meta information about the RPC service such
// as the services and methods it offers.
rpcService := &RPCService{server: server}
Expand Down Expand Up @@ -100,7 +102,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
// serveSingleRequest reads and processes a single RPC request from the given codec. This
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
// this mode.
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stream *jsoniter.Stream) {
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
return
Expand All @@ -120,7 +122,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
if batch {
h.handleBatch(reqs)
} else {
h.handleMsg(reqs[0])
h.handleMsg(reqs[0], stream)
}
}

Expand Down
2 changes: 1 addition & 1 deletion rpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

func TestServerRegisterName(t *testing.T) {
server := NewServer(50, false /* traceRequests */)
server := NewServer(50, false /* traceRequests */, true)
service := new(testService)

if err := server.RegisterName("test", service); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rpc/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestSubscriptions(t *testing.T) {
subCount = len(namespaces)
notificationCount = 3

server = NewServer(50, false /* traceRequests */)
server = NewServer(50, false /* traceRequests */, true)
clientConn, serverConn = net.Pipe()
out = json.NewEncoder(clientConn)
in = json.NewDecoder(clientConn)
Expand Down
2 changes: 1 addition & 1 deletion rpc/testservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func newTestServer() *Server {
server := NewServer(50, false /* traceRequests */)
server := NewServer(50, false /* traceRequests */, true)
server.idgen = sequentialIDGenerator()
if err := server.RegisterName("test", new(testService)); err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion rpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestClientWebsocketPing(t *testing.T) {
// This checks that the websocket transport can deal with large messages.
func TestClientWebsocketLargeMessage(t *testing.T) {
var (
srv = NewServer(50, false /* traceRequests */)
srv = NewServer(50, false /* traceRequests */, true)
httpsrv = httptest.NewServer(srv.WebsocketHandler(nil, nil, false))
wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
)
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var DefaultFlags = []cli.Flag{
utils.HTTPTraceFlag,
utils.StateCacheFlag,
utils.RpcBatchConcurrencyFlag,
utils.RpcStreamingDisableFlag,
utils.DBReadConcurrencyFlag,
utils.RpcAccessListFlag,
utils.RpcTraceCompatFlag,
Expand Down
4 changes: 3 additions & 1 deletion turbo/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package cli

import (
"fmt"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"strings"
"time"

"github.com/ledgerwatch/erigon/rpc/rpccfg"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
Expand Down Expand Up @@ -344,6 +345,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config) {

WebsocketEnabled: ctx.GlobalIsSet(utils.WSEnabledFlag.Name),
RpcBatchConcurrency: ctx.GlobalUint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.GlobalBool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.GlobalInt(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.GlobalString(utils.RpcAccessListFlag.Name),
Gascap: ctx.GlobalUint64(utils.RpcGasCapFlag.Name),
Expand Down