Hotfix v0.1.4 to main (#2250) (#2253)
* fix concurrent web socket writes

* fix eth_syncing

* fix custom trace internal tx call error handling and update prover

* add test to custom tracer depth issue; fix internal call error and gas used

* fix custom tracer for internal tx with error and no more steps after it

* remove debug code

* Make max grpc message size configurable (#2179)

* make max grpc message size configurable

* fix state tests

* fix tests

* fix tests

* get SequencerNodeURI from SC if empty and not IsTrustedSequencer

* Optimize trace (#2183)

* optimize trace

* fix memory reading

* update docker image

* update prover image

* fix converter

* fix memory

* fix step memory

* fix structlogs

* fix structlogs

* fix structlogs

* fix structlogs

* fix structlogs

* fix structlogs

* fix structlogs

* fix structlogs

* update prover image

* fix structlogs

* fix memory size

* fix memory size

* fix memory size

* refactor memory resize

* refactor memory resize

* move log for the best fitting tx (#2192)

* fix load zkCounters from pool

* remove unnecessary log.info

* add custom tracer support to CREATES opcode without depth increase (#2213)

* logs

* fix getting stateroot from previous batch (GetWIPBatch)

* logs

* Fix GetWipBatch when previous last batch is a forced batch

* fix forcedBatch trusted state

* Revert "fix getting stateroot from previous batch (GetWIPBatch)"

This reverts commit 860f0e7.

* force GHA

* add pool limits (#2189)

* Hotfix/batch l2 data (#2223)

* Fix BatchL2Data

* Force GHA

* remove failed txs from the pool limit check (#2233)

* debug trace by batch number via external rpc requests (#2235)

* fix trace batch remote requests in parallel limitation (#2244)

* Added RPC.TraceBatchUseHTTPS config parameter

* fix executor version

---------

Co-authored-by: tclemos <[email protected]>
Co-authored-by: tclemos <[email protected]>
Co-authored-by: Toni Ramírez <[email protected]>
Co-authored-by: agnusmor <[email protected]>
Co-authored-by: agnusmor <[email protected]>
Co-authored-by: Thiago Coimbra Lemos <[email protected]>
7 people authored Jul 5, 2023
1 parent cf20a24 commit bc9395e
Showing 9 changed files with 239 additions and 14 deletions.
6 changes: 3 additions & 3 deletions cmd/run.go
@@ -341,14 +341,14 @@ func runJSONRPCServer(c config.Config, etherman *etherman.Client, chainID uint64
if _, ok := apis[jsonrpc.APINet]; ok {
services = append(services, jsonrpc.Service{
Name: jsonrpc.APINet,
- Service: jsonrpc.NewNetEndpoints(chainID),
+ Service: jsonrpc.NewNetEndpoints(c.RPC, chainID),
})
}

if _, ok := apis[jsonrpc.APIZKEVM]; ok {
services = append(services, jsonrpc.Service{
Name: jsonrpc.APIZKEVM,
- Service: jsonrpc.NewZKEVMEndpoints(st),
+ Service: jsonrpc.NewZKEVMEndpoints(c.RPC, st),
})
}

@@ -362,7 +362,7 @@ func runJSONRPCServer(c config.Config, etherman *etherman.Client, chainID uint64
if _, ok := apis[jsonrpc.APIDebug]; ok {
services = append(services, jsonrpc.Service{
Name: jsonrpc.APIDebug,
- Service: jsonrpc.NewDebugEndpoints(st),
+ Service: jsonrpc.NewDebugEndpoints(c.RPC, st),
})
}

1 change: 1 addition & 0 deletions config/default.go
@@ -62,6 +62,7 @@ WriteTimeout = "60s"
MaxRequestsPerIPAndSecond = 500
SequencerNodeURI = ""
EnableL2SuggestedGasPricePolling = true
TraceBatchUseHTTPS = true
[RPC.WebSockets]
Enabled = true
Host = "0.0.0.0"
4 changes: 4 additions & 0 deletions jsonrpc/config.go
@@ -34,6 +34,10 @@ type Config struct {

// EnableL2SuggestedGasPricePolling enables polling of the L2 gas price to block tx in the RPC with lower gas price.
EnableL2SuggestedGasPricePolling bool `mapstructure:"EnableL2SuggestedGasPricePolling"`

// TraceBatchUseHTTPS enables the use of HTTPS (instead of HTTP) by the debug_traceBatchByNumber
// endpoint when it makes its parallel requests to the RPC debug_traceTransaction endpoint
TraceBatchUseHTTPS bool `mapstructure:"TraceBatchUseHTTPS"`
}

// WebSocketsConfig has parameters to config the rpc websocket support
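For context, a minimal sketch of how this flag is meant to be consumed: the debug endpoint picks the scheme from the flag and reuses the host and path of the incoming request to build the URL it calls back through the load balancer (the real wiring is in the jsonrpc/endpoints_debug.go diff below). `rpcConfig`, `buildTraceURL`, and the host value are illustrative stand-ins, not part of the commit.

```go
package main

import (
	"fmt"
	"net/url"
)

// rpcConfig stands in for the relevant part of jsonrpc.Config.
type rpcConfig struct {
	TraceBatchUseHTTPS bool
}

// buildTraceURL mirrors the scheme selection done by TraceBatchByNumber:
// HTTPS when the flag is set, HTTP otherwise, with host and path taken
// from the incoming request.
func buildTraceURL(cfg rpcConfig, host, path string) string {
	scheme := "http"
	if cfg.TraceBatchUseHTTPS {
		scheme = "https"
	}
	u := url.URL{Scheme: scheme, Host: host, Path: path}
	return u.String()
}

func main() {
	fmt.Println(buildTraceURL(rpcConfig{TraceBatchUseHTTPS: true}, "zkevm-rpc.example.com", "/"))
	// prints: https://zkevm-rpc.example.com/
}
```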
222 changes: 219 additions & 3 deletions jsonrpc/endpoints_debug.go
@@ -6,8 +6,14 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"time"

"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
@@ -28,13 +34,15 @@ var defaultTraceConfig = &traceConfig{

// DebugEndpoints is the debug jsonrpc endpoint
type DebugEndpoints struct {
cfg Config
state types.StateInterface
txMan DBTxManager
}

// NewDebugEndpoints returns DebugEndpoints
- func NewDebugEndpoints(state types.StateInterface) *DebugEndpoints {
+ func NewDebugEndpoints(cfg Config, state types.StateInterface) *DebugEndpoints {
return &DebugEndpoints{
cfg: cfg,
state: state,
}
}
@@ -73,6 +81,11 @@ type traceBlockTransactionResponse struct {
Result interface{} `json:"result"`
}

type traceBatchTransactionResponse struct {
TxHash common.Hash `json:"txHash"`
Result interface{} `json:"result"`
}

// TraceTransaction creates a response for debug_traceTransaction request.
// See https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-debug#debugtracetransaction
func (d *DebugEndpoints) TraceTransaction(hash types.ArgHash, cfg *traceConfig) (interface{}, types.Error) {
@@ -93,7 +106,7 @@ func (d *DebugEndpoints) TraceBlockByNumber(number types.BlockNumber, cfg *trace
block, err := d.state.GetL2BlockByNumber(ctx, blockNumber, dbTx)
if errors.Is(err, state.ErrNotFound) {
return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("block #%d not found", blockNumber))
- } else if err == state.ErrNotFound {
+ } else if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to get block by number", err)
}

@@ -113,7 +126,7 @@ func (d *DebugEndpoints) TraceBlockByHash(hash types.ArgHash, cfg *traceConfig)
block, err := d.state.GetL2BlockByHash(ctx, hash.Hash(), dbTx)
if errors.Is(err, state.ErrNotFound) {
return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("block %s not found", hash.Hash().String()))
- } else if err == state.ErrNotFound {
+ } else if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to get block by hash", err)
}

@@ -126,6 +139,159 @@ func (d *DebugEndpoints) TraceBlockByHash(hash types.ArgHash, cfg *traceConfig)
})
}

// TraceBatchByNumber creates a response for a debug_traceBatchByNumber request.
// This endpoint lets clients get the traces of all the transactions attached
// to the same batch in a single call.
//
// IMPORTANT: to take advantage of the infrastructure's automatic scaling,
// instead of parallelizing the trace transactions internally and pushing all
// the load onto a single jRPC and Executor instance, the code redirects the
// trace transaction requests back to the same URL, making them external calls,
// so they can be processed in parallel by multiple jRPC and Executor instances.
//
// The request flow works as follows:
// -> the user makes a trace batch request
// -> the jRPC balancer picks a jRPC server to handle the trace batch request
// -> the picked jRPC sends parallel trace transaction requests, one per transaction in the batch
// -> the jRPC balancer spreads those requests across different jRPC servers
// -> the picked jRPC server groups the trace transaction responses from the other jRPC servers
// -> the picked jRPC responds to the initial request with all the tx traces
func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number types.BatchNumber, cfg *traceConfig) (interface{}, types.Error) {
type traceResponse struct {
blockNumber uint64
txIndex uint64
txHash common.Hash
trace interface{}
err error
}

// the size of the buffer defines
// how many txs it will process in parallel.
const bufferSize = 10

// checks and load the request scheme to build the url for the remote requests
// scheme, err := getHttpScheme(httpRequest)
// if err != nil {
// return RPCErrorResponse(types.DefaultErrorCode, err.Error(), nil)
// }

// builds the url of the remote jRPC server
scheme := "http"
if d.cfg.TraceBatchUseHTTPS {
scheme = "https"
}
u := url.URL{
Scheme: scheme,
Host: httpRequest.Host,
Path: httpRequest.URL.Path,
}
rpcURL := u.String()

return d.txMan.NewDbTxScope(d.state, func(ctx context.Context, dbTx pgx.Tx) (interface{}, types.Error) {
batchNumber, rpcErr := number.GetNumericBatchNumber(ctx, d.state, dbTx)
if rpcErr != nil {
return nil, rpcErr
}

batch, err := d.state.GetBatchByNumber(ctx, batchNumber, dbTx)
if errors.Is(err, state.ErrStateNotSynchronized) {
return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("batch #%d not found", batchNumber))
} else if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to get batch by number", err)
}

txs, _, err := d.state.GetTransactionsByBatchNumber(ctx, batch.BatchNumber, dbTx)
if !errors.Is(err, state.ErrNotFound) && err != nil {
return RPCErrorResponse(types.DefaultErrorCode, fmt.Sprintf("couldn't load batch txs from state by number %v to create the traces", batchNumber), err)
}

receipts := make([]ethTypes.Receipt, 0, len(txs))
for _, tx := range txs {
receipt, err := d.state.GetTransactionReceipt(ctx, tx.Hash(), dbTx)
if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, fmt.Sprintf("couldn't load receipt for tx %v to get trace", tx.Hash().String()), err)
}
receipts = append(receipts, *receipt)
}

buffer := make(chan byte, bufferSize)

mu := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(receipts))
responses := make([]traceResponse, 0, len(receipts))

// gets the trace from the jRPC and adds it to the responses
loadTraceByTxHash := func(receipt ethTypes.Receipt) {
defer func() {
<-buffer // make buffer slot free
wg.Done()
}()
buffer <- 1 // use buffer free slot or wait for a free slot

response := traceResponse{
blockNumber: receipt.BlockNumber.Uint64(),
txIndex: uint64(receipt.TransactionIndex),
txHash: receipt.TxHash,
}

res, err := client.JSONRPCCall(rpcURL, "debug_traceTransaction", receipt.TxHash.String(), cfg)
if err != nil {
err := fmt.Errorf("failed to get tx trace from remote jRPC server %v for tx %v, err: %w", rpcURL, receipt.TxHash.String(), err)
log.Errorf(err.Error())
response.err = err
} else if res.Error != nil {
err := fmt.Errorf("tx trace error returned from remote jRPC server %v for tx %v, err: %v - %v", rpcURL, receipt.TxHash.String(), res.Error.Code, res.Error.Message)
log.Errorf(err.Error())
response.err = err
} else {
response.trace = res.Result
}

// add to the responses
mu.Lock()
defer mu.Unlock()
responses = append(responses, response)
}

// load traces for each transaction
for _, receipt := range receipts {
go loadTraceByTxHash(receipt)
}

// wait the traces to be loaded
if waitTimeout(&wg, d.cfg.ReadTimeout.Duration) {
return RPCErrorResponse(types.DefaultErrorCode, fmt.Sprintf("failed to get traces for batch %v: timeout reached", batchNumber), nil)
}

// since the txs are attached to an L2 block, and the L2 block is the
// struct attached to the batch, we sort the responses first by block
// number and then by tx index, so the traces are always returned in an
// order close to the position of each tx within the batch
sort.Slice(responses, func(i, j int) bool {
if responses[i].blockNumber != responses[j].blockNumber {
return responses[i].blockNumber < responses[j].blockNumber
}
return responses[i].txIndex < responses[j].txIndex
})

// build the batch trace response array
traces := make([]traceBatchTransactionResponse, 0, len(receipts))
for _, response := range responses {
if response.err != nil {
return RPCErrorResponse(types.DefaultErrorCode, fmt.Sprintf("failed to get traces for batch %v: failed to get trace for tx: %v, err: %v", batchNumber, response.txHash.String(), response.err.Error()), nil)
}

traces = append(traces, traceBatchTransactionResponse{
TxHash: response.txHash,
Result: response.trace,
})
}
return traces, nil
})
}

func (d *DebugEndpoints) buildTraceBlock(ctx context.Context, txs []*ethTypes.Transaction, cfg *traceConfig, dbTx pgx.Tx) (interface{}, types.Error) {
traces := []traceBlockTransactionResponse{}
for _, tx := range txs {
@@ -300,3 +466,53 @@ func isBuiltInTracer(tracer string) bool {
func isJSCustomTracer(tracer string) bool {
return strings.Contains(tracer, "result") && strings.Contains(tracer, "fault")
}

// // getHttpScheme tries to get the scheme from the http request in different ways
// func getHttpScheme(r *http.Request) (string, error) {
// // scheme headers
// headers := []string{"X-Forwarded-Proto", "X-Forwarded-Protocol", "X-Url-Scheme"}
// for _, header := range headers {
// value := r.Header.Get(header)
// if value == "http" || value == "https" {
// return value, nil
// } else if value != "" {
// return "", fmt.Errorf("header %v must be set to HTTP or HTTPS, value found: %s", header, value)
// }
// }

// // https on/off headers
// headers = []string{"X-Forwarded-Ssl", "Front-End-Https"}
// for _, header := range headers {
// value := r.Header.Get(header)
// if value == "on" {
// return "https", nil
// } else if value == "off" {
// return "http", nil
// } else if value != "" {
// return "", fmt.Errorf("header %v must be set to ON or OFF, value found: %s", header, value)
// }
// }

// // httpRequest TLS check
// scheme := "http"
// if r.TLS != nil {
// scheme = "https"
// }
// return scheme, nil
// }

// waitTimeout waits for the waitGroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}
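The fan-out above bounds its parallelism with a buffered channel used as a semaphore (bufferSize slots) and gives up after the RPC ReadTimeout via the waitTimeout helper. Below is a self-contained sketch of that pattern, stripped of the node-specific types; the job count, the sleep standing in for the remote debug_traceTransaction call, and the timeout value are all illustrative.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for the WaitGroup, but gives up after the given timeout,
// just like the helper added to endpoints_debug.go above.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()
	select {
	case <-c:
		return false // completed normally
	case <-time.After(timeout):
		return true // timed out
	}
}

func main() {
	const bufferSize = 10 // at most 10 traces in flight at once
	const jobs = 50

	buffer := make(chan struct{}, bufferSize) // buffered channel used as a semaphore
	var wg sync.WaitGroup
	wg.Add(jobs)

	for i := 0; i < jobs; i++ {
		go func() {
			defer func() {
				<-buffer // free the slot
				wg.Done()
			}()
			buffer <- struct{}{}              // take a slot, or wait until one is free
			time.Sleep(10 * time.Millisecond) // stands in for the remote debug_traceTransaction call
		}()
	}

	if waitTimeout(&wg, 2*time.Second) {
		fmt.Println("timed out waiting for traces")
		return
	}
	fmt.Println("all traces loaded")
}
```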
2 changes: 1 addition & 1 deletion jsonrpc/endpoints_eth.go
@@ -31,8 +31,8 @@ const (

// EthEndpoints contains implementations for the "eth" RPC endpoints
type EthEndpoints struct {
+ cfg Config
chainID uint64
- cfg Config
pool types.PoolInterface
state types.StateInterface
storage storageInterface
4 changes: 3 additions & 1 deletion jsonrpc/endpoints_net.go
@@ -9,12 +9,14 @@ import (

// NetEndpoints contains implementations for the "net" RPC endpoints
type NetEndpoints struct {
cfg Config
chainID uint64
}

// NewNetEndpoints returns NetEndpoints
- func NewNetEndpoints(chainID uint64) *NetEndpoints {
+ func NewNetEndpoints(cfg Config, chainID uint64) *NetEndpoints {
return &NetEndpoints{
cfg: cfg,
chainID: chainID,
}
}
4 changes: 3 additions & 1 deletion jsonrpc/endpoints_zkevm.go
@@ -15,13 +15,15 @@ import (

// ZKEVMEndpoints contains implementations for the "zkevm" RPC endpoints
type ZKEVMEndpoints struct {
cfg Config
state types.StateInterface
txMan DBTxManager
}

// NewZKEVMEndpoints returns ZKEVMEndpoints
- func NewZKEVMEndpoints(state types.StateInterface) *ZKEVMEndpoints {
+ func NewZKEVMEndpoints(cfg Config, state types.StateInterface) *ZKEVMEndpoints {
return &ZKEVMEndpoints{
cfg: cfg,
state: state,
}
}
6 changes: 3 additions & 3 deletions jsonrpc/server_test.go
@@ -62,14 +62,14 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocksWrapper, *e
if _, ok := apis[APINet]; ok {
services = append(services, Service{
Name: APINet,
- Service: NewNetEndpoints(chainID),
+ Service: NewNetEndpoints(cfg, chainID),
})
}

if _, ok := apis[APIZKEVM]; ok {
services = append(services, Service{
Name: APIZKEVM,
- Service: NewZKEVMEndpoints(st),
+ Service: NewZKEVMEndpoints(cfg, st),
})
}

@@ -83,7 +83,7 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocksWrapper, *e
if _, ok := apis[APIDebug]; ok {
services = append(services, Service{
Name: APIDebug,
- Service: NewDebugEndpoints(st),
+ Service: NewDebugEndpoints(cfg, st),
})
}

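Putting the commit together from a client's point of view, the new endpoint is called like any other debug method. A hedged sketch using the node's own jsonrpc client package follows; the URL is a placeholder, the hex-quantity batch number and the nil trace config are assumptions (nil is assumed to select the node's defaultTraceConfig), and the result is re-marshaled so the sketch does not depend on the client's internal result type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
)

func main() {
	// Placeholder URL: point this at your own jRPC endpoint / load balancer.
	nodeURL := "https://zkevm-rpc.example.com"

	// Trace every transaction of batch 1 in one call. The hex-quantity batch
	// number and the nil trace config are assumptions, not confirmed by the diff.
	res, err := client.JSONRPCCall(nodeURL, "debug_traceBatchByNumber", "0x1", nil)
	if err != nil {
		log.Fatalf("request failed: %v", err)
	}
	if res.Error != nil {
		log.Fatalf("rpc error %v: %v", res.Error.Code, res.Error.Message)
	}

	// Re-marshal the result, then decode the []{txHash, result} array the
	// endpoint builds (traceBatchTransactionResponse in the diff above).
	raw, err := json.Marshal(res.Result)
	if err != nil {
		log.Fatalf("re-encoding result: %v", err)
	}
	var traces []struct {
		TxHash string          `json:"txHash"`
		Result json.RawMessage `json:"result"`
	}
	if err := json.Unmarshal(raw, &traces); err != nil {
		log.Fatalf("decoding traces: %v", err)
	}
	fmt.Printf("got %d tx traces\n", len(traces))
}
```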