Skip to content

Commit

Permalink
tweak(zkevm_api): impl concurrency limit on getBatchWitness (erigonte…
Browse files Browse the repository at this point in the history
  • Loading branch information
revitteth authored Aug 14, 2024
1 parent 5736624 commit d32f08c
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 1 deletion.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ In order to enable the zkevm_ namespace, please add 'zkevm' to the http.api flag
### Supported (remote)
- `zkevm_getBatchByNumber`

### Configurable
- `zkevm_getBatchWitness` - concurrency can be limited with `zkevm.rpc-get-batch-witness-concurrency-limit` flag which defaults to 1. Use 0 for no limit.

### Not yet supported
- `zkevm_getNativeBlockHashesInRange`

Expand Down
34 changes: 33 additions & 1 deletion cmd/rpcdaemon/commands/zkevm_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type ZkEvmAPI interface {
GetExitRootTable(ctx context.Context) ([]l1InfoTreeData, error)
}

const getBatchWitness = "getBatchWitness"

// APIImpl is implementation of the ZkEvmAPI interface based on remote Db access
type ZkEvmAPIImpl struct {
ethApi *APIImpl
Expand All @@ -74,6 +76,17 @@ type ZkEvmAPIImpl struct {
config *ethconfig.Config
l1Syncer *syncer.L1Syncer
l2SequencerUrl string
semaphores map[string]chan struct{}
}

func (api *ZkEvmAPIImpl) initializeSemaphores(functionLimits map[string]int) {
api.semaphores = make(map[string]chan struct{})

for funcName, limit := range functionLimits {
if limit != 0 {
api.semaphores[funcName] = make(chan struct{}, limit)
}
}
}

// NewEthAPI returns ZkEvmAPIImpl instance
Expand All @@ -85,14 +98,21 @@ func NewZkEvmAPI(
l1Syncer *syncer.L1Syncer,
l2SequencerUrl string,
) *ZkEvmAPIImpl {
return &ZkEvmAPIImpl{

a := &ZkEvmAPIImpl{
ethApi: base,
db: db,
ReturnDataLimit: returnDataLimit,
config: zkConfig,
l1Syncer: l1Syncer,
l2SequencerUrl: l2SequencerUrl,
}

a.initializeSemaphores(map[string]int{
getBatchWitness: zkConfig.Zk.RpcGetBatchWitnessConcurrencyLimit,
})

return a
}

// ConsolidatedBlockNumber returns the latest consolidated block number
Expand Down Expand Up @@ -836,6 +856,18 @@ func (api *ZkEvmAPIImpl) GetBlockRangeWitness(ctx context.Context, startBlockNrO
}

func (api *ZkEvmAPIImpl) getBatchWitness(ctx context.Context, tx kv.Tx, batchNum uint64, debug bool, mode WitnessMode) (hexutility.Bytes, error) {

// limit in-flight requests by name
semaphore := api.semaphores[getBatchWitness]
if semaphore != nil {
select {
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
default:
return nil, fmt.Errorf("busy")
}
}

if api.ethApi.historyV3(tx) {
return nil, fmt.Errorf("not supported by Erigon3")
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ var (
Usage: "RPC rate limit in requests per second.",
Value: 0,
}
RpcGetBatchWitnessConcurrencyLimitFlag = cli.IntFlag{
Name: "zkevm.rpc-get-batch-witness-concurrency-limit",
Usage: "The maximum number of concurrent requests to the executor for getBatchWitness.",
Value: 1,
}
DatastreamVersionFlag = cli.IntFlag{
Name: "zkevm.datastream-version",
Usage: "Stream version indicator 1: PreBigEndian, 2: BigEndian.",
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Zk struct {
L1CacheEnabled bool
L1CachePort uint
RpcRateLimits int
RpcGetBatchWitnessConcurrencyLimit int
DatastreamVersion int
SequencerBlockSealTime time.Duration
SequencerBatchSealTime time.Duration
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ var DefaultFlags = []cli.Flag{
&utils.L1MaticContractAddressFlag,
&utils.L1FirstBlockFlag,
&utils.RpcRateLimitsFlag,
&utils.RpcGetBatchWitnessConcurrencyLimitFlag,
&utils.DatastreamVersionFlag,
&utils.RebuildTreeAfterFlag,
&utils.IncrementTreeAlways,
Expand Down
2 changes: 2 additions & 0 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
L1MaticContractAddress: libcommon.HexToAddress(ctx.String(utils.L1MaticContractAddressFlag.Name)),
L1FirstBlock: ctx.Uint64(utils.L1FirstBlockFlag.Name),
RpcRateLimits: ctx.Int(utils.RpcRateLimitsFlag.Name),
RpcGetBatchWitnessConcurrencyLimit: ctx.Int(utils.RpcGetBatchWitnessConcurrencyLimitFlag.Name),
DatastreamVersion: ctx.Int(utils.DatastreamVersionFlag.Name),
RebuildTreeAfter: ctx.Uint64(utils.RebuildTreeAfterFlag.Name),
IncrementTreeAlways: ctx.Bool(utils.IncrementTreeAlways.Name),
Expand Down Expand Up @@ -209,6 +210,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
checkFlag(utils.L1MaticContractAddressFlag.Name, cfg.L1MaticContractAddress.Hex())
checkFlag(utils.L1FirstBlockFlag.Name, cfg.L1FirstBlock)
checkFlag(utils.RpcRateLimitsFlag.Name, cfg.RpcRateLimits)
checkFlag(utils.RpcGetBatchWitnessConcurrencyLimitFlag.Name, cfg.RpcGetBatchWitnessConcurrencyLimit)
checkFlag(utils.RebuildTreeAfterFlag.Name, cfg.RebuildTreeAfter)
checkFlag(utils.L1BlockRangeFlag.Name, cfg.L1BlockRange)
checkFlag(utils.L1QueryDelayFlag.Name, cfg.L1QueryDelay)
Expand Down

0 comments on commit d32f08c

Please sign in to comment.