Skip to content

Commit

Permalink
Merge #26609
Browse files Browse the repository at this point in the history
26609: workload: add "gentle" chaos to tpccbench r=m-schneider a=m-schneider

Before we used the roachprod stop command to stop cockroach for our
chaos scenarios, which is a kill -9. Now for chaos we'll have an option
to do a graceful drain of the node.

Closes #26387

Release note: None

Co-authored-by: Masha Schneider <[email protected]>
  • Loading branch information
craig[bot] and Masha Schneider committed Jun 20, 2018
2 parents 2e457ee + b7a3585 commit dc59fc8
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 14 deletions.
26 changes: 18 additions & 8 deletions pkg/cmd/roachtest/chaos.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ type ChaosTimer interface {

// Periodic is a chaos timing using fixed durations: Period is the interval
// between chaos events (used to drive the ticker in Chaos.Runner), and
// DownTime is how long the target node stays down within each period.
type Periodic struct {
// NOTE(review): the next line appears to be the pre-change (deleted) side of
// the diff, retained alongside its replacement below — confirm against the
// actual file and keep only one of the two field declarations.
Down, Up time.Duration
Period, DownTime time.Duration
}

// Timing implements ChaosTimer, returning (period, downTime) as consumed by
// Chaos.Runner.
func (p Periodic) Timing() (time.Duration, time.Duration) {
// NOTE(review): the first return below looks like the deleted pre-change diff
// line; as written it makes the second return unreachable — confirm which
// statement is current and remove the other.
return p.Down, p.Up
return p.Period, p.DownTime
}

// Chaos stops and restarts nodes in a cluster.
Expand All @@ -45,6 +45,9 @@ type Chaos struct {
// Stopper is a channel that the chaos agent listens on. The agent will
// terminate cleanly once it receives on the channel.
Stopper <-chan time.Time
// DrainAndQuit is used to determine if we want to kill the node vs. draining
// it first and shutting down gracefully.
DrainAndQuit bool
}

// Runner returns a closure that runs chaos against the given cluster without
Expand All @@ -56,30 +59,37 @@ func (ch *Chaos) Runner(c *cluster, m *monitor) func(context.Context) error {
if err != nil {
return err
}
period, downTime := ch.Timer.Timing()
t := time.NewTicker(period)
for {
before, between := ch.Timer.Timing()
select {
case <-ch.Stopper:
return nil
case <-ctx.Done():
return ctx.Err()
case <-time.After(before):
case <-t.C:
}

target := ch.Target()
l.printf("killing %v (slept %s)\n", target, before)
m.ExpectDeath()
c.Stop(ctx, target)

if ch.DrainAndQuit {
l.printf("stopping and draining %v\n", target)
c.Stop(ctx, target, stopArgs("--sig=15"))
} else {
l.printf("killing %v\n", target)
c.Stop(ctx, target)
}

select {
case <-ch.Stopper:
return nil
case <-ctx.Done():
return ctx.Err()
case <-time.After(between):
case <-time.After(downTime):
}

c.l.printf("restarting %v after %s of downtime\n", target, between)
c.l.printf("restarting %v after %s of downtime\n", target, downTime)
c.Start(ctx, target)
}
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,11 @@ func startArgs(extraArgs ...string) option {
return roachprodArgOption(extraArgs)
}

// stopArgs specifies extra arguments that are passed to `roachprod` during `c.Stop`,
// e.g. stopArgs("--sig=15") to request a graceful SIGTERM-based stop instead of
// the default kill.
func stopArgs(extraArgs ...string) option {
return roachprodArgOption(extraArgs)
}

// roachprodArgOption carries extra command-line arguments to be appended to a
// `roachprod` invocation (shared by startArgs and stopArgs).
type roachprodArgOption []string

// option is the marker method satisfying the option interface; it carries no
// behavior of its own.
func (o roachprodArgOption) option() {}
Expand Down Expand Up @@ -762,12 +767,19 @@ func (c *cluster) Stop(ctx context.Context, opts ...option) {
// If the test has failed, don't try to limp along.
return
}

args := []string{
roachprod,
"stop",
}
args = append(args, roachprodArgs(opts)...)
args = append(args, c.makeNodes(opts...))
if atomic.LoadInt32(&interrupted) == 1 {
c.t.Fatal("interrupted")
}
c.status("stopping cluster")
defer c.status()
err := execCmd(ctx, c.l, roachprod, "stop", c.makeNodes(opts...))
err := execCmd(ctx, c.l, args...)
if err != nil {
c.t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/scaledata.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func runSqlapp(ctx context.Context, t *test, c *cluster, app, flags string, dur
// Kill one node at a time, with a minute of healthy cluster and thirty
// seconds of down node.
ch := Chaos{
Timer: Periodic{Down: 30 * time.Second, Up: 1 * time.Minute},
Timer: Periodic{Period: 90 * time.Second, DownTime: 30 * time.Second},
Target: roachNodes.randNode,
Stopper: time.After(dur),
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/cmd/roachtest/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -256,7 +257,7 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) {
// Search between 1 and b.LoadWarehouses for the largest number of
// warehouses that can be operated on while sustaining a throughput
// threshold, set to a fraction of max tpmC.
precision := b.LoadWarehouses / 200
precision := int(math.Max(1.0, float64(b.LoadWarehouses/200)))
initStepSize := precision
s := search.NewLineSearcher(1, b.LoadWarehouses, b.EstimatedMax, initStepSize, precision)
res, err := s.Search(func(warehouses int) (bool, error) {
Expand All @@ -280,9 +281,10 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) {

// Kill one node at a time.
ch := Chaos{
Timer: Periodic{Down: 1 * time.Second, Up: 90 * time.Second},
Target: roachNodes.randNode,
Stopper: loadDone,
Timer: Periodic{Period: 90 * time.Second, DownTime: 1 * time.Second},
Target: roachNodes.randNode,
Stopper: loadDone,
DrainAndQuit: true,
}
m.Go(ch.Runner(c, m))
}
Expand Down

0 comments on commit dc59fc8

Please sign in to comment.