Merge #94240
94240: roachtest: add `disk-stall` variant of `failover` tests r=erikgrinaker a=erikgrinaker

**roachtest: reorganize failer interface**

This patch reorganizes the `failer` interface with `Setup`, `Ready`, and `Cleanup` hooks, providing it with access to the monitor. It also passes the test and cluster references via the constructor.
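
For reference, the reorganized interface (reconstructed from the diff below) now looks like this:

```go
// failer fails and recovers a given node in some particular way.
type failer interface {
	// Setup prepares the failer. It is called before the cluster is started.
	Setup(ctx context.Context)

	// Ready is called when the cluster is ready, with a running workload.
	Ready(ctx context.Context, m cluster.Monitor)

	// Cleanup cleans up when the test exits. This is needed e.g. when the
	// cluster is reused by a different test.
	Cleanup(ctx context.Context)

	// Fail fails the given node.
	Fail(ctx context.Context, nodeID int)

	// Recover recovers the given node.
	Recover(ctx context.Context, nodeID int)
}
```

A test drives this as `Setup` → start cluster and workload → `Ready` → repeated `Fail`/`Recover` cycles → `Cleanup`, as seen in `runFailoverNonSystem` and friends below.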

Epic: none
Release note: None
  
**roachtest: add `disk-stall` variant of `failover` tests**

This patch adds `disk-stall` variants of the `failover` tests, which benchmark pMax unavailability when a leaseholder experiences a persistent disk stall.
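
In rough outline, the new failure mode routes the store's data disk through a device-mapper `linear` target and suspends it on demand. A condensed sketch of the `diskStallFailer` from the diff below (error handling elided):

```go
// Setup replaces the store mount with a device-mapper "linear" target that
// passes I/O straight through to the real disk, so it can later be suspended.
func (f *diskStallFailer) Setup(ctx context.Context) {
	dev := f.device() // /dev/nvme0n1 on GCE, /dev/nvme1n1 on AWS
	f.c.Run(ctx, f.c.All(), `sudo umount /mnt/data1`)
	f.c.Run(ctx, f.c.All(), `sudo dmsetup remove_all`)
	// dm table format: <start sector> <num sectors> linear <device> <offset>,
	// i.e. map the whole disk 1:1 onto a new /dev/mapper/data1 device.
	f.c.Run(ctx, f.c.All(), `echo "0 $(sudo blockdev --getsz `+dev+`) linear `+dev+` 0" | `+
		`sudo dmsetup create data1`)
	f.c.Run(ctx, f.c.All(), `sudo mount /dev/mapper/data1 /mnt/data1`)
}

// Fail suspends the dm device, stalling all I/O indefinitely. Pebble's disk
// stall detector is expected to crash the node eventually.
func (f *diskStallFailer) Fail(ctx context.Context, nodeID int) {
	f.m.ExpectDeath()
	f.c.Run(ctx, f.c.Node(nodeID), `sudo dmsetup suspend --noflush --nolockfs data1`)
}

// Recover resumes I/O and restarts the (likely dead) node.
func (f *diskStallFailer) Recover(ctx context.Context, nodeID int) {
	f.c.Run(ctx, f.c.Node(nodeID), `sudo dmsetup resume data1`)
	f.c.Stop(ctx, f.t.L(), option.DefaultStopOpts(), f.c.Node(nodeID))
	f.c.Start(ctx, f.t.L(), f.startOpts, f.startSettings, f.c.Node(nodeID))
}
```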

Resolves #94231.
Touches #81100.

Epic: none
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
craig[bot] and erikgrinaker committed Dec 28, 2022
2 parents 76e3ea8 + 42e6639 commit 6f675b0
225 changes: 158 additions & 67 deletions pkg/cmd/roachtest/tests/failover.go
@@ -37,6 +37,7 @@ func registerFailover(r registry.Registry) {
failureModeBlackholeRecv,
failureModeBlackholeSend,
failureModeCrash,
failureModeDiskStall,
} {
failureMode := failureMode // pin loop variable
r.Add(registry.TestSpec{
@@ -106,7 +107,11 @@ func runFailoverNonSystem(
// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
failer := makeFailer(t, failureMode, opts, settings)

failer := makeFailer(t, c, failureMode, opts, settings)
failer.Setup(ctx)
defer failer.Cleanup(ctx)

c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))

@@ -164,8 +169,7 @@ func runFailoverNonSystem(
})

// Start a worker to fail and recover n4-n6 in order.
defer failer.Cleanup(ctx, t, c)

failer.Ready(ctx, m)
m.Go(func(ctx context.Context) error {
var raftCfg base.RaftConfig
raftCfg.SetDefaults()
@@ -197,10 +201,7 @@
}

t.Status(fmt.Sprintf("failing n%d (%s)", node, failureMode))
if failer.ExpectDeath() {
m.ExpectDeath()
}
failer.Fail(ctx, t, c, node)
failer.Fail(ctx, node)

select {
case <-ticker.C:
@@ -209,7 +210,7 @@
}

t.Status(fmt.Sprintf("recovering n%d (%s)", node, failureMode))
failer.Recover(ctx, t, c, node)
failer.Recover(ctx, node)
}
}
return nil
@@ -260,7 +261,11 @@ func runFailoverLiveness(
// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
failer := makeFailer(t, failureMode, opts, settings)

failer := makeFailer(t, c, failureMode, opts, settings)
failer.Setup(ctx)
defer failer.Cleanup(ctx)

c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, c.Range(1, 4))

@@ -344,8 +349,7 @@ func runFailoverLiveness(
startTime := timeutil.Now()

// Start a worker to fail and recover n4.
defer failer.Cleanup(ctx, t, c)

failer.Ready(ctx, m)
m.Go(func(ctx context.Context) error {
var raftCfg base.RaftConfig
raftCfg.SetDefaults()
@@ -376,10 +380,7 @@
}

t.Status(fmt.Sprintf("failing n%d (%s)", 4, failureMode))
if failer.ExpectDeath() {
m.ExpectDeath()
}
failer.Fail(ctx, t, c, 4)
failer.Fail(ctx, 4)

select {
case <-ticker.C:
@@ -388,7 +389,7 @@
}

t.Status(fmt.Sprintf("recovering n%d (%s)", 4, failureMode))
failer.Recover(ctx, t, c, 4)
failer.Recover(ctx, 4)
relocateLeases(t, ctx, conn, `range_id = 2`, 4)
}
return nil
@@ -459,7 +460,11 @@ func runFailoverSystemNonLiveness(
// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
failer := makeFailer(t, failureMode, opts, settings)

failer := makeFailer(t, c, failureMode, opts, settings)
failer.Setup(ctx)
defer failer.Cleanup(ctx)

c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))

@@ -525,8 +530,7 @@ func runFailoverSystemNonLiveness(
})

// Start a worker to fail and recover n4-n6 in order.
defer failer.Cleanup(ctx, t, c)

failer.Ready(ctx, m)
m.Go(func(ctx context.Context) error {
var raftCfg base.RaftConfig
raftCfg.SetDefaults()
@@ -560,10 +564,7 @@
}

t.Status(fmt.Sprintf("failing n%d (%s)", node, failureMode))
if failer.ExpectDeath() {
m.ExpectDeath()
}
failer.Fail(ctx, t, c, node)
failer.Fail(ctx, node)

select {
case <-ticker.C:
@@ -572,7 +573,7 @@
}

t.Status(fmt.Sprintf("recovering n%d (%s)", node, failureMode))
failer.Recover(ctx, t, c, node)
failer.Recover(ctx, node)
}
}
return nil
@@ -584,35 +585,55 @@ func runFailoverSystemNonLiveness(
type failureMode string

const (
failureModeCrash failureMode = "crash"
failureModeBlackhole failureMode = "blackhole"
failureModeBlackholeRecv failureMode = "blackhole-recv"
failureModeBlackholeSend failureMode = "blackhole-send"
failureModeCrash failureMode = "crash"
failureModeDiskStall failureMode = "disk-stall"
)

// makeFailer creates a new failer for the given failureMode.
func makeFailer(
t test.Test, failureMode failureMode, opts option.StartOpts, settings install.ClusterSettings,
t test.Test,
c cluster.Cluster,
failureMode failureMode,
opts option.StartOpts,
settings install.ClusterSettings,
) failer {
switch failureMode {
case failureModeCrash:
return &crashFailer{
startOpts: opts,
startSettings: settings,
}
case failureModeBlackhole:
return &blackholeFailer{
t: t,
c: c,
input: true,
output: true,
}
case failureModeBlackholeRecv:
return &blackholeFailer{
t: t,
c: c,
input: true,
}
case failureModeBlackholeSend:
return &blackholeFailer{
t: t,
c: c,
output: true,
}
case failureModeCrash:
return &crashFailer{
t: t,
c: c,
startOpts: opts,
startSettings: settings,
}
case failureModeDiskStall:
return &diskStallFailer{
t: t,
c: c,
startOpts: opts,
startSettings: settings,
}
default:
t.Fatalf("unknown failure mode %s", failureMode)
return nil
@@ -621,38 +642,21 @@ func makeFailer(

// failer fails and recovers a given node in some particular way.
type failer interface {
// Fail fails the given node.
Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int)
// Setup prepares the failer. It is called before the cluster is started.
Setup(ctx context.Context)

// Recover recovers the given node.
Recover(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int)
// Ready is called when the cluster is ready, with a running workload.
Ready(ctx context.Context, m cluster.Monitor)

// Cleanup cleans up when the test exits. This is needed e.g. when the cluster
// is reused by a different test.
Cleanup(ctx context.Context, t test.Test, c cluster.Cluster)

// ExpectDeath returns true if the node is expected to die on failure.
ExpectDeath() bool
}

// crashFailer is a process crash where the TCP/IP stack remains responsive
// and sends immediate RST packets to peers.
type crashFailer struct {
startOpts option.StartOpts
startSettings install.ClusterSettings
}

func (f *crashFailer) ExpectDeath() bool { return true }
Cleanup(ctx context.Context)

func (f *crashFailer) Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Node(nodeID)) // uses SIGKILL
}

func (f *crashFailer) Recover(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
c.Start(ctx, t.L(), f.startOpts, f.startSettings, c.Node(nodeID))
}
// Fail fails the given node.
Fail(ctx context.Context, nodeID int)

func (f *crashFailer) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
// Recover recovers the given node.
Recover(ctx context.Context, nodeID int)
}

// blackholeFailer causes a network failure where TCP/IP packets to/from port
@@ -662,13 +666,20 @@ func (f *crashFailer) Cleanup(ctx context.Context, t test.Test, c cluster.Cluste
// will fail (even already established connections), but connections in the
// other direction are still functional (including responses).
type blackholeFailer struct {
t test.Test
c cluster.Cluster
input bool
output bool
}

func (f *blackholeFailer) ExpectDeath() bool { return false }
func (f *blackholeFailer) Setup(ctx context.Context) {}
func (f *blackholeFailer) Ready(ctx context.Context, m cluster.Monitor) {}

func (f *blackholeFailer) Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
func (f *blackholeFailer) Cleanup(ctx context.Context) {
f.c.Run(ctx, f.c.All(), `sudo iptables -F`)
}

func (f *blackholeFailer) Fail(ctx context.Context, nodeID int) {
// When dropping both input and output, we use multiport to block traffic both
// to port 26257 and from port 26257 on either side of the connection, to
// avoid any spurious packets from making it through.
@@ -677,21 +688,101 @@ func (f *blackholeFailer) Fail(ctx context.Context, t test.Test, c cluster.Clust
// input case we don't want inbound connections to work (INPUT to 26257), but
// we do want responses for outbound connections to work (INPUT from 26257).
if f.input && f.output {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A INPUT -m multiport -p tcp --ports 26257 -j DROP`)
c.Run(ctx, c.Node(nodeID), `sudo iptables -A OUTPUT -m multiport -p tcp --ports 26257 -j DROP`)
f.c.Run(ctx, f.c.Node(nodeID),
`sudo iptables -A INPUT -m multiport -p tcp --ports 26257 -j DROP`)
f.c.Run(ctx, f.c.Node(nodeID),
`sudo iptables -A OUTPUT -m multiport -p tcp --ports 26257 -j DROP`)
} else if f.input {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A INPUT -p tcp --dport 26257 -j DROP`)
f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A INPUT -p tcp --dport 26257 -j DROP`)
} else if f.output {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A OUTPUT -p tcp --dport 26257 -j DROP`)
f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A OUTPUT -p tcp --dport 26257 -j DROP`)
}
}

func (f *blackholeFailer) Recover(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
c.Run(ctx, c.Node(nodeID), `sudo iptables -F`)
func (f *blackholeFailer) Recover(ctx context.Context, nodeID int) {
f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -F`)
}

// crashFailer is a process crash where the TCP/IP stack remains responsive
// and sends immediate RST packets to peers.
type crashFailer struct {
t test.Test
c cluster.Cluster
m cluster.Monitor
startOpts option.StartOpts
startSettings install.ClusterSettings
}

func (f *crashFailer) Setup(ctx context.Context) {}
func (f *crashFailer) Ready(ctx context.Context, m cluster.Monitor) { f.m = m }
func (f *crashFailer) Cleanup(ctx context.Context) {}

func (f *crashFailer) Fail(ctx context.Context, nodeID int) {
f.m.ExpectDeath()
f.c.Stop(ctx, f.t.L(), option.DefaultStopOpts(), f.c.Node(nodeID)) // uses SIGKILL
}

func (f *crashFailer) Recover(ctx context.Context, nodeID int) {
f.c.Start(ctx, f.t.L(), f.startOpts, f.startSettings, f.c.Node(nodeID))
}

// diskStallFailer stalls the disk indefinitely. This should cause the node to
// eventually self-terminate, but we'd want leases to move off before then.
type diskStallFailer struct {
t test.Test
c cluster.Cluster
m cluster.Monitor
startOpts option.StartOpts
startSettings install.ClusterSettings
}

func (f *diskStallFailer) device() string {
switch f.c.Spec().Cloud {
case spec.GCE:
return "/dev/nvme0n1"
case spec.AWS:
return "/dev/nvme1n1"
default:
f.t.Fatalf("unsupported cloud %q", f.c.Spec().Cloud)
return ""
}
}

func (f *diskStallFailer) Setup(ctx context.Context) {
dev := f.device()
f.c.Run(ctx, f.c.All(), `sudo umount /mnt/data1`)
f.c.Run(ctx, f.c.All(), `sudo dmsetup remove_all`)
f.c.Run(ctx, f.c.All(), `echo "0 $(sudo blockdev --getsz `+dev+`) linear `+dev+` 0" | `+
`sudo dmsetup create data1`)
f.c.Run(ctx, f.c.All(), `sudo mount /dev/mapper/data1 /mnt/data1`)
}

func (f *diskStallFailer) Ready(ctx context.Context, m cluster.Monitor) {
f.m = m
}

func (f *diskStallFailer) Cleanup(ctx context.Context) {
f.c.Run(ctx, f.c.All(), `sudo dmsetup resume data1`)
// We have to stop the cluster to remount /mnt/data1.
f.m.ExpectDeaths(int32(f.c.Spec().NodeCount))
f.c.Stop(ctx, f.t.L(), option.DefaultStopOpts(), f.c.All())
f.c.Run(ctx, f.c.All(), `sudo umount /mnt/data1`)
f.c.Run(ctx, f.c.All(), `sudo dmsetup remove_all`)
f.c.Run(ctx, f.c.All(), `sudo mount /mnt/data1`)
}

func (f *diskStallFailer) Fail(ctx context.Context, nodeID int) {
// Pebble's disk stall detector should crash the node.
f.m.ExpectDeath()
f.c.Run(ctx, f.c.Node(nodeID), `sudo dmsetup suspend --noflush --nolockfs data1`)
}

func (f *blackholeFailer) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Run(ctx, c.All(), `sudo iptables -F`)
func (f *diskStallFailer) Recover(ctx context.Context, nodeID int) {
f.c.Run(ctx, f.c.Node(nodeID), `sudo dmsetup resume data1`)
// Pebble's disk stall detector should have terminated the node, but in case
// it didn't, we explicitly stop it first.
f.c.Stop(ctx, f.t.L(), option.DefaultStopOpts(), f.c.Node(nodeID))
f.c.Start(ctx, f.t.L(), f.startOpts, f.startSettings, f.c.Node(nodeID))
}

// relocateRanges relocates all ranges matching the given predicate from a set
