Skip to content

Commit

Permalink
roachtest: add disk-stalled/wal-failover/among-stores test
Browse files Browse the repository at this point in the history
Introduce a new roachtest that simulates disk stalls on one store of a 3-node
cluster with two stores per node, and the --wal-failover=among-stores
configuration set. The WAL failover configuration should ensure the workload
continues uninterrupted until it becomes blocked on disk reads.

Informs #119418.
Informs cockroachdb/pebble#3230
Epic: CRDB-35401
  • Loading branch information
jbowens committed Mar 25, 2024
1 parent 00a6257 commit 688d6ab
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 2 deletions.
161 changes: 159 additions & 2 deletions pkg/cmd/roachtest/tests/disk_stall.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,147 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

const maxSyncDur = 10 * time.Second
// registerDiskStalledWALFailover registers the disk stall WAL failover tests.
// These tests assert that a storage engine configured with WAL failover
// survives a temporary disk stall through failing over to a secondary disk.
func registerDiskStalledWALFailover(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "disk-stalled/wal-failover/among-stores",
Owner: registry.OwnerStorage,
Cluster: r.MakeClusterSpec(4, spec.CPU(16), spec.ReuseNone(), spec.SSD(2)),
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
Timeout: 3 * time.Hour,
SkipPostValidations: registry.PostValidationNoDeadNodes,
// Encryption is implemented within the virtual filesystem layer,
// just like disk-health monitoring. It's important to exercise
// encryption-at-rest to ensure there is not unmonitored I/O within
// the encryption-at-rest implementation that could indefinitely
// stall the process during a disk stall.
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runDiskStalledWALFailover(ctx, t, c, "among-stores")
},
})
}

func runDiskStalledWALFailover(
ctx context.Context, t test.Test, c cluster.Cluster, failoverFlag string,
) {
startSettings := install.MakeClusterSettings()
// Set a high value for the max sync durations to avoid the disk
// stall detector fataling the node.
const maxSyncDur = 60 * time.Second
startSettings.Env = append(startSettings.Env,
"COCKROACH_AUTO_BALLAST=false",
fmt.Sprintf("COCKROACH_LOG_MAX_SYNC_DURATION=%s", maxSyncDur),
fmt.Sprintf("COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=%s", maxSyncDur))

t.Status("setting up disk staller")
s := &dmsetupDiskStaller{t: t, c: c, logsToo: true}
s.Setup(ctx)
defer s.Cleanup(ctx)

t.Status("starting cluster")
startOpts := option.DefaultStartOpts()
if failoverFlag == "among-stores" {
startOpts.RoachprodOpts.StoreCount = 2
}
startOpts.RoachprodOpts.ExtraArgs = []string{
// Adopt buffering of the file logging to ensure that we don't block on
// flushing logs to the stalled device.
"--log", fmt.Sprintf(`{sinks: {stderr: {filter: INFO}}, file-defaults: {dir: "%s", buffered-writes: false, buffering: {max-staleness: 1s, flush-trigger-size: 256KiB, max-buffer-size: 50MiB}}}`, s.LogDir()),
"--wal-failover=" + failoverFlag,
}
c.Start(ctx, t.L(), startOpts, startSettings, c.Range(1, 3))

// Open a SQL connection to n1, the node that will be stalled.
n1Conn := c.Conn(ctx, t.L(), 1)
defer n1Conn.Close()
require.NoError(t, n1Conn.PingContext(ctx))
// Wait for upreplication.
require.NoError(t, WaitFor3XReplication(ctx, t, t.L(), n1Conn))
c.Run(ctx, option.WithNodes(c.Node(4)), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
_, err := n1Conn.ExecContext(ctx, `USE kv;`)
require.NoError(t, err)

t.Status("starting workload")
workloadStartAt := timeutil.Now()
m := c.NewMonitor(ctx, c.Range(1, 3))
m.Go(func(ctx context.Context) error {
c.Run(ctx, option.WithNodes(c.Node(4)), `./cockroach workload run kv --read-percent 0 `+
`--duration 60m --concurrency 4096 --max-rate 4096 --tolerate-errors `+
` --min-block-bytes=2048 --max-block-bytes=2048 --timeout 1s `+
`{pgurl:1-3}`)
return nil
})
defer m.Wait()

const pauseBetweenStalls = 10 * time.Minute
t.Status("pausing ", pauseBetweenStalls, " before simulated disk stall on n1")
ticker := time.NewTicker(time.Second)
nextStallAt := workloadStartAt.Add(pauseBetweenStalls)
defer ticker.Stop()

progressEvery := log.Every(time.Minute)
for timeutil.Since(workloadStartAt) < time.Hour+5*time.Minute {
select {
case <-ctx.Done():
t.Fatalf("context done before finished workload: %s", ctx.Err())
case now := <-ticker.C:
if now.Before(nextStallAt) {
if progressEvery.ShouldLog() {
t.Status("pausing ", nextStallAt.Sub(now), " before next simulated disk stall on n1")
}
continue
}
func() {
s.Stall(ctx, c.Node(1))
// NB: We use a background context in the defer'ed unstall command,
// otherwise on test failure our Unstall calls will be ignored. Leaving
// the disk stalled will prevent artifact collection, making debugging
// difficult.
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
s.Unstall(ctx, c.Node(1))
}()

select {
case <-ctx.Done():
t.Fatalf("context done while stall induced: %s", ctx.Err())
case <-time.After(30 * time.Second):
// Return from the anonymous function, allowing the
// defer to unstall the node.
return
}
}()
nextStallAt = now.Add(pauseBetweenStalls)
}
}
t.Status("exited stall loop")

time.Sleep(1 * time.Second)
exit, ok := getProcessExitMonotonic(ctx, t, c, 1)
if ok && exit > 0 {
t.Fatal("process exited unexectedly")
}

// TODO(jackson): We could query timeseries to verify that s1 did failover to
// its WAL secondary during the test.

// TODO(jackson): We could query timeseries to perform some sort of assertion
// on the quality of service provided.

// Shut down the nodes, allowing any devices to be unmounted during cleanup.
c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Range(1, 3))
}

// registerDiskStalledDetection registers the disk stall detection tests. These
// tests assert that a disk stall is detected and the process crashes
Expand Down Expand Up @@ -78,6 +214,8 @@ func registerDiskStalledDetection(r registry.Registry) {
func runDiskStalledDetection(
ctx context.Context, t test.Test, c cluster.Cluster, s diskStaller, doStall bool,
) {
const maxSyncDur = 10 * time.Second

startOpts := option.DefaultStartOpts()
startOpts.RoachprodOpts.ExtraArgs = []string{
"--store", s.DataDir(),
Expand Down Expand Up @@ -292,6 +430,9 @@ type diskStaller interface {
type dmsetupDiskStaller struct {
t test.Test
c cluster.Cluster
// If logsToo=true the logs directory will be updated to be a symlink
// pointing into the store directory.
logsToo bool
}

var _ diskStaller = (*dmsetupDiskStaller)(nil)
Expand All @@ -315,6 +456,11 @@ func (s *dmsetupDiskStaller) Setup(ctx context.Context) {
s.t.Fatal(err)
}
s.c.Run(ctx, option.WithNodes(s.c.All()), `sudo mount /dev/mapper/data1 /mnt/data1`)

if s.logsToo {
s.c.Run(ctx, option.WithNodes(s.c.All()), "mkdir -p {store-dir}/logs")
s.c.Run(ctx, option.WithNodes(s.c.All()), "rm -f logs && ln -s {store-dir}/logs logs || true")
}
}

func (s *dmsetupDiskStaller) Cleanup(ctx context.Context) {
Expand Down Expand Up @@ -442,9 +588,20 @@ func (s *cgroupDiskStaller) setThroughput(
}

func getDevice(t test.Test, c cluster.Cluster) string {
s := c.Spec()
switch c.Cloud() {
case spec.GCE:
return "/dev/sdb"
switch s.LocalSSD {
case spec.LocalSSDDisable:
return "/dev/sdb"
case spec.LocalSSDPreferOn, spec.LocalSSDDefault:
// TODO(jackson): These spec values don't guarantee that we are actually
// using local SSDs, just that we might've.
return "/dev/nvme0n1"
default:
t.Fatalf("unsupported LocalSSD enum %v", s.LocalSSD)
return ""
}
case spec.AWS:
return "/dev/nvme1n1"
default:
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func RegisterTests(r registry.Registry) {
registerDisaggRebalance(r)
registerDiskFull(r)
registerDiskStalledDetection(r)
registerDiskStalledWALFailover(r)
registerDjango(r)
registerDrain(r)
registerDrop(r)
Expand Down

0 comments on commit 688d6ab

Please sign in to comment.