Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
92991: roachtest: add `failover/non-system/blackhole` tests r=erikgrinaker a=erikgrinaker

This patch adds roachtests that benchmark the maximum unavailability during leaseholder network outages on non-system ranges, covering both symmetric and asymmetric outages. Initial results, with a query timeout of 30 s:

| Test             | pMax read | pMax write |
|------------------|-----------|------------|
| `crash`          | 14.5 s    | 14.5 s     |
| `blackhole`      | 16.6 s    | 18.3 s     |
| `blackhole-recv` | 30.1 s    | 30.1 s     |
| `blackhole-send` | 30.1 s    | 30.1 s     |

Touches cockroachdb#79494.

Epic: None
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Dec 5, 2022
2 parents 8a5cb51 + 51b7e62 commit 0c4a2e0
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 57 deletions.
269 changes: 212 additions & 57 deletions pkg/cmd/roachtest/tests/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,54 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

// registerFailover adds the failover/non-system roachtests to the registry.
// Each spec is owned by KV and uses r.MakeClusterSpec(7, spec.CPU(4)).
//
// NOTE(review): as rendered here, "failover/non-system/crash" is registered
// twice: once by the standalone spec (which still points at
// runFailoverNonSystemCrash) and once by the loop, because
// failureModeCrash.String() returns "crash". This looks like removed and
// added diff lines shown together; confirm against the actual file.
func registerFailover(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "failover/non-system/crash",
Owner: registry.OwnerKV,
Timeout: time.Hour,
Cluster: r.MakeClusterSpec(7, spec.CPU(4)),
Run: runFailoverNonSystemCrash,
})
// Register one test per failure mode. The test name suffix comes from the
// mode's String method (formatted with %s).
for _, failureMode := range []failureMode{
&failureModeBlackhole{},
&failureModeBlackholeRecv{},
&failureModeBlackholeSend{},
&failureModeCrash{},
} {
failureMode := failureMode // pin loop variable
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/non-system/%s", failureMode),
Owner: registry.OwnerKV,
Timeout: 20 * time.Minute,
Cluster: r.MakeClusterSpec(7, spec.CPU(4)),
// The closure binds the pinned failureMode so every registered test runs
// with its own mode.
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runFailoverNonSystem(ctx, t, c, failureMode)
},
})
}
}

// runFailoverNonSystemCrash benchmarks the maximum duration of range
// unavailability following a leaseholder crash with only non-system ranges. It
// tests the simplest possible failure:
//
// - A process crash, where the host/OS remains available (in particular, the
// TCP/IP stack is responsive and sends immediate RST packets to peers).
// runFailoverNonSystem benchmarks the maximum duration of range unavailability
// following a leaseholder failure with only non-system ranges.
//
// - No system ranges located on the crashed node.
// - No system ranges located on the failed node.
//
// - SQL clients do not connect to the crashed node.
// - SQL clients do not connect to the failed node.
//
// - The workload consists of individual point reads and writes.
//
// Since the lease unavailability is probabilistic, depending e.g. on the time
// since the last heartbeat and other variables, we run 9 crashes and record the
// pMax latency to find the upper bound on unavailability. We expect this
// worse-case latency to be slightly larger than the lease interval (9s), to
// since the last heartbeat and other variables, we run 9 failures and record
// the pMax latency to find the upper bound on unavailability. We expect this
// worst-case latency to be slightly larger than the lease interval (9s), to
// account for lease acquisition and retry latencies. We do not assert this, but
// instead export latency histograms for graphing.
//
Expand All @@ -61,19 +70,27 @@ func registerFailover(r registry.Registry) {
// n7: Workload runner.
//
// The test runs a kv50 workload with batch size 1, using 256 concurrent workers
// directed at n1-n3 with a rate of 2048 reqs/s. n4-n6 are killed and restarted
// in order, with 30 seconds between each operation, for 3 cycles totaling 9
// crashes.
func runFailoverNonSystemCrash(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Put(ctx, t.Cockroach(), "./cockroach")

// directed at n1-n3 with a rate of 2048 reqs/s. n4-n6 fail and recover in
// order, with 30 seconds between each operation, for 3 cycles totaling 9
// failures.
func runFailoverNonSystem(
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode,
) {
require.Equal(t, 7, c.Spec().NodeCount)

rng, _ := randutil.NewTestRand()

// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))

if f, ok := failureMode.(*failureModeCrash); ok {
f.startOpts = opts
f.startSettings = settings
}

conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

Expand Down Expand Up @@ -112,33 +129,11 @@ func runFailoverNonSystemCrash(ctx context.Context, t test.Test, c cluster.Clust
// n4-n6, so we do it ourselves. Precreating the database/range and moving it
// to the correct nodes first is not sufficient, since workload will spread
// the ranges across all nodes regardless.
relocateRanges := func(predicate string, from, to []int) {
require.NotEmpty(t, predicate)
var count int
for _, source := range from {
where := fmt.Sprintf("%s AND %d = ANY(replicas)", predicate, source)
for {
require.NoError(t, conn.QueryRowContext(ctx,
`SELECT count(*) FROM crdb_internal.ranges WHERE `+where).Scan(&count))
if count == 0 {
break
}
t.Status(fmt.Sprintf("moving %d ranges off of n%d (%s)", count, source, predicate))
for _, target := range to {
_, err = conn.ExecContext(ctx, `ALTER RANGE RELOCATE FROM $1::int TO $2::int FOR `+
`SELECT range_id FROM crdb_internal.ranges WHERE `+where,
source, target)
require.NoError(t, err)
}
time.Sleep(time.Second)
}
}
}
relocateRanges(`database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})
relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})

// Start workload on n7, using n1-n3 as gateways. Run it for 10 minutes, since
// we take ~1 minute to kill and restart each node, and we do 3 cycles of
// killing the 3 nodes in order.
// we take ~1 minute to fail and recover each node, and we do 3 cycles of each
// of the 3 nodes in order.
t.Status("running workload")
m := c.NewMonitor(ctx, c.Range(1, 6))
m.Go(func(ctx context.Context) error {
Expand All @@ -149,8 +144,13 @@ func runFailoverNonSystemCrash(ctx context.Context, t test.Test, c cluster.Clust
return nil
})

// Start a worker to kill and restart n4-n6 in order, for 3 cycles.
// Start a worker to fail and recover n4-n6 in order.
defer failureMode.Cleanup(ctx, t, c)

m.Go(func(ctx context.Context) error {
var raftCfg base.RaftConfig
raftCfg.SetDefaults()

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

Expand All @@ -162,25 +162,180 @@ func runFailoverNonSystemCrash(ctx context.Context, t test.Test, c cluster.Clust
return ctx.Err()
}

randTimer := time.After(randutil.RandDuration(rng, raftCfg.RangeLeaseRenewalDuration()))

// Ranges may occasionally escape their constraints. Move them
// to where they should be.
relocateRanges(`database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})
relocateRanges(`database_name != 'kv'`, []int{node}, []int{1, 2, 3})
relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})
relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{node}, []int{1, 2, 3})

t.Status(fmt.Sprintf("killing n%d", node))
m.ExpectDeath()
c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Node(node)) // uses SIGKILL
// Randomly sleep up to the lease renewal interval, to vary the time
// between the last lease renewal and the failure. We start the timer
// before the range relocation above to run them concurrently.
select {
case <-randTimer:
case <-ctx.Done():
}

t.Status(fmt.Sprintf("failing n%d (%s)", node, failureMode))
if failureMode.ExpectDeath() {
m.ExpectDeath()
}
failureMode.Fail(ctx, t, c, node)

select {
case <-ticker.C:
case <-ctx.Done():
return ctx.Err()
}
t.Status(fmt.Sprintf("restarting n%d", node))
c.Start(ctx, t.L(), opts, settings, c.Node(node))

t.Status(fmt.Sprintf("recovering n%d (%s)", node, failureMode))
failureMode.Recover(ctx, t, c, node)
}
}
return nil
})
m.Wait()
}

// failureMode describes one way of making a node fail and bringing it back.
// Implementations also provide a String method (via fmt.Stringer), which is
// used in test names and status messages.
type failureMode interface {
	fmt.Stringer

	// ExpectDeath reports whether Fail is expected to kill the node's process.
	// The test calls the monitor's ExpectDeath before Fail when this is true.
	ExpectDeath() bool

	// Fail injects the failure on the node identified by nodeID.
	Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int)

	// Recover undoes the failure on the node identified by nodeID.
	Recover(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int)

	// Cleanup runs when the test exits. It exists so that leftover failure
	// state does not leak into another test that reuses the same cluster.
	Cleanup(ctx context.Context, t test.Test, c cluster.Cluster)
}

// failureModeCrash models a process crash: the node's process is stopped while
// the host's TCP/IP stack stays responsive and answers peers with immediate
// RST packets.
//
// startOpts and startSettings are the options used to restart the node in
// Recover; runFailoverNonSystem fills them in after starting the cluster.
type failureModeCrash struct {
	startOpts     option.StartOpts
	startSettings install.ClusterSettings
}

// String returns the name of this failure mode.
func (m *failureModeCrash) String() string {
	return "crash"
}

// ExpectDeath is true: the node's process is stopped by Fail.
func (m *failureModeCrash) ExpectDeath() bool {
	return true
}

// Fail stops the given node with the default stop options (which use SIGKILL).
func (m *failureModeCrash) Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
	logger := t.L()
	stopOpts := option.DefaultStopOpts()
	target := c.Node(nodeID)
	c.Stop(ctx, logger, stopOpts, target)
}

// Recover starts the given node again using the stored start options and
// cluster settings.
func (m *failureModeCrash) Recover(
	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
	logger := t.L()
	target := c.Node(nodeID)
	c.Start(ctx, logger, m.startOpts, m.startSettings, target)
}

// Cleanup is a no-op; this mode runs no cleanup commands on exit.
func (m *failureModeCrash) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
	// Intentionally empty.
}

// failureModeBlackhole models a symmetric network outage: every inbound and
// outbound TCP packet to or from port 26257 on the node is dropped, so peers
// see hangs and timeouts rather than connection resets.
type failureModeBlackhole struct{}

// String returns the name of this failure mode.
func (m *failureModeBlackhole) String() string {
	return "blackhole"
}

// ExpectDeath is false: the node's process keeps running during the outage.
func (m *failureModeBlackhole) ExpectDeath() bool {
	return false
}

// Fail appends DROP rules for port 26257 to both the INPUT and OUTPUT chains
// on the given node.
func (m *failureModeBlackhole) Fail(
	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
	dropRules := []string{
		`sudo iptables -A INPUT -m multiport -p tcp --ports 26257 -j DROP`,
		`sudo iptables -A OUTPUT -m multiport -p tcp --ports 26257 -j DROP`,
	}
	for _, rule := range dropRules {
		c.Run(ctx, c.Node(nodeID), rule)
	}
}

// Recover flushes the iptables rules on the given node.
func (m *failureModeBlackhole) Recover(
	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
	const flushRules = `sudo iptables -F`
	c.Run(ctx, c.Node(nodeID), flushRules)
}

// Cleanup flushes the iptables rules on all nodes in the cluster.
func (m *failureModeBlackhole) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
	const flushRules = `sudo iptables -F`
	c.Run(ctx, c.All(), flushRules)
}

// failureModeBlackholeRecv models an asymmetric network outage in which all
// inbound TCP packets destined for port 26257 on the node are dropped, causing
// hangs and timeouts. Traffic on the node's outbound connections still flows.
type failureModeBlackholeRecv struct{}

// String returns the name of this failure mode.
func (m *failureModeBlackholeRecv) String() string {
	return "blackhole-recv"
}

// ExpectDeath is false: the node's process keeps running during the outage.
func (m *failureModeBlackholeRecv) ExpectDeath() bool {
	return false
}

// Fail appends a DROP rule for destination port 26257 to the INPUT chain on
// the given node.
func (m *failureModeBlackholeRecv) Fail(
	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
	const dropInbound = `sudo iptables -A INPUT -p tcp --dport 26257 -j DROP`
	c.Run(ctx, c.Node(nodeID), dropInbound)
}

// Recover flushes the iptables rules on the given node.
func (m *failureModeBlackholeRecv) Recover(
	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
	const flushRules = `sudo iptables -F`
	c.Run(ctx, c.Node(nodeID), flushRules)
}

// Cleanup flushes the iptables rules on all nodes in the cluster.
func (m *failureModeBlackholeRecv) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
	const flushRules = `sudo iptables -F`
	c.Run(ctx, c.All(), flushRules)
}

// failureModeBlackholeSend models an asymmetric network outage in which all
// outbound TCP packets destined for port 26257 are dropped on the node,
// causing hangs and timeouts. Traffic on inbound connections still arrives.
type failureModeBlackholeSend struct{}

// String returns the name of this failure mode.
func (m *failureModeBlackholeSend) String() string {
	return "blackhole-send"
}

// ExpectDeath is false: the node's process keeps running during the outage.
func (m *failureModeBlackholeSend) ExpectDeath() bool {
	return false
}

// Fail appends a DROP rule for destination port 26257 to the OUTPUT chain on
// the given node.
func (m *failureModeBlackholeSend) Fail(
	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
	const dropOutbound = `sudo iptables -A OUTPUT -p tcp --dport 26257 -j DROP`
	c.Run(ctx, c.Node(nodeID), dropOutbound)
}

// Recover flushes the iptables rules on the given node.
func (m *failureModeBlackholeSend) Recover(
	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
	const flushRules = `sudo iptables -F`
	c.Run(ctx, c.Node(nodeID), flushRules)
}

// Cleanup flushes the iptables rules on all nodes in the cluster.
func (m *failureModeBlackholeSend) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
	const flushRules = `sudo iptables -F`
	c.Run(ctx, c.All(), flushRules)
}

// relocateRanges moves every range matching predicate off of each node in from
// and onto the nodes in to. Sources are processed one at a time. For each
// source, a relocation statement is issued towards every target in turn, and
// the loop repeats until no matching range reports a replica on that source.
//
// predicate is spliced verbatim into the WHERE clause of queries against
// crdb_internal.ranges, so it must be a trusted, non-empty SQL expression.
//
// A SQL statement that returns an error fails the test via require.NoError.
// NOTE(review): the previous comment said errors are retried indefinitely;
// presumably that refers to per-range failures reported in the statement's
// result rows, which are discarded here and retried on the next pass. Confirm.
//
// The pause between passes is cut short when ctx is cancelled, so the next
// query surfaces the context error immediately instead of up to a second
// later.
func relocateRanges(
	t test.Test, ctx context.Context, conn *gosql.DB, predicate string, from, to []int,
) {
	require.NotEmpty(t, predicate)
	for _, source := range from {
		// Both statements depend only on the source, so build them once per
		// source rather than on every pass.
		where := fmt.Sprintf("%s AND %d = ANY(replicas)", predicate, source)
		countQuery := `SELECT count(*) FROM crdb_internal.ranges WHERE ` + where
		relocateStmt := `ALTER RANGE RELOCATE FROM $1::int TO $2::int FOR ` +
			`SELECT range_id FROM crdb_internal.ranges WHERE ` + where

		for {
			var count int
			require.NoError(t, conn.QueryRowContext(ctx, countQuery).Scan(&count))
			if count == 0 {
				break
			}
			t.Status(fmt.Sprintf("moving %d ranges off of n%d (%s)", count, source, predicate))
			for _, target := range to {
				_, err := conn.ExecContext(ctx, relocateStmt, source, target)
				require.NoError(t, err)
			}

			// Back off before re-checking, but do not sit out the full second
			// once the context is done.
			select {
			case <-time.After(time.Second):
			case <-ctx.Done():
			}
		}
	}
}
6 changes: 6 additions & 0 deletions pkg/util/randutil/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math/rand"
"runtime"
"strings"
"time"
_ "unsafe" // required by go:linkname

"github.com/cockroachdb/cockroach/pkg/util/envutil"
Expand Down Expand Up @@ -123,6 +124,11 @@ func RandUint64n(r *rand.Rand, n uint64) uint64 {
return v
}

// RandDuration returns a random duration in [0, max), drawn from r.
//
// A non-positive max yields 0. Without this guard the call would panic, since
// rand.Int63n rejects n <= 0.
func RandDuration(r *rand.Rand, max time.Duration) time.Duration {
	if max <= 0 {
		return 0
	}
	return time.Duration(r.Int63n(int64(max)))
}

var randLetters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// RandBytes returns a byte slice of the given length with random
Expand Down

0 comments on commit 0c4a2e0

Please sign in to comment.