Skip to content

Commit

Permalink
gemini: warmup time configurable using cli argument
Browse files Browse the repository at this point in the history
Gemini now has a cli argument 'warmup' which is a duration given
in a format such as '30s', '15m' or '10h'.
This time is added to the test duration so that the total run
time is easy to calculate.

For example: Given a duration of 10h and a warmup of 2h, the total
runtime will be 12 hours, and validations will start to happen
after a 2-hour warmup run with only inserts.
  • Loading branch information
Henrik Johansson committed Jun 7, 2019
1 parent ec910e8 commit 3aafe9e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Prometheus metrics added that exposes internal runtime properties
as well as counts for CQL operations 'batch', 'delete', 'insert', 'select'
and 'update'.
- Warmup duration added during which only inserts are performed.
- Generating valid single index queries when a complex primary key is used.
- Gracefully stopping on sigint and sigterm.
- JSON marshalling of Schema fixed. The schema input file has changed to
Expand Down
37 changes: 28 additions & 9 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
nonInteractive bool
duration time.Duration
bind string
warmup time.Duration
)

const (
Expand Down Expand Up @@ -72,7 +73,7 @@ func interactive() bool {
return !nonInteractive
}

type testJob func(context.Context, *sync.WaitGroup, *gemini.Schema, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File)
type testJob func(context.Context, *sync.WaitGroup, *gemini.Schema, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File, time.Duration)

func (r *Status) Merge(sum *Status) Status {
sum.WriteOps += r.WriteOps
Expand Down Expand Up @@ -232,7 +233,7 @@ func runJob(f testJob, schema *gemini.Schema, s store.Store, mode string, out *o
Max: maxRange + i*maxRange,
Rand: rand.New(rand.NewSource(int64(seed))),
}
go f(workerCtx, &workers, schema, table, s, p, c, mode, out)
go f(workerCtx, &workers, schema, table, s, p, c, mode, out, warmup)
}
}

Expand Down Expand Up @@ -282,7 +283,7 @@ func runJob(f testJob, schema *gemini.Schema, s store.Store, mode string, out *o
}
}
}
}(duration)
}(duration + warmup)

workers.Wait()
close(c)
Expand All @@ -296,8 +297,8 @@ func stop(cancel context.CancelFunc, c chan Status, out io.Writer, res Status) {
res.PrintResult(out)
}

func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
mutateStmt, err := schema.GenMutateStmt(table, &p)
func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File, deletes bool) {
mutateStmt, err := schema.GenMutateStmt(table, &p, deletes)
if err != nil {
fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err)
testStatus.WriteErrors++
Expand Down Expand Up @@ -342,11 +343,27 @@ func validationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Tab
}
}

func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File) {
func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File, warmup time.Duration) {
defer wg.Done()
testStatus := Status{}

var i int
warmupTimer := time.NewTimer(warmup)
warmup:
for {
select {
case <-ctx.Done():
return
case <-warmupTimer.C:
break warmup
default:
mutationJob(ctx, schema, table, s, p, &testStatus, out, false)
if i%1000 == 0 {
c <- testStatus
testStatus = Status{}
}
}
}

for {
select {
case <-ctx.Done():
Expand All @@ -355,13 +372,13 @@ func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table *
}
switch mode {
case writeMode:
mutationJob(ctx, schema, table, s, p, &testStatus, out)
mutationJob(ctx, schema, table, s, p, &testStatus, out, true)
case readMode:
validationJob(ctx, schema, table, s, p, &testStatus, out)
default:
ind := p.Rand.Intn(100000) % 2
if ind == 0 {
mutationJob(ctx, schema, table, s, p, &testStatus, out)
mutationJob(ctx, schema, table, s, p, &testStatus, out, true)
} else {
validationJob(ctx, schema, table, s, p, &testStatus, out)
}
Expand Down Expand Up @@ -408,6 +425,7 @@ func init() {
rootCmd.Flags().DurationVarP(&duration, "duration", "", 30*time.Second, "")
rootCmd.Flags().StringVarP(&outFileArg, "outfile", "", "", "Specify the name of the file where the results should go")
rootCmd.Flags().StringVarP(&bind, "bind", "b", ":2112", "Specify the interface and port which to bind prometheus metrics on. Default is ':2112'")
rootCmd.Flags().DurationVarP(&warmup, "warmup", "", 30*time.Second, "Specify the warmup period as a duration, for example 30s or 10h")
}

func printSetup() error {
Expand All @@ -416,6 +434,7 @@ func printSetup() error {
rand.Seed(int64(seed))
fmt.Fprintf(tw, "Seed:\t%d\n", seed)
fmt.Fprintf(tw, "Maximum duration:\t%s\n", duration)
fmt.Fprintf(tw, "Warmup duration:\t%s\n", warmup)
fmt.Fprintf(tw, "Concurrency:\t%d\n", concurrency)
fmt.Fprintf(tw, "Number of partitions per thread:\t%d\n", pkNumberPerThread)
fmt.Fprintf(tw, "Test cluster:\t%s\n", testClusterHost)
Expand Down
5 changes: 4 additions & 1 deletion schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,10 @@ func (s *Schema) GenDeleteRows(t *Table, p *PartitionRange) (*Stmt, error) {
}, nil
}

func (s *Schema) GenMutateStmt(t *Table, p *PartitionRange) (*Stmt, error) {
func (s *Schema) GenMutateStmt(t *Table, p *PartitionRange, deletes bool) (*Stmt, error) {
if !deletes {
return s.GenInsertStmt(t, p)
}
switch n := p.Rand.Intn(1000); n {
case 10, 100:
return s.GenDeleteRows(t, p)
Expand Down

0 comments on commit 3aafe9e

Please sign in to comment.