From 3aafe9e84891c6ffabe9df3c27184a32b8956260 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Wed, 5 Jun 2019 10:49:21 +0200 Subject: [PATCH] gemini: warmup time configurable using cli argument Gemini now has a cli argument 'warmup' which is a duration given in a format such as '30s', '15m' or '10h'. This time is added to the duration time so as to make total run time easy to calculate. For example: Given a duration of 10h and a warmup of 2h the total runtime will be 12 hours and validations will start to happen after a 2-hour warmup run with only inserts. --- CHANGELOG.md | 1 + cmd/gemini/root.go | 37 ++++++++++++++++++++++++++++--------- schema.go | 5 ++++- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 627ed2fc..8b48a76f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - Prometheus metrics added that exposes internal runtime properties as well as counts fo CQL operations 'batch', 'delete', 'insert', 'select' and 'update'. +- Warmup duration added during which only inserts are performed. - Generating valid single index queries when a complex primary key is used. - Gracefully stopping on sigint and sigterm. - JSON marshalling of Schema fixed. 
The schema input file has changed to diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index c49add7f..5325f6e2 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -41,6 +41,7 @@ var ( nonInteractive bool duration time.Duration bind string + warmup time.Duration ) const ( @@ -72,7 +73,7 @@ func interactive() bool { return !nonInteractive } -type testJob func(context.Context, *sync.WaitGroup, *gemini.Schema, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File) +type testJob func(context.Context, *sync.WaitGroup, *gemini.Schema, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File, time.Duration) func (r *Status) Merge(sum *Status) Status { sum.WriteOps += r.WriteOps @@ -232,7 +233,7 @@ func runJob(f testJob, schema *gemini.Schema, s store.Store, mode string, out *o Max: maxRange + i*maxRange, Rand: rand.New(rand.NewSource(int64(seed))), } - go f(workerCtx, &workers, schema, table, s, p, c, mode, out) + go f(workerCtx, &workers, schema, table, s, p, c, mode, out, warmup) } } @@ -282,7 +283,7 @@ func runJob(f testJob, schema *gemini.Schema, s store.Store, mode string, out *o } } } - }(duration) + }(duration + warmup) workers.Wait() close(c) @@ -296,8 +297,8 @@ func stop(cancel context.CancelFunc, c chan Status, out io.Writer, res Status) { res.PrintResult(out) } -func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) { - mutateStmt, err := schema.GenMutateStmt(table, &p) +func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File, deletes bool) { + mutateStmt, err := schema.GenMutateStmt(table, &p, deletes) if err != nil { fmt.Printf("Failed! 
Mutation statement generation failed: '%v'\n", err) testStatus.WriteErrors++ @@ -342,11 +343,29 @@ func validationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Tab } } -func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File) { +func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File, warmup time.Duration) { defer wg.Done() testStatus := Status{} var i int + warmupTimer := time.NewTimer(warmup) +warmup: + for { + select { + case <-ctx.Done(): + return + case <-warmupTimer.C: + break warmup + default: + mutationJob(ctx, schema, table, s, p, &testStatus, out, false) + i++ + if i%1000 == 0 { + c <- testStatus + testStatus = Status{} + } + } + } + for { select { case <-ctx.Done(): @@ -355,13 +372,13 @@ func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table * } switch mode { case writeMode: - mutationJob(ctx, schema, table, s, p, &testStatus, out) + mutationJob(ctx, schema, table, s, p, &testStatus, out, true) case readMode: validationJob(ctx, schema, table, s, p, &testStatus, out) default: ind := p.Rand.Intn(100000) % 2 if ind == 0 { - mutationJob(ctx, schema, table, s, p, &testStatus, out) + mutationJob(ctx, schema, table, s, p, &testStatus, out, true) } else { validationJob(ctx, schema, table, s, p, &testStatus, out) } @@ -408,6 +425,7 @@ func init() { rootCmd.Flags().DurationVarP(&duration, "duration", "", 30*time.Second, "") rootCmd.Flags().StringVarP(&outFileArg, "outfile", "", "", "Specify the name of the file where the results should go") rootCmd.Flags().StringVarP(&bind, "bind", "b", ":2112", "Specify the interface and port which to bind prometheus metrics on. 
Default is ':2112'") + rootCmd.Flags().DurationVarP(&warmup, "warmup", "", 30*time.Second, "Specify the warmup perid as a duration for example 30s or 10h") } func printSetup() error { @@ -416,6 +434,7 @@ func printSetup() error { rand.Seed(int64(seed)) fmt.Fprintf(tw, "Seed:\t%d\n", seed) fmt.Fprintf(tw, "Maximum duration:\t%s\n", duration) + fmt.Fprintf(tw, "Warmup duration:\t%s\n", warmup) fmt.Fprintf(tw, "Concurrency:\t%d\n", concurrency) fmt.Fprintf(tw, "Number of partitions per thread:\t%d\n", pkNumberPerThread) fmt.Fprintf(tw, "Test cluster:\t%s\n", testClusterHost) diff --git a/schema.go b/schema.go index dc4351a9..28c9ccc5 100644 --- a/schema.go +++ b/schema.go @@ -370,7 +370,10 @@ func (s *Schema) GenDeleteRows(t *Table, p *PartitionRange) (*Stmt, error) { }, nil } -func (s *Schema) GenMutateStmt(t *Table, p *PartitionRange) (*Stmt, error) { +func (s *Schema) GenMutateStmt(t *Table, p *PartitionRange, deletes bool) (*Stmt, error) { + if !deletes { + return s.GenInsertStmt(t, p) + } switch n := p.Rand.Intn(1000); n { case 10, 100: return s.GenDeleteRows(t, p)