From e9a5a28fceb86d5d0d72e37064be3fc607d01c9f Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Mon, 31 Jul 2023 17:43:34 -0700 Subject: [PATCH] close the codepaths. (untested) --- cmd/simulator/load/agents.go | 5 +++ cmd/simulator/load/loader.go | 60 +++++++++++++++++++++++++------- cmd/simulator/metrics/metrics.go | 10 +++--- 3 files changed, 58 insertions(+), 17 deletions(-) diff --git a/cmd/simulator/load/agents.go b/cmd/simulator/load/agents.go index 45a5417feb..608ba73054 100644 --- a/cmd/simulator/load/agents.go +++ b/cmd/simulator/load/agents.go @@ -19,6 +19,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type mkAgentBuilder func( + ctx context.Context, config config.Config, chainID *big.Int, + pks []*ecdsa.PrivateKey, client ethclient.Client, metrics *metrics.Metrics, +) (AgentBuilder, error) + type AgentBuilder interface { NewAgent(ctx context.Context, idx int, client ethclient.Client, sender common.Address) (txs.Agent, error) } diff --git a/cmd/simulator/load/loader.go b/cmd/simulator/load/loader.go index d5a2d7f795..690d2fca7e 100644 --- a/cmd/simulator/load/loader.go +++ b/cmd/simulator/load/loader.go @@ -35,10 +35,10 @@ const ( ) // ExecuteLoader creates txSequences from [config] and has txAgents execute the specified simulation. -func ExecuteLoader(ctx context.Context, config config.Config) error { - if config.Timeout > 0 { +func ExecuteLoader(ctx context.Context, cfg config.Config) error { + if cfg.Timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, config.Timeout) + ctx, cancel = context.WithTimeout(ctx, cfg.Timeout) defer cancel() } @@ -60,6 +60,49 @@ func ExecuteLoader(ctx context.Context, config config.Config) error { cancel() }() + // Create metrics + reg := prometheus.NewRegistry() + m := metrics.NewMetrics("", reg) + mB := metrics.NewMetrics("subnet_b_", reg) + mWarp := metrics.NewMetrics("warp_", reg) + timeTracker := newTimeTracker(mWarp.IssuanceToConfirmationTxTimes.Observe) + metricsPort := strconv.Itoa(int(cfg.MetricsPort)) + go startMetricsServer(ctx, metricsPort, reg) + + warpSenderAgentBuilder := func( + ctx context.Context, config config.Config, chainID *big.Int, + pks []*ecdsa.PrivateKey, client ethclient.Client, metrics *metrics.Metrics, + ) (AgentBuilder, error) { + return NewWarpSendTxAgentBuilder(ctx, config, chainID, pks, client, metrics, timeTracker) + } + warpReceiveTxAgentBuilder := func( + ctx context.Context, config config.Config, chainID *big.Int, + pks []*ecdsa.PrivateKey, client ethclient.Client, metrics *metrics.Metrics, + ) (AgentBuilder, error) { + return NewWarpReceiveTxAgentBuilder(ctx, config, chainID, pks, client, metrics, timeTracker) + } + + var eg errgroup.Group + eg.Go(func() error { + return executeLoaderImpl(ctx, cfg, warpSenderAgentBuilder, m) + }) + eg.Go(func() error { + // TODO: should get these values properly + cfg := cfg + endpointsStr := os.Getenv("RPC_ENDPOINTS_SUBNET_B") + cfg.Endpoints = strings.Split(endpointsStr, ",") + return executeLoaderImpl(ctx, cfg, warpReceiveTxAgentBuilder, mB) + }) + if err := eg.Wait(); err != nil { + return err + } + printOutputFromMetricsServer(metricsPort) + return nil +} + +func executeLoaderImpl( + ctx context.Context, config config.Config, mkAgentBuilder mkAgentBuilder, m *metrics.Metrics, +) error { // Construct the arguments for the load simulator clients := make([]ethclient.Client, 0, len(config.Endpoints)) for i := 0; i < config.Workers; i++ { @@ -94,11 +137,6 @@ func ExecuteLoader(ctx context.Context, config config.Config) error { maxFeeCap := new(big.Int).Mul(big.NewInt(params.GWei), big.NewInt(config.MaxFeeCap)) minFundsPerAddr := new(big.Int).Mul(maxFeeCap, big.NewInt(int64(config.TxsPerWorker*params.TxGas))) - // Create metrics - reg := prometheus.NewRegistry() - m := metrics.NewMetrics(reg) - metricsPort := strconv.Itoa(int(config.MetricsPort)) - log.Info("Distributing funds", "numTxsPerWorker", config.TxsPerWorker, "minFunds", minFundsPerAddr) keys, err = DistributeFunds(ctx, clients[0], keys, config.Workers, minFundsPerAddr, m) if err != nil { @@ -118,7 +156,7 @@ func ExecuteLoader(ctx context.Context, config config.Config) error { return fmt.Errorf("failed to fetch chainID: %w", err) } log.Info("Constructing tx agents...", "numAgents", config.Workers) - agentBuilder, err := NewTransferTxAgentBuilder(ctx, config, chainID, pks, client, m) + agentBuilder, err := mkAgentBuilder(ctx, config, chainID, pks, client, m) if err != nil { return err } @@ -140,15 +178,11 @@ func ExecuteLoader(ctx context.Context, config config.Config) error { }) } - go startMetricsServer(ctx, metricsPort, reg) - log.Info("Waiting for tx agents...") if err := eg.Wait(); err != nil { return err } log.Info("Tx agents completed successfully.") - - printOutputFromMetricsServer(metricsPort) return nil } diff --git a/cmd/simulator/metrics/metrics.go b/cmd/simulator/metrics/metrics.go index 8462e2f2e3..5d396a4dc8 100644 --- a/cmd/simulator/metrics/metrics.go +++ b/cmd/simulator/metrics/metrics.go @@ -4,6 +4,8 @@ package metrics import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" ) @@ -17,20 +19,20 @@ type Metrics struct { } // NewMetrics creates and returns a Metrics and registers it with a Collector -func NewMetrics(reg prometheus.Registerer) *Metrics { +func NewMetrics(prefix string, reg prometheus.Registerer) *Metrics { m := &Metrics{ IssuanceTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tx_issuance_time", + Name: fmt.Sprintf("%stx_issuance_time", prefix), Help: "Individual Tx Issuance Times for a Load Test", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), ConfirmationTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tx_confirmation_time", + Name: fmt.Sprintf("%stx_confirmation_time", prefix), Help: "Individual Tx Confirmation Times for a Load Test", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), IssuanceToConfirmationTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tx_issuance_to_confirmation_time", + Name: fmt.Sprintf("%stx_issuance_to_confirmation_time", prefix), Help: "Individual Tx Issuance To Confirmation Times for a Load Test", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }),