Skip to content

Commit

Permalink
metamorphic: use WAL failover
Browse files Browse the repository at this point in the history
Integrates WAL failover into the metamorphic tests as a random option with
random durations. Future work (#2482) will expand the circumstances in which
we'll actually exercise failover by injecting artificial I/O latency.

Informs #3230.
  • Loading branch information
jbowens committed Mar 5, 2024
1 parent 5ab3420 commit f1528d7
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
13 changes: 13 additions & 0 deletions metamorphic/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,19 @@ func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts .
require.NoError(t, setupInitialState(dir, testOpts))
}

if testOpts.Opts.WALFailover != nil {
if runOpts.numInstances > 1 {
// TODO(bilal,jackson): Allow opts to diverge on a per-instance
// basis, and use that to set unique WAL dirs for all instances in
// multi-instance mode.
testOpts.Opts.WALFailover = nil
} else {
testOpts.Opts.WALFailover.Secondary.FS = opts.FS
testOpts.Opts.WALFailover.Secondary.Dirname = opts.FS.PathJoin(
runDir, testOpts.Opts.WALFailover.Secondary.Dirname)
}
}

if opts.WALDir != "" {
if runOpts.numInstances > 1 {
// TODO(bilal): Allow opts to diverge on a per-instance basis, and use
Expand Down
60 changes: 60 additions & 0 deletions metamorphic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package metamorphic
import (
"bytes"
"fmt"
"math"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -152,6 +153,11 @@ func parseOptions(
},
}
err := opts.Opts.Parse(data, hooks)
// Ensure that the WAL failover FS agrees with the primary FS. They're
// separate options, but in the metamorphic tests we keep them in sync.
if opts.Opts.WALFailover != nil {
opts.Opts.WALFailover.Secondary.FS = opts.Opts.FS
}
opts.InitRemoteStorageFactory()
opts.Opts.EnsureDefaults()
return err
Expand Down Expand Up @@ -567,6 +573,38 @@ func RandomOptions(
if rng.Intn(2) == 0 {
opts.WALDir = "data/wal"
}

// Half the time enable WAL failover.
// TODO(jackson): Add I/O latency injection (#2482). Until then WAL failover
// will rarely trigger.
if rng.Intn(2) == 0 {
// Use 10x longer durations when writing directly to FS; we don't want
// WAL failover to trigger excessively frequently.
referenceDur := time.Millisecond
if testOpts.useDisk {
referenceDur *= 10
}

scaleDuration := func(d time.Duration, minFactor, maxFactor float64) time.Duration {
return time.Duration(float64(d) * (minFactor + rng.Float64()*(maxFactor-minFactor)))
}
unhealthyThreshold := expRandDuration(rng, 3*referenceDur, time.Second)
healthyThreshold := expRandDuration(rng, 3*referenceDur, time.Second)
healthyInterval := scaleDuration(healthyThreshold, 1.0, 10.0) // Between 1-10x the healthy threshold
opts.WALFailover = &pebble.WALFailoverOptions{
Secondary: wal.Dir{FS: vfs.Default, Dirname: "data/wal_secondary"},
FailoverOptions: wal.FailoverOptions{
PrimaryDirProbeInterval: scaleDuration(healthyThreshold, 0.10, 0.50), // Between 10-50% of the healthy threshold
HealthyProbeLatencyThreshold: healthyThreshold,
HealthyInterval: healthyInterval,
UnhealthySamplingInterval: scaleDuration(unhealthyThreshold, 0.10, 0.50), // Between 10-50% of the unhealthy threshold
UnhealthyOperationLatencyThreshold: func() time.Duration {
return unhealthyThreshold
},
ElevatedWriteStallThresholdLag: expRandDuration(rng, 5*referenceDur, 2*time.Second),
},
}
}
if rng.Intn(4) == 0 {
// Enable Writer parallelism for 25% of the random options. Setting
// MaxWriterConcurrency to any value greater than or equal to 1 has the
Expand Down Expand Up @@ -637,6 +675,13 @@ func RandomOptions(
} else if !testOpts.useDisk {
opts.FS = vfs.NewMem()
}
// Update the WALFailover's secondary to use the same FS. This isn't
// strictly necessary (the WALFailover could use a separate FS), but it
// ensures when we save a copy of the test state to disk, we include the
// secondary's WALs.
if opts.WALFailover != nil {
opts.WALFailover.Secondary.FS = opts.FS
}
testOpts.ingestUsingApply = rng.Intn(2) != 0
testOpts.deleteSized = rng.Intn(2) != 0
testOpts.replaceSingleDelete = rng.Intn(2) != 0
Expand Down Expand Up @@ -699,6 +744,10 @@ func RandomOptions(
return testOpts
}

// expRandDuration returns a random duration drawn from an exponential
// distribution with mean meanDur, rounded to the nearest nanosecond and capped
// at maxDur. meanDur is expected to be positive.
func expRandDuration(rng *rand.Rand, meanDur, maxDur time.Duration) time.Duration {
	nanos := math.Round(rng.ExpFloat64() * float64(meanDur))
	// The exponential distribution is unbounded, so the scaled sample may not
	// fit in an int64. Converting an out-of-range float64 to an integer yields
	// an implementation-dependent value (a large negative number on some
	// platforms), which would slip past the min() below. float64(math.MaxInt64)
	// rounds up to 2^63, the first value that no longer fits, so anything at
	// or above it is treated as exceeding the cap.
	if nanos >= float64(math.MaxInt64) {
		return maxDur
	}
	return min(maxDur, time.Duration(nanos))
}

func setupInitialState(dataDir string, testOpts *TestOptions) error {
// Copy (vfs.Default,<initialStatePath>/data) to (testOpts.opts.FS,<dataDir>).
ok, err := vfs.Clone(
Expand Down Expand Up @@ -737,6 +786,17 @@ func setupInitialState(dataDir string, testOpts *TestOptions) error {
FS: testOpts.Opts.FS,
Dirname: walRecoveryPath,
})

// If the failover dir exists and the test opts are not configured to use
// WAL failover, add the failover directory as a 'WAL recovery dir' in case
// the previous test was configured to use failover.
failoverDir := testOpts.Opts.FS.PathJoin(dataDir, "wal_secondary")
if _, err := testOpts.Opts.FS.Stat(failoverDir); err == nil && testOpts.Opts.WALFailover == nil {
testOpts.Opts.WALRecoveryDirs = append(testOpts.Opts.WALRecoveryDirs, wal.Dir{
FS: testOpts.Opts.FS,
Dirname: failoverDir,
})
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions metamorphic/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestOptionsRoundtrip(t *testing.T) {
"Experimental.IngestSplit:",
"Experimental.RemoteStorage:",
"Experimental.SingleDeleteInvariantViolationCallback:",
"WALFailover.FailoverOptions.UnhealthyOperationLatencyThreshold:",
// Floating points
"Experimental.PointTombstoneWeight:",
"Experimental.MultiLevelCompactionHeuristic.AddPropensity",
Expand Down

0 comments on commit f1528d7

Please sign in to comment.