Skip to content

Commit

Permalink
close the codepaths. (untested)
Browse files Browse the repository at this point in the history
  • Loading branch information
darioush committed Aug 1, 2023
1 parent 7e8905f commit e9a5a28
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 17 deletions.
5 changes: 5 additions & 0 deletions cmd/simulator/load/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type mkAgentBuilder func(
ctx context.Context, config config.Config, chainID *big.Int,
pks []*ecdsa.PrivateKey, client ethclient.Client, metrics *metrics.Metrics,
) (AgentBuilder, error)

type AgentBuilder interface {
NewAgent(ctx context.Context, idx int, client ethclient.Client, sender common.Address) (txs.Agent, error)
}
Expand Down
60 changes: 47 additions & 13 deletions cmd/simulator/load/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ const (
)

// ExecuteLoader creates txSequences from [config] and has txAgents execute the specified simulation.
func ExecuteLoader(ctx context.Context, config config.Config) error {
if config.Timeout > 0 {
func ExecuteLoader(ctx context.Context, cfg config.Config) error {
if cfg.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, config.Timeout)
ctx, cancel = context.WithTimeout(ctx, cfg.Timeout)
defer cancel()
}

Expand All @@ -60,6 +60,49 @@ func ExecuteLoader(ctx context.Context, config config.Config) error {
cancel()
}()

// Create metrics
reg := prometheus.NewRegistry()
m := metrics.NewMetrics("", reg)
mB := metrics.NewMetrics("subnet_b_", reg)
mWarp := metrics.NewMetrics("warp_", reg)
timeTracker := newTimeTracker(mWarp.IssuanceToConfirmationTxTimes.Observe)
metricsPort := strconv.Itoa(int(cfg.MetricsPort))
go startMetricsServer(ctx, metricsPort, reg)

warpSenderAgentBuilder := func(
ctx context.Context, config config.Config, chainID *big.Int,
pks []*ecdsa.PrivateKey, client ethclient.Client, metrics *metrics.Metrics,
) (AgentBuilder, error) {
return NewWarpSendTxAgentBuilder(ctx, config, chainID, pks, client, metrics, timeTracker)
}
warpReceiveTxAgentBuilder := func(
ctx context.Context, config config.Config, chainID *big.Int,
pks []*ecdsa.PrivateKey, client ethclient.Client, metrics *metrics.Metrics,
) (AgentBuilder, error) {
return NewWarpReceiveTxAgentBuilder(ctx, config, chainID, pks, client, metrics, timeTracker)
}

var eg errgroup.Group
eg.Go(func() error {
return executeLoaderImpl(ctx, cfg, warpSenderAgentBuilder, m)
})
eg.Go(func() error {
// TODO: should get these values properly
cfg := cfg
endpointsStr := os.Getenv("RPC_ENDPOINTS_SUBNET_B")
cfg.Endpoints = strings.Split(endpointsStr, ",")
return executeLoaderImpl(ctx, cfg, warpReceiveTxAgentBuilder, mB)
})
if err := eg.Wait(); err != nil {
return err
}
printOutputFromMetricsServer(metricsPort)
return nil
}

func executeLoaderImpl(
ctx context.Context, config config.Config, mkAgentBuilder mkAgentBuilder, m *metrics.Metrics,
) error {
// Construct the arguments for the load simulator
clients := make([]ethclient.Client, 0, len(config.Endpoints))
for i := 0; i < config.Workers; i++ {
Expand Down Expand Up @@ -94,11 +137,6 @@ func ExecuteLoader(ctx context.Context, config config.Config) error {
maxFeeCap := new(big.Int).Mul(big.NewInt(params.GWei), big.NewInt(config.MaxFeeCap))
minFundsPerAddr := new(big.Int).Mul(maxFeeCap, big.NewInt(int64(config.TxsPerWorker*params.TxGas)))

// Create metrics
reg := prometheus.NewRegistry()
m := metrics.NewMetrics(reg)
metricsPort := strconv.Itoa(int(config.MetricsPort))

log.Info("Distributing funds", "numTxsPerWorker", config.TxsPerWorker, "minFunds", minFundsPerAddr)
keys, err = DistributeFunds(ctx, clients[0], keys, config.Workers, minFundsPerAddr, m)
if err != nil {
Expand All @@ -118,7 +156,7 @@ func ExecuteLoader(ctx context.Context, config config.Config) error {
return fmt.Errorf("failed to fetch chainID: %w", err)
}
log.Info("Constructing tx agents...", "numAgents", config.Workers)
agentBuilder, err := NewTransferTxAgentBuilder(ctx, config, chainID, pks, client, m)
agentBuilder, err := mkAgentBuilder(ctx, config, chainID, pks, client, m)
if err != nil {
return err
}
Expand All @@ -140,15 +178,11 @@ func ExecuteLoader(ctx context.Context, config config.Config) error {
})
}

go startMetricsServer(ctx, metricsPort, reg)

log.Info("Waiting for tx agents...")
if err := eg.Wait(); err != nil {
return err
}
log.Info("Tx agents completed successfully.")

printOutputFromMetricsServer(metricsPort)
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions cmd/simulator/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package metrics

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -17,20 +19,20 @@ type Metrics struct {
}

// NewMetrics creates and returns a Metrics and registers it with a Collector
func NewMetrics(reg prometheus.Registerer) *Metrics {
func NewMetrics(prefix string, reg prometheus.Registerer) *Metrics {
m := &Metrics{
IssuanceTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tx_issuance_time",
Name: fmt.Sprintf("%stx_issuance_time", prefix),
Help: "Individual Tx Issuance Times for a Load Test",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
ConfirmationTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tx_confirmation_time",
Name: fmt.Sprintf("%stx_confirmation_time", prefix),
Help: "Individual Tx Confirmation Times for a Load Test",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
IssuanceToConfirmationTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tx_issuance_to_confirmation_time",
Name: fmt.Sprintf("%stx_issuance_to_confirmation_time", prefix),
Help: "Individual Tx Issuance To Confirmation Times for a Load Test",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
Expand Down

0 comments on commit e9a5a28

Please sign in to comment.