Skip to content

Commit

Permalink
replay: add option to apply subset of a workload
Browse files Browse the repository at this point in the history
Add a `--max-writes` flag to the cmd/pebble replay command that limits the
amount of a workload to replay by the configured MB. This allows us to collect
large workloads and apply a smaller subset at replay time if the full workload
is unnecessary.
  • Loading branch information
jbowens committed Feb 6, 2023
1 parent f2e58dc commit e935ca0
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 7 deletions.
6 changes: 6 additions & 0 deletions cmd/pebble/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func initReplayCmd() *cobra.Command {
&c.name, "name", "", "the name of the workload being replayed")
cmd.Flags().VarPF(
&c.pacer, "pacer", "p", "the pacer to use: unpaced, reference-ramp, or fixed-ramp=N")
cmd.Flags().Uint64Var(
&c.maxWritesMB, "max-writes", 0, "the maximum volume of writes (MB) to apply, with 0 denoting unlimited")
cmd.Flags().StringVar(
&c.optionsString, "options", "", "Pebble options to override, in the OPTIONS ini format but with any whitespace as field delimiters instead of newlines")
cmd.Flags().StringVar(
Expand All @@ -61,6 +63,7 @@ type replayConfig struct {
pacer pacerFlag
runDir string
count int
maxWritesMB uint64
streamLogs bool
ignoreCheckpoint bool
optionsString string
Expand Down Expand Up @@ -93,6 +96,9 @@ func (c *replayConfig) runOnce(stdout io.Writer, workloadPath string) error {
Pacer: c.pacer,
Opts: &pebble.Options{},
}
if c.maxWritesMB > 0 {
r.MaxWriteBytes = c.maxWritesMB * (1 << 20)
}
if err := c.initRunDir(r); err != nil {
return err
}
Expand Down
30 changes: 25 additions & 5 deletions replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type Metrics struct {
BytesWeightedByLevel uint64
}
TotalWriteAmp float64
WriteBytes uint64
WriteStalls uint64
WriteStallsDuration time.Duration
}
Expand Down Expand Up @@ -180,11 +181,12 @@ func (m *Metrics) BenchmarkString(name string) string {
// Runner runs a captured workload against a test database, collecting
// metrics on performance.
type Runner struct {
RunDir string
WorkloadFS vfs.FS
WorkloadPath string
Pacer Pacer
Opts *pebble.Options
RunDir string
WorkloadFS vfs.FS
WorkloadPath string
Pacer Pacer
Opts *pebble.Options
MaxWriteBytes uint64

// Internal state.

Expand All @@ -202,6 +204,7 @@ type Runner struct {
stepsApplied chan workloadStep

metrics struct {
writeBytes uint64
writeStalls uint64
writeStallsDurationNano uint64
}
Expand Down Expand Up @@ -420,6 +423,7 @@ func (r *Runner) Wait() (Metrics, error) {
m := Metrics{
Final: pm,
TotalWriteAmp: total.WriteAmp(),
WriteBytes: r.metrics.writeBytes,
WriteStalls: r.metrics.writeStalls,
WriteStallsDuration: time.Duration(r.metrics.writeStallsDurationNano),
}
Expand Down Expand Up @@ -556,6 +560,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {

idx := r.workload.manifestIdx

var cumulativeWriteBytes uint64
var flushBufs flushBuffers
var v *manifest.Version
var previousVersion *manifest.Version
Expand All @@ -576,6 +581,10 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
}

for ; idx < len(r.workload.manifests); idx++ {
if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
break
}

err := func() error {
manifestName := r.workload.manifests[idx]
f, err := r.WorkloadFS.Open(r.WorkloadFS.PathJoin(r.WorkloadPath, manifestName))
Expand Down Expand Up @@ -688,6 +697,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
if err := loadFlushedSSTableKeys(s.flushBatch, r.WorkloadFS, r.WorkloadPath, newFiles, r.readerOpts, &flushBufs); err != nil {
return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset())
}
cumulativeWriteBytes += uint64(s.flushBatch.Len())
case ingestStepKind:
// Copy the ingested sstables into a staging area within the
// run dir. This is necessary for two reasons:
Expand All @@ -705,6 +715,11 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
if err := vfs.CopyAcrossFS(r.WorkloadFS, src, r.Opts.FS, dst); err != nil {
return errors.Wrapf(err, "ingest in %q at offset %d", manifestName, rr.Offset())
}
finfo, err := r.Opts.FS.Stat(dst)
if err != nil {
return errors.Wrapf(err, "stating %q", dst)
}
cumulativeWriteBytes += uint64(finfo.Size())
s.tablesToIngest = append(s.tablesToIngest, dst)
}
case compactionStepKind:
Expand All @@ -716,13 +731,18 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
return ctx.Err()
case r.steps <- s:
}

if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
break
}
}
return nil
}()
if err != nil {
return err
}
}
atomic.StoreUint64(&r.metrics.writeBytes, cumulativeWriteBytes)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions replay/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datatest"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/rangekey"
Expand Down Expand Up @@ -131,10 +132,11 @@ func runReplayTest(t *testing.T, path string) {
ct.WaitForInflightCompactionsToEqual(target)
return ""
case "wait":
if _, err := r.Wait(); err != nil {
m, err := r.Wait()
if err != nil {
return err.Error()
}
return ""
return fmt.Sprintf("replayed %s in writes", humanize.Uint64(m.WriteBytes))
case "close":
if err := r.Close(); err != nil {
return err.Error()
Expand Down
1 change: 1 addition & 0 deletions replay/testdata/replay
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ replay simple unpaced

wait
----
replayed 42 B in writes

# NB: The file sizes are non-deterministic after replay (because compactions are
# nondeterministic). We don't `tree` here as a result.
Expand Down
1 change: 1 addition & 0 deletions replay/testdata/replay_paced
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ wait-for-compactions

wait
----
replayed 42 B in writes

scan-keys
----
Expand Down

0 comments on commit e935ca0

Please sign in to comment.