roachtest: add replica-circuit-breakers
This adds a roachtest that runs TPCC (with a low warehouse count and
`--tolerate-errors`), loses quorum halfway through, and checks the
prometheus metrics of the workload for fail-fast behavior. It also
checks that follower reads and bounded staleness reads work as
expected at timestamps at which the cluster was known to have been
healthy.
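
For reference, these are the two read flavors the test issues, lifted
from the test body (a vanilla follower read at an exact timestamp, and
a bounded staleness point read via with_min_timestamp, whose second
argument asks that the read be served by the nearest replica); '<ts>'
stands in for a timestamp at which the cluster was known to be healthy:

    SELECT count(*) FROM tpcc.order_line AS OF SYSTEM TIME '<ts>';
    SELECT * FROM tpcc.order_line
      AS OF SYSTEM TIME with_min_timestamp('<ts>', true)
      WHERE ol_w_id = 1 AND ol_d_id = 1 AND ol_o_id = 1 AND ol_number = 1;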

Touches cockroachdb#33007.

Release note: None
tbg committed Apr 12, 2022
1 parent 36c3799 commit 9cefd3e
Showing 5 changed files with 291 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -93,6 +93,7 @@ go_library(
"rebalance_load.go",
"registry.go",
"remove_invalid_database_privileges.go",
"replica_circuit_breaker.go",
"replicagc.go",
"reset_quorum.go",
"restart.go",
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/chaos.go
@@ -159,9 +159,11 @@ func (ch *Chaos) Runner(
l.Printf("restarting %v (chaos is done)\n", target)
startOpts := option.DefaultStartOpts()
startOpts.RoachtestOpts.Worker = true
ch.sendEvent(ChaosEventTypePreStartup, target)
if err := c.StartE(ctx, l, startOpts, install.MakeClusterSettings(), target); err != nil {
return errors.Wrapf(err, "could not restart node %s", target)
}
ch.sendEvent(ChaosEventTypeStartupComplete, target)
return nil
case <-ctx.Done():
// NB: the roachtest harness checks that at the end of the test,
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
@@ -81,6 +81,7 @@ func RegisterTests(r registry.Registry) {
registerQuitAllNodes(r)
registerQuitTransfersLeases(r)
registerRebalanceLoad(r)
registerReplicaCircuitBreaker(r)
registerReplicaGC(r)
registerRestart(r)
registerRestoreNodeShutdown(r)
283 changes: 283 additions & 0 deletions pkg/cmd/roachtest/tests/replica_circuit_breaker.go
@@ -0,0 +1,283 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/prometheus"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func registerReplicaCircuitBreaker(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "replica-circuit-breaker",
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(6),
Run: runReplicaCircuitBreaker,
})
}

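// runReplicaCircuitBreaker exercises the replica circuit breakers by losing
// quorum on some TPCC ranges mid-workload; see the comment on the tpccOptions
// below for the details of the test plan.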
func runReplicaCircuitBreaker(ctx context.Context, t test.Test, c cluster.Cluster) {
const (
dur = 10 * time.Minute
loseQuorumAfter = dur / 2
warehouses = 50 // NB: if this ever changes, qps counts might need adjustment
minHealthyQPS = 5.0 // min successes per second before quorum loss
minUnhealthyQPS = 5.0 // min (successes+errors) per second a while after quorum loss
)

// Run TPCC and halfway through lose two nodes, which is likely to render
// some of the TPCC ranges (but not system ranges, which are 5x replicated)
// unavailable. We let TPCC tolerate errors. When it is done, we look at the
// time series and verify
//
// a) workload was healthy before outage, as measured by min qps.
// b) workload made "progress" during the outage (after a transition period), as
// measured by min qps.
//
// At the time of writing, we see ~11 qps during a) and ~15-20 qps during b).
//
// At select points in the test we also verify bounded staleness and vanilla follower reads.
chaosEventCh := make(chan ChaosEvent)
opts := tpccOptions{
Warehouses: warehouses,
ExtraRunArgs: "--tolerate-errors",
ExtraSetupArgs: "",
Chaos: func() Chaos {
return Chaos{
Timer: Periodic{
Period: loseQuorumAfter,
DownTime: 99999 * time.Hour, // stay down forever; node restarts when chaos winds down
},
Target: func() option.NodeListOption {
// Stopping two nodes is very likely to lose quorum on at least one TPCC
// range, while keeping the system ranges (which are 5x replicated)
// healthy.
//
// NB: we spare n1 since it's special cased by the tpcc checks that run
// after completion of the workload.
return c.Nodes(2, 3)
},
Stopper: time.After(dur),
ChaosEventCh: chaosEventCh,
}
},
Duration: dur,
RampDuration: time.Nanosecond, // effectively disable
EnableCircuitBreakers: true,
}
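// Buffered channels recording, at most once each, notable points in the
// chaos timeline; they are populated via maybeSendTime below.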
firstGoodCh := make(chan time.Time, 1)
lastGoodCh := make(chan time.Time, 1)
downCh := make(chan time.Time, 1)
upCh := make(chan time.Time, 1)

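// checkFollowerReadsCh drives the follower read checker goroutine below;
// see there for the meaning of a zero time on the channel.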
checkFollowerReadsCh := make(chan time.Time, 100)

// Set up a monitor for the test goroutines that consume chaos events and
// run one-off checks. Note that `runTPCC` uses its own monitor internally.
m := c.NewMonitor(ctx, option.NoNodes{})
m.Go(func(ctx context.Context) error {
for ev := range chaosEventCh {
t.L().Printf("%+v", ev)
switch ev.Type {
case ChaosEventTypeStart:
maybeSendTime(checkFollowerReadsCh, ev.Time)
maybeSendTime(firstGoodCh, ev.Time)
case ChaosEventTypePreShutdown:
maybeSendTime(checkFollowerReadsCh, ev.Time)
maybeSendTime(lastGoodCh, ev.Time)
case ChaosEventTypeShutdownComplete:
maybeSendTime(checkFollowerReadsCh, time.Time{})
maybeSendTime(downCh, ev.Time)
case ChaosEventTypePreStartup:
maybeSendTime(checkFollowerReadsCh, time.Time{})
case ChaosEventTypeStartupComplete:
maybeSendTime(upCh, ev.Time)
case ChaosEventTypeEnd:
maybeSendTime(checkFollowerReadsCh, time.Time{})
close(checkFollowerReadsCh)
return nil
}
}
return errors.New("chaos stopping") // should not be hit
})
m.Go(func(ctx context.Context) error {
var tss []time.Time
// Run an expanding set of checks. A zero time on the channel is the
// signal to re-run the checks on all timestamps seen so far, without
// adding a new one.
for ts := range checkFollowerReadsCh {
if !ts.IsZero() {
tss = append(tss, ts)
}
for _, ts := range tss {
r := sqlutils.MakeSQLRunner(c.Conn(ctx, t.L(), 1))
queryTS := ts.Add(-10 * time.Second).Format("2006-01-02 15:04:05")
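// q1 is a vanilla follower read at an exact timestamp; q2 is a bounded
// staleness point read, where the second argument to with_min_timestamp
// (nearest_only) insists that the read be served by the nearest replica.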
q1 := `SELECT count(*) FROM tpcc.order_line AS OF SYSTEM TIME '` + queryTS + `'`
q2 := `SELECT * FROM tpcc.order_line AS OF SYSTEM TIME with_min_timestamp('` + queryTS + `', true)
WHERE ol_w_id = 1 AND ol_d_id = 1 AND ol_o_id = 1 AND ol_number = 1`
r.QueryStr(t, q1)
t.L().Printf("ok: %s", q1)
r.QueryStr(t, q2)
t.L().Printf("ok: %s", q2)
}
}
return nil
})
m.Go(func(ctx context.Context) error {
runTPCC(ctx, t, c, opts)
return nil
})
m.Wait()

// The test has finished, so let's examine the metrics.
firstGood, lastGood, down, up :=
maybeRecvTime(ctx, firstGoodCh), maybeRecvTime(ctx, lastGoodCh), maybeRecvTime(ctx, downCh), maybeRecvTime(ctx, upCh)
require.NotZero(t, firstGood)
require.NotZero(t, lastGood)
require.NotZero(t, down)
require.NotZero(t, up)

// The full chaos timeline is now known. Note that right after quorum loss,
// reads may keep succeeding for a few more seconds until the lease times
// out, which is why the metrics checks below allow for a transition period.
// Follower reads and bounded staleness reads should have worked throughout;
// the checker goroutine above verified this at select points in the test.
t.L().Printf("cluster healthy from %s to %s, quorum loss at %s, restored at %s", firstGood, lastGood, down, up)

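// The prometheus instance runs on the last node of the cluster; we reach
// it via that node's external IP on the default port 9090.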
promAddr := func() string {
sl, err := c.ExternalIP(ctx, t.L(), c.Node(c.Spec().NodeCount))
require.NoError(t, err)
require.Len(t, sl, 1)
return fmt.Sprintf("http://%s:9090", sl[0])
}()

{
// No errors before chaos kicked in.
const q = `rate(workload_tpcc_newOrder_error_total[1m])`
visitPromRangeQuery(
ctx, t, promAddr, q, firstGood, lastGood,
func(ts time.Time, val model.SamplePair) {
if val.Value > 0 {
t.Errorf("at %s: nonzero TPCC error rate while cluster should have been healthy: %.2f", ts, val.Value)
}
},
)
}

{
// At least some throughput before chaos kicked in.
const q = `rate(workload_tpcc_newOrder_success_total[1m])`
min, _ := visitPromRangeQuery(
// NB: we intentionally check only at the very end, right before chaos
// strikes. The workload tends to ramp up (even with ramp disabled),
// possibly due to lease movement, etc.
ctx, t, promAddr, q, lastGood, lastGood,
func(ts time.Time, val model.SamplePair) {
if val.Value < minHealthyQPS {
t.Errorf("at %s: very low qps before chaos kicked in: %.2f", ts, val.Value)
}
},
)
t.L().Printf("under healthy cluster: ~%.2f qps", min)
}

{
// Starting one minute after quorum was lost, we should see at least a
// steady trickle of errors, of successes, or of a mix of both. (If
// miraculously no range lost quorum, it would be all successes; if all
// ranges did, it would be all errors.)
const q = `rate(workload_tpcc_newOrder_error_total[1m])+rate(workload_tpcc_newOrder_success_total[1m])`
min, max := visitPromRangeQuery(
ctx, t, promAddr, q, down.Add(time.Minute), up,
func(ts time.Time, val model.SamplePair) {
if val.Value < minUnhealthyQPS {
t.Errorf("at %s: unexpectedly low combined qps rate: %.2f", ts, val.Value)
}
},
)
t.L().Printf("under unhealthy cluster: [%.2f, %.2f] (combined error+success) qps", min, max)
}
}

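// visitPromRangeQuery runs the given range query against the prometheus
// instance at promAddr, invoking visit for each returned sample and
// returning the minimum and maximum values seen. The [from, to] interval
// is shrunk by one scrape interval on each side to avoid artifacts at the
// edges.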
func visitPromRangeQuery(
ctx context.Context,
t require.TestingT,
promAddr string,
query string,
from, to time.Time,
visit func(time.Time, model.SamplePair),
) (_min, _max float64) {
// Stay away from the edges of the interval, to avoid unexpected outcomes
// due to prometheus interpolation, etc.
from = from.Add(prometheus.DefaultScrapeInterval)
to = to.Add(-prometheus.DefaultScrapeInterval)
if from.After(to) {
from = to
}

client, err := promapi.NewClient(promapi.Config{
Address: promAddr,
})
require.NoError(t, err)
promClient := promv1.NewAPI(client)
mtr, ws, err := promClient.QueryRange(ctx, query, promv1.Range{Start: from, End: to, Step: 15 * time.Second})
if err != nil {
t.Errorf("%s", err)
t.FailNow()
}
if len(ws) != 0 {
t.Errorf("warnings: %v", ws)
}
require.Len(t, mtr.(model.Matrix), 1, "multiple time series found: %v", mtr)

var min, max float64
var n int
for _, val := range mtr.(model.Matrix)[0].Values {
n++
visit(timeutil.Unix(int64(val.Timestamp/1000), 0), val)
v := float64(val.Value)
if n == 1 || min > v {
min = v
}
if n == 1 || max < v {
max = v
}
}
require.NotZero(t, n, "no data returned for %s on %s-%s", query, from, to)
return min, max
}

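// maybeSendTime performs a non-blocking send of t on c; if the channel is
// full, the value is dropped.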
func maybeSendTime(c chan time.Time, t time.Time) {
select {
case c <- t:
default:
}
}

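// maybeRecvTime receives a time from c, returning the zero time if the
// context is canceled first.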
func maybeRecvTime(ctx context.Context, c chan time.Time) time.Time {
select {
case t := <-c:
return t
case <-ctx.Done():
return time.Time{}
}
}
4 changes: 4 additions & 0 deletions pkg/cmd/roachtest/tests/tpcc.go
@@ -58,6 +58,7 @@ type tpccOptions struct {
Chaos func() Chaos // for late binding of stopper
During func(context.Context) error // for running a function during the test
Duration time.Duration // if zero, TPCC is not invoked
RampDuration time.Duration // defaults to 5m
SetupType tpccSetupType
// PrometheusConfig, if set, overwrites the default prometheus config settings.
PrometheusConfig *prometheus.Config
@@ -213,6 +214,9 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
}

rampDuration := 5 * time.Minute
if opts.RampDuration != 0 {
rampDuration = opts.RampDuration
}
if c.IsLocal() {
opts.Warehouses = 1
if opts.Duration > time.Minute {
