Skip to content

Commit

Permalink
Merge pull request #7 from BrianBland/storage-stats
Browse files Browse the repository at this point in the history
Collect stats related to storage changes
  • Loading branch information
danyalprout authored Aug 12, 2024
2 parents 0a90f64 + 8e937cd commit 856b241
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 80 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/ethereum/go-ethereum v1.13.15
github.com/minio/minio-go/v7 v7.0.70
github.com/urfave/cli/v2 v2.27.1
golang.design/x/chann v0.1.2
)

require (
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,6 @@ go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
golang.design/x/chann v0.1.2 h1:eHF9wjuQnpp+j4ryWhyxC/pFuYzbvMAkudA/I5ALovY=
golang.design/x/chann v0.1.2/go.mod h1:Rh5KhCAp+0qu9+FfKPymHpu8onmjl89sFwMeiw3SK14=
golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
2 changes: 2 additions & 0 deletions packages/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ReplayorConfig struct {
GasLimit int
BenchmarkStartBlock uint64
BenchmarkOpcodes bool
ComputeStorageDiffs bool
TestName string
Bucket string
StorageType string
Expand Down Expand Up @@ -83,6 +84,7 @@ func LoadReplayorConfig(cliCtx *cli.Context, l log.Logger) (ReplayorConfig, erro
GasLimit: cliCtx.Int(GasLimit.Name),
BenchmarkStartBlock: cliCtx.Uint64(BenchmarkStartBlock.Name),
BenchmarkOpcodes: cliCtx.Bool(BenchmarkOpcodes.Name),
ComputeStorageDiffs: cliCtx.Bool(ComputeStorageDiffs.Name),
TestName: hostname,
Bucket: cliCtx.String(S3Bucket.Name),
StorageType: cliCtx.String(StorageType.Name),
Expand Down
7 changes: 6 additions & 1 deletion packages/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ var (
Usage: "whether to include opcode metrics in the benchmark results",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "BENCHMARK_OPCODES"),
}
ComputeStorageDiffs = &cli.BoolFlag{
Name: "compute-storage-diffs",
Usage: "whether to include storage diff metrics in the benchmark results",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "COMPUTE_STORAGE_DIFFS"),
}
S3Bucket = &cli.StringFlag{
Name: "s3-bucket",
Usage: "The S3 bucket to store results in",
Expand All @@ -99,7 +104,7 @@ var (

func init() {
Flags = append(Flags, oplog.CLIFlags(EnvVarPrefix)...)
Flags = append(Flags, EngineApiSecret, SourceNodeUrl, ChainId, EngineApiUrl, ExecutionUrl, Strategy, BlockCount, GasTarget, GasLimit, S3Bucket, StorageType, DiskPath, BenchmarkStartBlock, BenchmarkOpcodes, RollupConfigPath)
Flags = append(Flags, EngineApiSecret, SourceNodeUrl, ChainId, EngineApiUrl, ExecutionUrl, Strategy, BlockCount, GasTarget, GasLimit, S3Bucket, StorageType, DiskPath, BenchmarkStartBlock, BenchmarkOpcodes, ComputeStorageDiffs, RollupConfigPath)
}

// Flags contains the list of configuration options available to the binary.
Expand Down
97 changes: 29 additions & 68 deletions packages/replayor/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package replayor

import (
"context"
"fmt"
"math/big"
"sync"
"time"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"golang.design/x/chann"
)

const (
Expand All @@ -34,7 +34,6 @@ type Benchmark struct {

incomingBlocks chan *types.Block
processBlocks chan strategies.BlockCreationParams
recordStats *chann.Chann[stats.BlockCreationStats]

previousReplayedBlockHash common.Hash
strategy strategies.Strategy
Expand All @@ -46,6 +45,7 @@ type Benchmark struct {
endBlockNum uint64

benchmarkOpcodes bool
diffStorage bool
}

func (r *Benchmark) getBlockFromSourceNode(ctx context.Context, blockNum uint64) (*types.Block, error) {
Expand Down Expand Up @@ -75,7 +75,7 @@ func (r *Benchmark) loadBlocks(ctx context.Context) {
}

if block == nil {
panic(err)
panic(fmt.Errorf("unexpected nil block: %d", blockNum))
}

m.Lock()
Expand Down Expand Up @@ -214,22 +214,12 @@ func (r *Benchmark) addBlock(ctx context.Context, currentBlock strategies.BlockC

r.previousReplayedBlockHash = envelope.ExecutionPayload.BlockHash

r.recordStats.In() <- stats

r.sm.Lock()
defer r.sm.Unlock()

if r.remainingBlockCount == 0 {
r.log.Info("finished processing blocks")
r.recordStats.Close()
return
}

r.remainingBlockCount -= 1
r.enrich(ctx, &stats)
r.s.RecordBlockStats(stats)
}

func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) {
receipts, err := retry.Do(ctx, 10, retry.Exponential(), func() ([]*types.Receipt, error) {
receipts, err := retry.Do(ctx, 3, retry.Exponential(), func() ([]*types.Receipt, error) {
return r.clients.DestNode.BlockReceipts(ctx, rpc.BlockNumberOrHash{BlockHash: &s.BlockHash})
})

Expand All @@ -246,57 +236,20 @@ func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) {
}
s.Success = float64(success) / float64(len(receipts))

if r.benchmarkOpcodes {
r.computeTraceStats(ctx, s, receipts)
}
}

func (r *Benchmark) enrichAndRecordStats(ctx context.Context) chan any {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
for {
select {
case stats, ok := <-r.recordStats.Out():
if !ok {
return
}
r.enrich(ctx, &stats)

r.sm.Lock()
r.s.RecordBlockStats(stats)
r.sm.Unlock()

r.log.Debug("block stats", "BlockNumber", stats.BlockNumber, "BlockHash", stats.BlockHash, "TxnCount", stats.TxnCount, "TotalTime", stats.TotalTime, "FCUTime", stats.FCUTime, "GetTime", stats.GetTime, "NewTime", stats.NewTime, "FCUNoAttrsTime", stats.FCUNoAttrsTime, "Success", stats.Success, "GasUsed", stats.GasUsed, "GasLimit", stats.GasLimit)
case <-ctx.Done():
return
}
}
}()
}

doneChan := make(chan any)

go func() {
wg.Wait()
close(doneChan)
}()

return doneChan
r.computeTraceStats(ctx, s, receipts)
}

func (r *Benchmark) submitBlocks(ctx context.Context) {
for {
select {
case block := <-r.processBlocks:
if block.Number > r.endBlockNum {
r.log.Debug("stopping block processing")
continue
case block, ok := <-r.processBlocks:
if block.Number > r.endBlockNum || !ok {
r.log.Info("stopping block processing")
return
}

r.addBlock(ctx, block)
r.remainingBlockCount -= 1
case <-ctx.Done():
return
}
Expand All @@ -306,9 +259,13 @@ func (r *Benchmark) submitBlocks(ctx context.Context) {
func (r *Benchmark) mapBlocks(ctx context.Context) {
for {
select {
case b := <-r.incomingBlocks:
if b == nil {
r.log.Debug("stopping block processing")
case b, ok := <-r.incomingBlocks:
if !ok {
r.log.Info("stopping block mapping")
close(r.processBlocks)
return
} else if b == nil {
r.log.Warn("nil block received")
continue
}

Expand All @@ -326,10 +283,13 @@ func (r *Benchmark) mapBlocks(ctx context.Context) {
}

func (r *Benchmark) Run(ctx context.Context) {
doneChan := make(chan any)
go r.loadBlocks(ctx)
go r.mapBlocks(ctx)
go r.submitBlocks(ctx)
doneChan := r.enrichAndRecordStats(ctx)
go func() {
r.submitBlocks(ctx)
close(doneChan)
}()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Expand All @@ -350,7 +310,7 @@ func (r *Benchmark) Run(ctx context.Context) {
l.Error("unable to load current block", "err", err)
}

l.Info("replay progress", "blocks", currentBlock.NumberU64()-lastBlockNum, "incomingBlocks", len(r.incomingBlocks), "processBlocks", len(r.processBlocks), "currentBlock", currentBlock.NumberU64(), "statProgress", r.recordStats.Len(), "remaining", r.remainingBlockCount)
l.Info("replay progress", "blocks", currentBlock.NumberU64()-lastBlockNum, "incomingBlocks", len(r.incomingBlocks), "processBlocks", len(r.processBlocks), "currentBlock", currentBlock.NumberU64(), "remaining", r.remainingBlockCount)

lastBlockNum = currentBlock.NumberU64()
case <-ctx.Done():
Expand All @@ -369,22 +329,23 @@ func NewBenchmark(
s stats.Stats,
currentBlock *types.Block,
benchmarkBlockCount uint64,
benchmarkOpcodes bool) *Benchmark {
benchmarkOpcodes bool,
diffStorage bool) *Benchmark {
r := &Benchmark{
clients: c,
rollupCfg: rollupCfg,
log: logger,
incomingBlocks: make(chan *types.Block, 25),
processBlocks: make(chan strategies.BlockCreationParams, 25),
recordStats: chann.New[stats.BlockCreationStats](),
strategy: strategy,
s: s,
currentBlock: currentBlock,
startBlockNum: currentBlock.NumberU64() + 1,
endBlockNum: currentBlock.NumberU64() + 1 + benchmarkBlockCount,
endBlockNum: currentBlock.NumberU64() + benchmarkBlockCount,
remainingBlockCount: benchmarkBlockCount,
previousReplayedBlockHash: currentBlock.Hash(),
benchmarkOpcodes: benchmarkOpcodes,
diffStorage: diffStorage,
}

return r
Expand Down
5 changes: 3 additions & 2 deletions packages/replayor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (r *Service) Start(ctx context.Context) error {
&strategies.OneForOne{},
&stats.NoOpStats{},
currentBlock,
r.cfg.BenchmarkStartBlock-currentBlock.NumberU64()-1,
r.cfg.BenchmarkStartBlock-currentBlock.NumberU64(),
false,
false)

walkUpToBlock.Run(cCtx)
Expand All @@ -94,7 +95,7 @@ func (r *Service) Start(ctx context.Context) error {
panic(err)
}

benchmark := NewBenchmark(r.clients, r.cfg.RollupConfig, r.log, strategy, r.stats, currentBlock, uint64(r.cfg.BlockCount), r.cfg.BenchmarkOpcodes)
benchmark := NewBenchmark(r.clients, r.cfg.RollupConfig, r.log, strategy, r.stats, currentBlock, uint64(r.cfg.BlockCount), r.cfg.BenchmarkOpcodes, r.cfg.ComputeStorageDiffs)
benchmark.Run(cCtx)

return nil
Expand Down
Loading

0 comments on commit 856b241

Please sign in to comment.