Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-batcher: service lifecycle cleanup #7682

Merged
merged 4 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 15 additions & 93 deletions op-batcher/batcher/batch_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,110 +3,32 @@ package batcher
import (
"context"
"fmt"
_ "net/http/pprof"

"github.com/urfave/cli/v2"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
)

// Main is the entrypoint into the Batch Submitter. This method returns a
// closure that executes the service and blocks until the service exits. The use
// of a closure allows the parameters bound to the top-level main package, e.g.
// GitVersion, to be captured and used once the function is executed.
func Main(version string, cliCtx *cli.Context) error {
if err := flags.CheckRequired(cliCtx); err != nil {
return err
}
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid CLI flags: %w", err)
}

l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
oplog.SetGlobalLogHandler(l.GetHandler())
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
procName := "default"
m := metrics.NewMetrics(procName)
l.Info("Initializing Batch Submitter")

batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l, m)
if err != nil {
l.Error("Unable to create Batch Submitter", "error", err)
return err
}

if !cfg.Stopped {
if err := batchSubmitter.Start(); err != nil {
l.Error("Unable to start Batch Submitter", "error", err)
return err
}
}

defer batchSubmitter.StopIfRunning(context.Background())

pprofConfig := cfg.PprofConfig
if pprofConfig.Enabled {
l.Debug("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofConfig.ListenAddr, pprofConfig.ListenPort)
if err != nil {
l.Error("failed to start pprof server", "err", err)
return err
// Main is the entrypoint into the Batch Submitter.
// This method returns a cliapp.LifecycleAction, to create an op-service CLI-lifecycle-managed batch-submitter with.
func Main(version string) cliapp.LifecycleAction {
return func(cliCtx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
if err := flags.CheckRequired(cliCtx); err != nil {
return nil, err
}
l.Info("started pprof server", "addr", pprofSrv.Addr())
defer func() {
if err := pprofSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop pprof server", "err", err)
}
}()
}

metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled {
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.Start(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return nil, fmt.Errorf("invalid CLI flags: %w", err)
}
l.Info("started metrics server", "addr", metricsSrv.Addr())
defer func() {
if err := metricsSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop pprof server", "err", err)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.TxManager.From())
}

server := oprpc.NewServer(
cfg.RPCFlag.ListenAddr,
cfg.RPCFlag.ListenPort,
version,
oprpc.WithLogger(l),
)
if cfg.RPCFlag.EnableAdmin {
adminAPI := rpc.NewAdminAPI(batchSubmitter, &m.RPCMetrics, l)
server.AddAPI(rpc.GetAdminAPI(adminAPI))
l.Info("Admin RPC enabled")
}
if err := server.Start(); err != nil {
return fmt.Errorf("error starting RPC server: %w", err)
}

m.RecordInfo(version)
m.RecordUp()
l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
oplog.SetGlobalLogHandler(l.GetHandler())
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)

opio.BlockOnInterrupts()
if err := server.Stop(); err != nil {
l.Error("Error shutting down http server: %w", err)
l.Info("Initializing Batch Submitter")
return BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l)
}
return nil
}
49 changes: 7 additions & 42 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,17 @@ package batcher
import (
"time"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

type Config struct {
log log.Logger
metr metrics.Metricer
L1Client *ethclient.Client
L2Client *ethclient.Client
RollupNode *sources.RollupClient
TxManager txmgr.TxManager

NetworkTimeout time.Duration
PollInterval time.Duration
MaxPendingTransactions uint64

// RollupConfig is queried at startup
Rollup *rollup.Config

// Channel builder parameters
Channel ChannelConfig
}

// Check ensures that the [Config] is valid.
func (c *Config) Check() error {
if err := c.Rollup.Check(); err != nil {
return err
}
if err := c.Channel.Check(); err != nil {
return err
}
return nil
}

type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
Expand Down Expand Up @@ -92,11 +57,11 @@ type CLIConfig struct {
MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig
CompressorConfig compressor.CLIConfig
RPCFlag oprpc.CLIConfig
RPC oprpc.CLIConfig
}

func (c CLIConfig) Check() error {
// TODO: check the sanity of flags loaded directly https://github.com/ethereum-optimism/optimism/issues/7512
func (c *CLIConfig) Check() error {
// TODO(7512): check the sanity of flags loaded directly https://github.com/ethereum-optimism/optimism/issues/7512

if err := c.MetricsConfig.Check(); err != nil {
return err
Expand All @@ -107,15 +72,15 @@ func (c CLIConfig) Check() error {
if err := c.TxMgrConfig.Check(); err != nil {
return err
}
if err := c.RPCFlag.Check(); err != nil {
if err := c.RPC.Check(); err != nil {
return err
}
return nil
}

// NewConfig parses the Config from the provided flags or environment variables.
func NewConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
func NewConfig(ctx *cli.Context) *CLIConfig {
protolambda marked this conversation as resolved.
Show resolved Hide resolved
return &CLIConfig{
/* Required Flags */
L1EthRpc: ctx.String(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.String(flags.L2EthRpcFlag.Name),
Expand All @@ -133,6 +98,6 @@ func NewConfig(ctx *cli.Context) CLIConfig {
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPCFlag: oprpc.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
}
}
Loading