Skip to content

Commit

Permalink
db: add TestWALFailoverRandomized
Browse files Browse the repository at this point in the history
Add a randomized test of WAL failover and recovery of a DB from failover WAL
logs.

Close cockroachdb#3865.
  • Loading branch information
jbowens committed Aug 27, 2024
1 parent 7b047cd commit 7c3b966
Showing 1 changed file with 170 additions and 0 deletions.
170 changes: 170 additions & 0 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ package pebble
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"math"
"math/rand"
"os"
"path/filepath"
"reflect"
Expand All @@ -18,12 +21,15 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/metamorphic"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/manifest"
Expand Down Expand Up @@ -1525,3 +1531,167 @@ func TestMkdirAllAndSyncParents(t *testing.T) {
}
})
}

// TestWALFailoverRandomized is a randomzied test exercising recovery in the
// presence of WAL failover. It repeatedly opens a database, writes a number of
// batches concurrently and simulates a hard crash using vfs.NewCrashableMem. It
// ensures that the resulting DB state opens successfully, and the contents of
// the DB match the expectations based on the keys written.
//
// This test is partially a regression test for #3865.
func TestWALFailoverRandomized(t *testing.T) {
seed := time.Now().UnixNano()
t.Logf("seed %d", seed)
mem := vfs.NewCrashableMem()
makeOptions := func(mem *vfs.MemFS) *Options {
failoverOpts := WALFailoverOptions{
Secondary: wal.Dir{FS: mem, Dirname: "secondary"},
FailoverOptions: wal.FailoverOptions{
PrimaryDirProbeInterval: time.Microsecond,
HealthyProbeLatencyThreshold: 20 * time.Microsecond,
HealthyInterval: 10 * time.Microsecond,
UnhealthySamplingInterval: time.Microsecond,
UnhealthyOperationLatencyThreshold: func() (time.Duration, bool) {
return 10 * time.Microsecond, true
},
ElevatedWriteStallThresholdLag: 50 * time.Microsecond,
},
}

mean := time.Duration(rand.ExpFloat64() * float64(time.Microsecond))
p := rand.Float64()
t.Logf("Injecting mean %s of latency with p=%.3f", mean, p)
fs := errorfs.Wrap(mem, errorfs.RandomLatency(errorfs.Randomly(p, seed), mean, seed, time.Second))
return &Options{
FS: fs,
FormatMajorVersion: internalFormatNewest,
Logger: testLogger{t},
MemTableSize: 128 << 10, // 128 KiB
MemTableStopWritesThreshold: 4,
WALFailover: &failoverOpts,
}
}

// KV state tracking.
//
// This test uses all uint16 big-endian integers as a keyspace. Values are
// randomly sized but always contain the key in the first two bytes. We
// track the state of all KVs throughout the test (whether they're
// definitely set, maybe set or definitely unset).
//
// Note that the test may wrap around to the beginning of the keyspace. This
// may cause KVs left at kvMaybeSet to be written and be definitively set
// the second time around.
type kvState int8
const (
kvUnset kvState = 0
kvMaybeSet kvState = 1
kvSet kvState = 2
)
const keyspaceSize = math.MaxUint16 + 1
var kvs struct {
sync.Mutex
states [keyspaceSize]kvState
count uint64 // [0, math.MaxUint16]; INVARIANT: states[count:] all zeroes
crashing bool
}
setIsCrashing := func(crashing bool) {
kvs.Lock()
defer kvs.Unlock()
kvs.crashing = crashing
}
// transitionState is called by goroutines responsible for committing
// batches to the engine. Note that 'i' is the index of the KV before
// wrapping around and needs to be modded by math.MaxUint16.
transitionState := func(i, count uint64, state kvState) {
kvs.Lock()
defer kvs.Unlock()
if kvs.crashing && state == kvSet {
// We're racing with a CrashClone call and it's indeterminate
// whether what we think we synced actually made the cut. Leave the
// kvs at the kvMaybeSet.
state = kvMaybeSet
}
for j := uint64(0); j < count; j++ {
idx := (i + j) % keyspaceSize
kvs.states[idx] = max(kvs.states[idx], state)
}
kvs.count = max(kvs.count, i+count, math.MaxUint16)
}
// validateState is called on recovery to ensure that engine state agrees
// with the tracked KV state.
validateState := func(d *DB) {
it, err := d.NewIter(nil)
require.NoError(t, err)
valid := it.First()
for i := 0; i < int(kvs.count); i++ {
var kvIsSet bool
if valid {
require.Len(t, it.Key(), 2)
require.Equal(t, it.Key(), it.Value()[:2])
kvIsSet = binary.BigEndian.Uint16(it.Key()) == uint16(i)
}
if kvIsSet && kvs.states[i] == kvUnset {
t.Fatalf("key %04x is set; state says it should be unset", i)
} else if !kvIsSet && kvs.states[i] == kvSet {
t.Fatalf("key %04x is unset; state says it should be set", i)
}
if kvIsSet {
valid = it.Next()
}
}
require.NoError(t, it.Close())
}

d, err := Open("primary", makeOptions(mem))
require.NoError(t, err)
rng := rand.New(rand.NewSource(seed))
var wg sync.WaitGroup
var n uint64
randomOps := metamorphic.Weighted[func()]{
{Weight: 1, Item: func() {
time.Sleep(time.Microsecond * time.Duration(rand.Intn(30)))
t.Log("initiating hard crash")
setIsCrashing(true)
// Take a strict clone of the filesystem and use that going forward.
mem = mem.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 50, RNG: rng})
wg.Wait() // Wait for outstanding batch commits to finish.
_ = d.Close()
d, err = Open("primary", makeOptions(mem))
require.NoError(t, err)
validateState(d)
setIsCrashing(false)
}},
{Weight: 20, Item: func() {
count := rng.Intn(14) + 1
var k [2]byte
var v [4096]byte
b := d.NewBatch()
for i := 0; i < count; i++ {
j := uint16((n + uint64(i)) % keyspaceSize)
binary.BigEndian.PutUint16(k[:], j)
vn := max(rng.Intn(cap(v)), 2)
binary.BigEndian.PutUint16(v[:], j)
require.NoError(t, b.Set(k[:], v[:vn], nil))
}
maybeSync := NoSync
if rng.Intn(2) == 1 {
maybeSync = Sync
}
wg.Add(1)
go func(n, count uint64) {
defer wg.Done()
transitionState(n, count, kvMaybeSet)
require.NoError(t, b.Commit(maybeSync))
if maybeSync == Sync {
transitionState(n, count, kvSet)
}
}(n, uint64(count))
n += uint64(count)
}},
}
nextRandomOp := randomOps.RandomDeck(rng)
for o := 0; o < 1000; o++ {
nextRandomOp()()
}
}

0 comments on commit 7c3b966

Please sign in to comment.