Skip to content

Commit

Permalink
op-batcher: service lifecycle cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda committed Oct 14, 2023
1 parent 848ae87 commit 12f7465
Show file tree
Hide file tree
Showing 24 changed files with 657 additions and 360 deletions.
108 changes: 15 additions & 93 deletions op-batcher/batcher/batch_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,110 +3,32 @@ package batcher
import (
"context"
"fmt"
_ "net/http/pprof"

"github.com/urfave/cli/v2"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
)

// Main is the entrypoint into the Batch Submitter. This method returns a
// closure that executes the service and blocks until the service exits. The use
// of a closure allows the parameters bound to the top-level main package, e.g.
// GitVersion, to be captured and used once the function is executed.
func Main(version string, cliCtx *cli.Context) error {
if err := flags.CheckRequired(cliCtx); err != nil {
return err
}
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid CLI flags: %w", err)
}

l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
oplog.SetGlobalLogHandler(l.GetHandler())
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
procName := "default"
m := metrics.NewMetrics(procName)
l.Info("Initializing Batch Submitter")

batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l, m)
if err != nil {
l.Error("Unable to create Batch Submitter", "error", err)
return err
}

if !cfg.Stopped {
if err := batchSubmitter.Start(); err != nil {
l.Error("Unable to start Batch Submitter", "error", err)
return err
}
}

defer batchSubmitter.StopIfRunning(context.Background())

pprofConfig := cfg.PprofConfig
if pprofConfig.Enabled {
l.Debug("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofConfig.ListenAddr, pprofConfig.ListenPort)
if err != nil {
l.Error("failed to start pprof server", "err", err)
return err
// Main is the entrypoint into the Batch Submitter.
// This method returns a cliapp.LifecycleAction, to create an op-service CLI-lifecycle-managed batch-submitter with.
func Main(version string) cliapp.LifecycleAction {
return func(cliCtx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
if err := flags.CheckRequired(cliCtx); err != nil {
return nil, err
}
l.Info("started pprof server", "addr", pprofSrv.Addr())
defer func() {
if err := pprofSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop pprof server", "err", err)
}
}()
}

metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled {
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.Start(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return nil, fmt.Errorf("invalid CLI flags: %w", err)
}
l.Info("started metrics server", "addr", metricsSrv.Addr())
defer func() {
if err := metricsSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop pprof server", "err", err)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.TxManager.From())
}

server := oprpc.NewServer(
cfg.RPCFlag.ListenAddr,
cfg.RPCFlag.ListenPort,
version,
oprpc.WithLogger(l),
)
if cfg.RPCFlag.EnableAdmin {
adminAPI := rpc.NewAdminAPI(batchSubmitter, &m.RPCMetrics, l)
server.AddAPI(rpc.GetAdminAPI(adminAPI))
l.Info("Admin RPC enabled")
}
if err := server.Start(); err != nil {
return fmt.Errorf("error starting RPC server: %w", err)
}

m.RecordInfo(version)
m.RecordUp()
l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
oplog.SetGlobalLogHandler(l.GetHandler())
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)

opio.BlockOnInterrupts()
if err := server.Stop(); err != nil {
l.Error("Error shutting down http server: %w", err)
l.Info("Initializing Batch Submitter")
return BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l)
}
return nil
}
45 changes: 5 additions & 40 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,17 @@ package batcher
import (
"time"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

type Config struct {
log log.Logger
metr metrics.Metricer
L1Client *ethclient.Client
L2Client *ethclient.Client
RollupNode *sources.RollupClient
TxManager txmgr.TxManager

NetworkTimeout time.Duration
PollInterval time.Duration
MaxPendingTransactions uint64

// RollupConfig is queried at startup
Rollup *rollup.Config

// Channel builder parameters
Channel ChannelConfig
}

// Check ensures that the [Config] is valid.
func (c *Config) Check() error {
if err := c.Rollup.Check(); err != nil {
return err
}
if err := c.Channel.Check(); err != nil {
return err
}
return nil
}

type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
Expand Down Expand Up @@ -92,7 +57,7 @@ type CLIConfig struct {
MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig
CompressorConfig compressor.CLIConfig
RPCFlag oprpc.CLIConfig
RPC oprpc.CLIConfig
}

func (c CLIConfig) Check() error {
Expand All @@ -107,15 +72,15 @@ func (c CLIConfig) Check() error {
if err := c.TxMgrConfig.Check(); err != nil {
return err
}
if err := c.RPCFlag.Check(); err != nil {
if err := c.RPC.Check(); err != nil {
return err
}
return nil
}

// NewConfig parses the Config from the provided flags or environment variables.
func NewConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
func NewConfig(ctx *cli.Context) *CLIConfig {
return &CLIConfig{
/* Required Flags */
L1EthRpc: ctx.String(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.String(flags.L2EthRpcFlag.Name),
Expand All @@ -133,6 +98,6 @@ func NewConfig(ctx *cli.Context) CLIConfig {
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPCFlag: oprpc.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
}
}
Loading

0 comments on commit 12f7465

Please sign in to comment.