Skip to content

Commit

Permalink
op-service, op-batcher, op-proposer: Active sequencer follow mode (#8585
Browse files Browse the repository at this point in the history
)

* op-service: Add ActiveL2EndpointProvider.

* Fix bug in initialization, and handle case where no ethUrls are provided.

* Split active L2 provider into active rollup and active L2 provider.

* Re-duplicate some code until tests are passing.

* op-proposer: Add ability to enable active provider.

* op-batcher: Add ability to enable active provider.

* Add an empty test skeleton.

* Add an empty test skeleton.

* op-service: add, but do not yet use, RollupClientInterface and EthClientInterface.

* op-service: update mocks and interfaces for endpoint provider testing.

* op-service - WIP on Active L2 Providers: unit tests pass, design and impl contains TODOs.

* op-service: restore design in Active Endpoint Providers that only keeps one client open at a time.

* op-service: when dialing a new sequencer, close() the old connection.

* op-service: obey coderabbit suggestion around safer handling of p.currentIndex in Active L2 Providers.

* op-service, op-batcher, op-proposer: address review comments in PR#8585.

* op-service: Active L2 Provider - add test case for a sequencer returning an error.

* op-service: Active L2/Rollup Providers: improve unit testing and logging.

* op-service, op-batcher: address review comments in 8585 regarding first-startup behavior and testing.

* op-service: address review comments through adding more tests, and moving "nil client" behavior from client getter to constructor.

* op-service: minor error message change in active endpoint providers.

* Update op-service/dial/active_l2_provider.go

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* op-service: obey linter in rabbit-provided error message change.

* Update op-service/dial/active_l2_provider.go

Co-authored-by: Sebastian Stammler <[email protected]>

* op-service active L2 provider tests: assertAllExpectations after most tests.

* op-service: more elegantly handle startup in active l2 providers, and improve testing.

* Change remaining longDurationTests to be able to use ept.assertAllExpectations.

* use new error errSeqUnset.

* Add test for scenario where many sequencers are inactive, and only the last is active.

* Readability change: move the on-creation initialization to its own function.

* Move extra one-time dial to constructor.

* Update op-service/dial/active_rollup_provider.go

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Add nil check to active l2 provider.

* Update op-service/dial/active_rollup_provider.go

Co-authored-by: Sebastian Stammler <[email protected]>

* Address review comment: change many-inactive tests to many-undialable tests.

* Add test that reproduces internal state corruption.

* op-service: Improve active seq provider

- Preserve the invariant that the index and current rollup/eth
  client match.
- Dial at the start of the loop instead of at the end.

* Fix some tests.

* Move usage of ExpectClose to MaybeClose, we don't want to enforce a particular close behavior in these tests.

* add a missing call to assertAllExpectations.

* Test even the case where the active providers are managing a list of 1 element.

* Revert experimental hunk in active_l2_provider.

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Sebastian Stammler <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2023
1 parent 078214c commit c04cefe
Show file tree
Hide file tree
Showing 15 changed files with 1,238 additions and 13 deletions.
8 changes: 6 additions & 2 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package batcher
import (
"errors"
"fmt"
"strings"
"time"

"github.com/urfave/cli/v2"
Expand All @@ -20,10 +21,10 @@ type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string

// L2EthRpc is the HTTP provider URL for the L2 execution engine.
// L2EthRpc is the HTTP provider URL for the L2 execution engine. A comma-separated list enables the active L2 provider. Such a list needs to match the number of RollupRpcs provided.
L2EthRpc string

// RollupRpc is the HTTP provider URL for the L2 rollup node.
// RollupRpc is the HTTP provider URL for the L2 rollup node. A comma-separated list enables the active L2 provider. Such a list needs to match the number of L2EthRpcs provided.
RollupRpc string

// MaxChannelDuration is the maximum duration (in #L1-blocks) to keep a
Expand Down Expand Up @@ -74,6 +75,9 @@ func (c *CLIConfig) Check() error {
if c.RollupRpc == "" {
return errors.New("empty rollup RPC URL")
}
if strings.Count(c.RollupRpc, ",") != strings.Count(c.L2EthRpc, ",") {
return errors.New("number of rollup and eth URLs must match")
}
if c.PollInterval == 0 {
return errors.New("must set PollInterval")
}
Expand Down
12 changes: 10 additions & 2 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
_ "net/http/pprof"
"strconv"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -125,9 +126,16 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er
}
bs.L1Client = l1Client

endpointProvider, err := dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
var endpointProvider dial.L2EndpointProvider
if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2EthRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
ethUrls := strings.Split(cfg.L2EthRpc, ",")
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, bs.Log)
} else {
endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
}
if err != nil {
return fmt.Errorf("failed to create L2 endpoint provider: %w", err)
return fmt.Errorf("failed to build L2 endpoint provider: %w", err)
}
bs.EndpointProvider = endpointProvider

Expand Down
4 changes: 2 additions & 2 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ var (
}
L2EthRpcFlag = &cli.StringFlag{
Name: "l2-eth-rpc",
Usage: "HTTP provider URL for L2 execution engine",
Usage: "HTTP provider URL for L2 execution engine. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of rollup-rpcs provided.",
EnvVars: prefixEnvVars("L2_ETH_RPC"),
}
RollupRpcFlag = &cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for Rollup node",
Usage: "HTTP provider URL for Rollup node. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of l2-eth-rpcs provided.",
EnvVars: prefixEnvVars("ROLLUP_RPC"),
}
// Optional flags
Expand Down
2 changes: 1 addition & 1 deletion op-proposer/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
}
RollupRpcFlag = &cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for the rollup node",
Usage: "HTTP provider URL for the rollup node. A comma-separated list enables the active rollup provider.",
EnvVars: prefixEnvVars("ROLLUP_RPC"),
}
L2OOAddressFlag = &cli.StringFlag{
Expand Down
2 changes: 1 addition & 1 deletion op-proposer/proposer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string

// RollupRpc is the HTTP provider URL for the rollup node.
// RollupRpc is the HTTP provider URL for the rollup node. A comma-separated list enables the active rollup provider.
RollupRpc string

// L2OOAddress is the L2OutputOracle contract address.
Expand Down
9 changes: 8 additions & 1 deletion op-proposer/proposer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"strconv"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -121,7 +122,13 @@ func (ps *ProposerService) initRPCClients(ctx context.Context, cfg *CLIConfig) e
}
ps.L1Client = l1Client

rollupProvider, err := dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc)
var rollupProvider dial.RollupProvider
if strings.Contains(cfg.RollupRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, ps.Log)
} else {
rollupProvider, err = dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc)
}
if err != nil {
return fmt.Errorf("failed to build L2 endpoint provider: %w", err)
}
Expand Down
114 changes: 114 additions & 0 deletions op-service/dial/active_l2_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package dial

import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/log"
)

const DefaultActiveSequencerFollowerCheckDuration = 2 * DefaultDialTimeout

type ethDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error)

// ActiveL2EndpointProvider is an interface for providing a RollupClient and l2 eth client
// It manages the lifecycle of the RollupClient and eth client for callers
// It does this by failing over down the list of rollupUrls if the current one is inactive or broken
type ActiveL2EndpointProvider struct {
ActiveL2RollupProvider
currentEthClient EthClientInterface
ethClientIndex int
ethDialer ethDialer
ethUrls []string
}

// NewActiveL2EndpointProvider creates a new ActiveL2EndpointProvider
// the checkDuration is the duration between checks to see if the current rollup client is active
// provide a checkDuration of 0 to check every time
func NewActiveL2EndpointProvider(ctx context.Context,
ethUrls, rollupUrls []string,
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
) (*ActiveL2EndpointProvider, error) {
ethDialer := func(ctx context.Context, timeout time.Duration,
log log.Logger, url string,
) (EthClientInterface, error) {
return DialEthClientWithTimeout(ctx, timeout, log, url)
}
rollupDialer := func(ctx context.Context, timeout time.Duration,
log log.Logger, url string,
) (RollupClientInterface, error) {
return DialRollupClientWithTimeout(ctx, timeout, log, url)
}
return newActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, checkDuration, networkTimeout, logger, ethDialer, rollupDialer)
}

func newActiveL2EndpointProvider(
ctx context.Context,
ethUrls, rollupUrls []string,
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
ethDialer ethDialer,
rollupDialer rollupDialer,
) (*ActiveL2EndpointProvider, error) {
if len(rollupUrls) == 0 {
return nil, errors.New("empty rollup urls list, expected at least one URL")
}
if len(ethUrls) != len(rollupUrls) {
return nil, fmt.Errorf("number of eth urls (%d) and rollup urls (%d) mismatch", len(ethUrls), len(rollupUrls))
}

rollupProvider, err := newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer)
if err != nil {
return nil, err
}
p := &ActiveL2EndpointProvider{
ActiveL2RollupProvider: *rollupProvider,
ethDialer: ethDialer,
ethUrls: ethUrls,
}
cctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
if _, err = p.EthClient(cctx); err != nil {
return nil, fmt.Errorf("setting provider eth client: %w", err)
}
return p, nil
}

func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInterface, error) {
p.clientLock.Lock()
defer p.clientLock.Unlock()
err := p.ensureActiveEndpoint(ctx)
if err != nil {
return nil, err
}
if p.ethClientIndex != p.rollupIndex || p.currentEthClient == nil {
// we changed sequencers, dial a new EthClient
cctx, cancel := context.WithTimeout(ctx, p.networkTimeout)
defer cancel()
idx := p.rollupIndex
ep := p.ethUrls[idx]
log.Info("sequencer changed (or ethClient was nil due to startup), dialing new eth client", "new_index", idx, "new_url", ep)
ethClient, err := p.ethDialer(cctx, p.networkTimeout, p.log, ep)
if err != nil {
return nil, fmt.Errorf("dialing eth client: %w", err)
}
if p.currentEthClient != nil {
p.currentEthClient.Close()
}
p.ethClientIndex = idx
p.currentEthClient = ethClient
}
return p.currentEthClient, nil
}

func (p *ActiveL2EndpointProvider) Close() {
if p.currentEthClient != nil {
p.currentEthClient.Close()
}
p.ActiveL2RollupProvider.Close()
}
Loading

0 comments on commit c04cefe

Please sign in to comment.