Skip to content

Commit

Permalink
feat(op-node): support multi clients to fetch blobs (#199)
Browse files Browse the repository at this point in the history
Co-authored-by: Owen <[email protected]>
  • Loading branch information
bnoieh and owen-reorg authored May 30, 2024
1 parent 3f74a4b commit 00d3473
Show file tree
Hide file tree
Showing 23 changed files with 460 additions and 120 deletions.
48 changes: 48 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ jobs:
version: latest
args: -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 5m -e "errors.As" -e "errors.Is"

op-service-lint:
runs-on: ubuntu-latest

steps:
- name: Check out code
uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version-file: go.mod

- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
working-directory: op-service
version: latest
args: -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 5m -e "errors.As" -e "errors.Is"

op-node-test:
runs-on: ubuntu-latest
needs: op-node-lint
Expand Down Expand Up @@ -151,6 +170,35 @@ jobs:
with:
report_paths: '/tmp/test-results/op-proposer.xml'

op-service-test:
runs-on: ubuntu-latest
needs: op-service-lint

steps:
- name: Check out code
uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version-file: go.mod

- name: Install gotestsum
uses: autero1/[email protected]
with:
gotestsum_version: 1.10.0

- name: Run tests
working-directory: op-service
run: |
gotestsum --format=testname --junitfile=/tmp/test-results/op-service.xml -- -parallel=2 -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out ./...
- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
if: success() || failure() # always run even if the previous step fails
with:
report_paths: '/tmp/test-results/op-service.xml'

op-e2e-http-test:
runs-on: ubuntu-latest
needs: [op-node-test, op-batcher-test, op-proposer-test]
Expand Down
3 changes: 2 additions & 1 deletion op-e2e/actions/fallback_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ func setupFallbackClientTest(t Testing, sd *e2eutils.SetupData, log log.Logger,
})
l1F, err := sources.NewL1Client(fallbackClient, log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err)
l1Blob := sources.NewBSCBlobClient([]client.RPC{rpc})
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)

sequencer := NewL2Sequencer(t, log, l1F, l1F, plasma.Disabled, l2Cl, sd.RollupCfg, 0)
sequencer := NewL2Sequencer(t, log, l1F, l1Blob, plasma.Disabled, l2Cl, sd.RollupCfg, 0)
return miner, l1_2, l1_3, engine, sequencer, fallbackClient.(*sources.FallbackClient)
}

Expand Down
1 change: 1 addition & 0 deletions op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
configureL2(nodeCfg, sys.EthInstances[name], cfg.JWTSecret)
if sys.RollupConfig.EcotoneTime != nil {
nodeCfg.Beacon = &rollupNode.L1BeaconEndpointConfig{BeaconAddr: sys.L1BeaconAPIAddr}
nodeCfg.L1Blob = &rollupNode.L1BlobEndpointConfig{NodeAddrs: sys.NodeEndpoint("l1")}
}
}

Expand Down
24 changes: 24 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,27 @@ var (
Value: time.Second * 3,
Category: L1RPCCategory,
}
L1ArchiveBlobRpcAddr = &cli.StringFlag{
Name: "l1.archive-blob-rpc",
Usage: "Optional address of L1 archive blob endpoint to use. Multiple alternative addresses are supported, separated by commas, and will rotate when error",
Required: false,
EnvVars: prefixEnvVars("L1_ARCHIVE_BLOB_RPC"),
Category: RollupCategory,
}
L1BlobRpcRateLimit = &cli.Float64Flag{
Name: "l1.blob-rpc-rate-limit",
Usage: "Optional self-imposed global rate-limit on L1 blob RPC requests, specified in requests / second. Disabled if set to 0.",
EnvVars: prefixEnvVars("L1_BLOB_RPC_RATE_LIMIT"),
Value: 0,
Category: L1RPCCategory,
}
L1BlobRpcMaxBatchSize = &cli.IntFlag{
Name: "l1.blob-rpc-max-batch-size",
Usage: "Optional maximum number of L1 blob RPC requests to bundle",
EnvVars: prefixEnvVars("L1_BLOB_RPC_MAX_BATCH_SIZE"),
Value: 20,
Category: L1RPCCategory,
}
VerifierL1Confs = &cli.Uint64Flag{
Name: "verifier.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head before deriving L2 data from. Reorgs are supported, but may be slow to perform.",
Expand Down Expand Up @@ -382,6 +403,9 @@ var optionalFlags = []cli.Flag{
L1RPCMaxBatchSize,
L1RPCMaxConcurrency,
L1HTTPPollInterval,
L1ArchiveBlobRpcAddr,
L1BlobRpcRateLimit,
L1BlobRpcMaxBatchSize,
VerifierL1Confs,
SequencerEnabledFlag,
SequencerStoppedFlag,
Expand Down
64 changes: 62 additions & 2 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/client"
service_client "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"

"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -40,6 +39,11 @@ type L1BeaconEndpointSetup interface {
Check() error
}

type L1BlobEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) ([]client.RPC, error)
Check() error
}

type L2EndpointConfig struct {
// L2EngineAddr is the address of the L2 Engine JSON-RPC endpoint to use. The engine and eth
// namespaces must be enabled by the endpoint.
Expand Down Expand Up @@ -147,7 +151,7 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
}

isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr)
isMultiUrl, urlList := client.MultiUrlParse(cfg.L1NodeAddr)
if isMultiUrl {
return fallbackClientWrap(ctx, log, urlList, cfg, rollupCfg, opts...)
}
Expand Down Expand Up @@ -249,3 +253,59 @@ func parseHTTPHeader(headerStr string) (http.Header, error) {
h.Add(s[0], s[1])
return h, nil
}

type L1BlobEndpointConfig struct {
// Address of L1 blob node endpoint to use, multiple alternative addresses separated by commas are supported, and will rotate when error
NodeAddrs string

// RateLimit specifies a self-imposed rate-limit on L1 requests. 0 is no rate-limit.
RateLimit float64

// BatchSize specifies the maximum batch-size, which also applies as L1 rate-limit burst amount (if set).
BatchSize int
}

var _ L1BlobEndpointSetup = (*L1BlobEndpointConfig)(nil)

func (cfg *L1BlobEndpointConfig) Check() error {
if cfg.NodeAddrs == "" {
return fmt.Errorf("empty L1 blob endpoint address")
}
if cfg.BatchSize < 1 || cfg.BatchSize > 500 {
return fmt.Errorf("batch size is invalid or unreasonable: %d", cfg.BatchSize)
}
if cfg.RateLimit < 0 {
return fmt.Errorf("rate limit cannot be negative")
}
return nil
}

func (cfg *L1BlobEndpointConfig) Setup(ctx context.Context, log log.Logger) ([]client.RPC, error) {
rpcClients := make([]client.RPC, 0)

opts := []client.RPCOption{
client.WithDialBackoff(10),
}
if cfg.RateLimit != 0 {
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
}
isMultiUrl, urlList := client.MultiUrlParse(cfg.NodeAddrs)

if isMultiUrl {
for _, url := range urlList {
rpcClient, err := client.NewRPC(ctx, log, url, opts...)
if err != nil {
return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", url, err)
}
rpcClients = append(rpcClients, rpcClient)
}
} else {
rpcClient, err := client.NewRPC(ctx, log, cfg.NodeAddrs, opts...)
if err != nil {
return nil, fmt.Errorf("setup blob client failed to dial L1 address (%s): %w", cfg.NodeAddrs, err)
}
rpcClients = append(rpcClients, rpcClient)
}

return rpcClients, nil
}
1 change: 1 addition & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
L2 L2EndpointSetup

Beacon L1BeaconEndpointSetup
L1Blob L1BlobEndpointSetup

Driver driver.Config

Expand Down
43 changes: 34 additions & 9 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ type OpNode struct {
l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1FinalizedSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)

l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging
runCfg *RuntimeConfig // runtime configurables
l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
l1Blob *sources.BSCBlobClient // L1 Blob Client to fetch blobs
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging
runCfg *RuntimeConfig // runtime configurables

safeDB closableSafeDB

Expand Down Expand Up @@ -123,6 +124,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL1(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L1: %w", err)
}
if err := n.initL1Blob(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L1 blob: %w", err)
}
if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return fmt.Errorf("failed to init L2: %w", err)
}
Expand Down Expand Up @@ -304,6 +308,27 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
return nil
}

func (n *OpNode) initL1Blob(ctx context.Context, cfg *Config) error {
// If Ecotone upgrade is not scheduled yet, then there is no need for a Blob API.
if cfg.Rollup.EcotoneTime == nil {
return nil
}
// Once the Ecotone upgrade is scheduled, we must have initialized the Blob API settings.
if cfg.L1Blob == nil {
return fmt.Errorf("missing L1 Blob Endpoint configuration: this API is mandatory for Ecotone upgrade at t=%d", *cfg.Rollup.EcotoneTime)
}
rpcClients, err := cfg.L1Blob.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L1 blob client: %w", err)
}
instrumentedClients := make([]client.RPC, 0)
for _, rpc := range rpcClients {
instrumentedClients = append(instrumentedClients, client.NewInstrumentedRPC(rpc, n.metrics))
}
n.l1Blob = sources.NewBSCBlobClient(instrumentedClients)
return nil
}

func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup)
if err != nil {
Expand Down Expand Up @@ -342,7 +367,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
} else {
n.safeDB = safedb.Disabled
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.l1Blob, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
Rollup: *rollupConfig,
Driver: *driverConfig,
Beacon: NewBeaconEndpointConfig(ctx),
L1Blob: NewL1BlobEndpointConfig(ctx),
RPC: node.RPCConfig{
ListenAddr: ctx.String(flags.RPCListenAddr.Name),
ListenPort: ctx.Int(flags.RPCListenPort.Name),
Expand Down Expand Up @@ -138,6 +139,14 @@ func NewBeaconEndpointConfig(ctx *cli.Context) node.L1BeaconEndpointSetup {
}
}

func NewL1BlobEndpointConfig(ctx *cli.Context) node.L1BlobEndpointSetup {
return &node.L1BlobEndpointConfig{
NodeAddrs: ctx.String(flags.L1NodeAddr.Name) + "," + ctx.String(flags.L1ArchiveBlobRpcAddr.Name),
RateLimit: ctx.Float64(flags.L1BlobRpcRateLimit.Name),
BatchSize: ctx.Int(flags.L1BlobRpcMaxBatchSize.Name),
}
}

func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig {
return &node.L1EndpointConfig{
L1NodeAddr: ctx.String(flags.L1NodeAddr.Name),
Expand Down
3 changes: 2 additions & 1 deletion op-service/bsc/compat.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package bsc

import (
lru "github.com/hashicorp/golang-lru/v2"
"math/big"
"sort"

lru "github.com/hashicorp/golang-lru/v2"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down
2 changes: 1 addition & 1 deletion op-service/dial/static_l2_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewStaticL2EndpointProvider(ctx context.Context, log log.Logger, ethClientU
}
return &StaticL2EndpointProvider{
StaticL2RollupProvider: *rollupProvider,
ethClient: client.NewInstrumentedClient(ethClient, metrics),
ethClient: client.NewInstrumentedClient(ethClient, metrics),
}, nil
}

Expand Down
Loading

0 comments on commit 00d3473

Please sign in to comment.