Skip to content

Commit

Permalink
roachtest: deflake tpccbench chaos tests
Browse files Browse the repository at this point in the history
These tests were running a chaos agent across cluster restarts. Whenever
a cluster restart would overlap with the chaos agent restarting a node,
one of the two operations would fail and jam the test.

Fixes #39005.

Release justification: de-flakes a roachtest without changes to the
release binary.

Release note: None
  • Loading branch information
tbg committed Sep 23, 2019
1 parent a92c7d0 commit 8abea21
Showing 1 changed file with 145 additions and 150 deletions.
295 changes: 145 additions & 150 deletions pkg/cmd/roachtest/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/ttycolor"
"github.com/lib/pq"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

type tpccOptions struct {
Expand Down Expand Up @@ -714,168 +713,164 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) {
c.Put(ctx, workload, "./workload", loadNodes)
c.Start(ctx, t, append(b.startOpts(), roachNodes)...)

// Wait after restarting nodes before applying load. This lets
// things settle down to avoid unexpected cluster states.
const restartWait = 15 * time.Second
time.Sleep(restartWait)

useHAProxy := b.Chaos
if useHAProxy {
if len(loadNodes) > 1 {
t.Fatal("distributed chaos benchmarking not supported")
}
t.Status("installing haproxy")
if err := c.Install(ctx, t.l, loadNodes, "haproxy"); err != nil {
t.Fatal(err)
const restartWait = 15 * time.Second
{
// Wait after restarting nodes before applying load. This lets
// things settle down to avoid unexpected cluster states.
time.Sleep(restartWait)
if useHAProxy {
if len(loadNodes) > 1 {
t.Fatal("distributed chaos benchmarking not supported")
}
t.Status("installing haproxy")
if err := c.Install(ctx, t.l, loadNodes, "haproxy"); err != nil {
t.Fatal(err)
}
c.Run(ctx, loadNodes, "./cockroach gen haproxy --insecure --url {pgurl:1}")
c.Run(ctx, loadNodes, "haproxy -f haproxy.cfg -D")
}
c.Run(ctx, loadNodes, "./cockroach gen haproxy --insecure --url {pgurl:1}")
c.Run(ctx, loadNodes, "haproxy -f haproxy.cfg -D")
}

m := newMonitor(ctx, c, roachNodes)
m.Go(func(ctx context.Context) error {
t.Status("setting up dataset")
err := loadTPCCBench(ctx, t, c, b, roachNodes, c.Node(loadNodes[0]))
if err != nil {
return err
}
m := newMonitor(ctx, c, roachNodes)
m.Go(func(ctx context.Context) error {
t.Status("setting up dataset")
return loadTPCCBench(ctx, t, c, b, roachNodes, c.Node(loadNodes[0]))
})
m.Wait()
}

// Search between 1 and b.LoadWarehouses for the largest number of
// warehouses that can be operated on while sustaining a throughput
// threshold, set to a fraction of max tpmC.
precision := int(math.Max(1.0, float64(b.LoadWarehouses/200)))
initStepSize := precision
// Search between 1 and b.LoadWarehouses for the largest number of
// warehouses that can be operated on while sustaining a throughput
// threshold, set to a fraction of max tpmC.
precision := int(math.Max(1.0, float64(b.LoadWarehouses/200)))
initStepSize := precision

// Create a temp directory to store the local copy of results from the
// workloads.
resultsDir, err := ioutil.TempDir("", "roachtest-tpcc")
if err != nil {
return errors.Wrap(err, "failed to create temp dir")
}
defer func() { _ = os.RemoveAll(resultsDir) }()
s := search.NewLineSearcher(1, b.LoadWarehouses, b.EstimatedMax, initStepSize, precision)
res, err := s.Search(func(warehouses int) (bool, error) {
// Restart the cluster before each iteration to help eliminate
// inter-trial interactions.
m.ExpectDeaths(int32(len(roachNodes)))
c.Stop(ctx, roachNodes)
c.Start(ctx, t, append(b.startOpts(), roachNodes)...)
time.Sleep(restartWait)

// Set up the load generation configuration.
rampDur := 5 * time.Minute
loadDur := 10 * time.Minute
loadDone := make(chan time.Time, numLoadGroups)

// If we're running chaos in this configuration, modify this config.
if b.Chaos {
// Increase the load generation duration.
loadDur = 10 * time.Minute

// Kill one node at a time.
ch := Chaos{
Timer: Periodic{Period: 90 * time.Second, DownTime: 5 * time.Second},
Target: roachNodes.randNode,
Stopper: loadDone,
}
m.Go(ch.Runner(c, m))
}
if b.Distribution == multiRegion {
rampDur = 3 * time.Minute
loadDur = 15 * time.Minute
// Create a temp directory to store the local copy of results from the
// workloads.
resultsDir, err := ioutil.TempDir("", "roachtest-tpcc")
if err != nil {
t.Fatal(errors.Wrap(err, "failed to create temp dir"))
}
defer func() { _ = os.RemoveAll(resultsDir) }()
s := search.NewLineSearcher(1, b.LoadWarehouses, b.EstimatedMax, initStepSize, precision)
if res, err := s.Search(func(warehouses int) (bool, error) {
m := newMonitor(ctx, c, roachNodes)
// Restart the cluster before each iteration to help eliminate
// inter-trial interactions.
m.ExpectDeaths(int32(len(roachNodes)))
c.Stop(ctx, roachNodes)
c.Start(ctx, t, append(b.startOpts(), roachNodes)...)
time.Sleep(restartWait)

// Set up the load generation configuration.
rampDur := 5 * time.Minute
loadDur := 10 * time.Minute
loadDone := make(chan time.Time, numLoadGroups)

// If we're running chaos in this configuration, modify this config.
if b.Chaos {
// Increase the load generation duration.
loadDur = 10 * time.Minute

// Kill one node at a time.
ch := Chaos{
Timer: Periodic{Period: 90 * time.Second, DownTime: 5 * time.Second},
Target: roachNodes.randNode,
Stopper: loadDone,
}
m.Go(ch.Runner(c, m))
}
if b.Distribution == multiRegion {
rampDur = 3 * time.Minute
loadDur = 15 * time.Minute
}

// If we're running multiple load generators, run them in parallel and then
// aggregate resultChan. In order to process the results we need to copy
// over the histograms. Create a temp dir which will contain the fetched
// data.
var eg errgroup.Group
resultChan := make(chan *tpcc.Result, numLoadGroups)
for groupIdx, group := range loadGroups {
// Copy for goroutine
groupIdx := groupIdx
group := group
eg.Go(func() error {
sqlGateways := group.roachNodes
if useHAProxy {
sqlGateways = group.loadNodes
}
// If we're running multiple load generators, run them in parallel and then
// aggregate resultChan. In order to process the results we need to copy
// over the histograms. Create a temp dir which will contain the fetched
// data.
resultChan := make(chan *tpcc.Result, numLoadGroups)
for groupIdx, group := range loadGroups {
// Copy for goroutine
groupIdx := groupIdx
group := group
m.Go(func(ctx context.Context) error {
sqlGateways := group.roachNodes
if useHAProxy {
sqlGateways = group.loadNodes
}

extraFlags := ""
activeWarehouses := warehouses
switch b.LoadConfig {
case singleLoadgen:
// Nothing.
case singlePartitionedLoadgen:
extraFlags = fmt.Sprintf(` --partitions=%d`, b.partitions())
case multiLoadgen:
extraFlags = fmt.Sprintf(` --partitions=%d --partition-affinity=%d`,
b.partitions(), groupIdx)
activeWarehouses = warehouses / numLoadGroups
default:
panic("unexpected")
}
extraFlags := ""
activeWarehouses := warehouses
switch b.LoadConfig {
case singleLoadgen:
// Nothing.
case singlePartitionedLoadgen:
extraFlags = fmt.Sprintf(` --partitions=%d`, b.partitions())
case multiLoadgen:
extraFlags = fmt.Sprintf(` --partitions=%d --partition-affinity=%d`,
b.partitions(), groupIdx)
activeWarehouses = warehouses / numLoadGroups
default:
panic("unexpected")
}

t.Status(fmt.Sprintf("running benchmark, warehouses=%d", warehouses))
histogramsPath := fmt.Sprintf("%s/warehouses=%d/stats.json", perfArtifactsDir, activeWarehouses)
cmd := fmt.Sprintf("./workload run tpcc --warehouses=%d --active-warehouses=%d "+
"--tolerate-errors --scatter --ramp=%s --duration=%s%s {pgurl%s} "+
"--histograms=%s",
b.LoadWarehouses, activeWarehouses, rampDur,
loadDur, extraFlags, sqlGateways, histogramsPath)
err := c.RunE(ctx, group.loadNodes, cmd)
loadDone <- timeutil.Now()
if err != nil {
return errors.Wrapf(err, "error running tpcc load generator")
}
roachtestHistogramsPath := filepath.Join(resultsDir, fmt.Sprintf("%d.%d-stats.json", warehouses, groupIdx))
if err := c.Get(
ctx, t.l, histogramsPath, roachtestHistogramsPath, group.loadNodes,
); err != nil {
t.Fatal(err)
}
snapshots, err := histogram.DecodeSnapshots(roachtestHistogramsPath)
if err != nil {
return errors.Wrapf(err, "failed to decode histogram snapshots")
}
result := tpcc.NewResultWithSnapshots(activeWarehouses, 0, snapshots)
resultChan <- result
return nil
})
}
if err = eg.Wait(); err != nil {
return false, err
}
close(resultChan)
var results []*tpcc.Result
for partial := range resultChan {
results = append(results, partial)
}
res := tpcc.MergeResults(results...)
failErr := res.FailureError()
// Print the result.
if failErr == nil {
ttycolor.Stdout(ttycolor.Green)
t.l.Printf("--- PASS: tpcc %d resulted in %.1f tpmC (%.1f%% of max tpmC)\n\n",
warehouses, res.TpmC(), res.Efficiency())
} else {
ttycolor.Stdout(ttycolor.Red)
t.l.Printf("--- FAIL: tpcc %d resulted in %.1f tpmC and failed due to %v",
warehouses, res.TpmC(), failErr)
}
ttycolor.Stdout(ttycolor.Reset)
return failErr == nil, nil
})
if err != nil {
return err
t.Status(fmt.Sprintf("running benchmark, warehouses=%d", warehouses))
histogramsPath := fmt.Sprintf("%s/warehouses=%d/stats.json", perfArtifactsDir, activeWarehouses)
cmd := fmt.Sprintf("./workload run tpcc --warehouses=%d --active-warehouses=%d "+
"--tolerate-errors --scatter --ramp=%s --duration=%s%s {pgurl%s} "+
"--histograms=%s",
b.LoadWarehouses, activeWarehouses, rampDur,
loadDur, extraFlags, sqlGateways, histogramsPath)
err := c.RunE(ctx, group.loadNodes, cmd)
loadDone <- timeutil.Now()
if err != nil {
return errors.Wrapf(err, "error running tpcc load generator")
}
roachtestHistogramsPath := filepath.Join(resultsDir, fmt.Sprintf("%d.%d-stats.json", warehouses, groupIdx))
if err := c.Get(
ctx, t.l, histogramsPath, roachtestHistogramsPath, group.loadNodes,
); err != nil {
t.Fatal(err)
}
snapshots, err := histogram.DecodeSnapshots(roachtestHistogramsPath)
if err != nil {
return errors.Wrapf(err, "failed to decode histogram snapshots")
}
result := tpcc.NewResultWithSnapshots(activeWarehouses, 0, snapshots)
resultChan <- result
return nil
})
}

if err = m.WaitE(); err != nil {
return false, err
}
close(resultChan)
var results []*tpcc.Result
for partial := range resultChan {
results = append(results, partial)
}
res := tpcc.MergeResults(results...)
failErr := res.FailureError()
// Print the result.
if failErr == nil {
ttycolor.Stdout(ttycolor.Green)
t.l.Printf("--- PASS: tpcc %d resulted in %.1f tpmC (%.1f%% of max tpmC)\n\n",
warehouses, res.TpmC(), res.Efficiency())
} else {
ttycolor.Stdout(ttycolor.Red)
t.l.Printf("--- FAIL: tpcc %d resulted in %.1f tpmC and failed due to %v",
warehouses, res.TpmC(), failErr)
}
ttycolor.Stdout(ttycolor.Reset)
return failErr == nil, nil
}); err != nil {
t.Fatal(err)
} else {
ttycolor.Stdout(ttycolor.Green)
t.l.Printf("------\nMAX WAREHOUSES = %d\n------\n\n", res)
ttycolor.Stdout(ttycolor.Reset)
return nil
})
m.Wait()
}
}

func registerTPCCBench(r *testRegistry) {
Expand Down

0 comments on commit 8abea21

Please sign in to comment.