Skip to content

Commit

Permalink
db: add TestDeterminism
Browse files Browse the repository at this point in the history
Add a new datadriven unit test intended to exercise potential races. The test
allows a datadriven test case to run a sequence of operations, recording the
sequence, and then re-run it with varying sources of nondeterminism, ensuring
that test output itself is deterministic.

Motivated by cockroachdb/cockroach#121263.
  • Loading branch information
jbowens committed Apr 1, 2024
1 parent d3f89bf commit a306d7e
Show file tree
Hide file tree
Showing 3 changed files with 322 additions and 0 deletions.
7 changes: 7 additions & 0 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,13 @@ func runCompactCmd(td *datadriven.TestData, d *DB) error {
func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
opts = opts.EnsureDefaults()
opts.FS = vfs.NewMem()
return runDBDefineCmdReuseFS(td, opts)
}

// runDBDefineCmdReuseFS is like runDBDefineCmd, but does not set opts.FS, expecting
// the caller to have set an appropriate FS already.
func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error) {
opts = opts.EnsureDefaults()

var snapshots []uint64
var levelMaxBytes map[int]int64
Expand Down
237 changes: 237 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (
"testing"
"time"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/errorfs"
"github.com/cockroachdb/pebble/wal"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -2083,3 +2085,238 @@ func TestWALFailoverAvoidsWriteStall(t *testing.T) {
// Unblock the writes to allow the DB to close.
primaryFS.unblock()
}

// TestDeterminism is a datadriven test intended to validate determinism of
// operations in the face of concurrency or randomizing of operations. The test
// data defines a sequence of commands run sequentially. Then the test may
// re-run the sequence introducing latencies, reorderings, parallelism, etc,
// ensuring that all re-runs produce the same output.
func TestDeterminism(t *testing.T) {
var d *DB
var fs vfs.FS = vfs.NewMem()
defer func() {
if d != nil {
require.NoError(t, d.Close())
}
}()

type step struct {
fn func(td *datadriven.TestData) string
td datadriven.TestData
output string
}
var sequence []step
addStep := func(td *datadriven.TestData, fn func(td *datadriven.TestData) string) string {
s := strings.TrimSpace(fn(td))
sequence = append(sequence, step{
fn: fn,
td: *td,
output: s,
})
if len(s) > 0 {
s = s + "\n"
}
return s + fmt.Sprintf("%d:%s", len(sequence)-1, td.Cmd)
}

datadriven.RunTest(t, "testdata/determinism",
func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "reset":
fs = vfs.NewMem()
sequence = nil
return ""
case "define":
return addStep(td, func(td *datadriven.TestData) string {
opts := &Options{
FS: fs,
DebugCheck: DebugCheckLevels,
Logger: testLogger{t},
FormatMajorVersion: FormatNewest,
DisableAutomaticCompactions: true,
}
opts.Experimental.IngestSplit = func() bool { return rand.Intn(2) == 1 }
var err error
if d, err = runDBDefineCmdReuseFS(td, opts); err != nil {
return err.Error()
}
return d.mu.versions.currentVersion().String()
})
case "batch":
return addStep(td, func(td *datadriven.TestData) string {
b := d.NewBatch()
require.NoError(t, runBatchDefineCmd(td, b))
require.NoError(t, b.Commit(nil))
return ""
})
case "build":
return addStep(td, func(td *datadriven.TestData) string {
require.NoError(t, runBuildCmd(td, d, fs))
return ""
})
case "flush":
return addStep(td, func(td *datadriven.TestData) string {
_, err := d.AsyncFlush()
if err != nil {
return err.Error()
}
return ""
})
case "ingest-and-excise":
return addStep(td, func(td *datadriven.TestData) string {
if err := runIngestAndExciseCmd(td, d, fs); err != nil {
return err.Error()
}
return ""
})
case "maybe-compact":
return addStep(td, func(td *datadriven.TestData) string {
d.mu.Lock()
defer d.mu.Unlock()
d.opts.DisableAutomaticCompactions = false
d.maybeScheduleCompaction()
d.opts.DisableAutomaticCompactions = true
return ""
})
case "run":
var mkfs func() vfs.FS = func() vfs.FS { return vfs.NewMem() }
var beforeStep func()
for _, cmdArg := range td.CmdArgs {
switch cmdArg.Key {
case "io-latency", "step-latency":
p, err := strconv.ParseFloat(cmdArg.Vals[0], 64)
require.NoError(t, err)
mean, err := time.ParseDuration(cmdArg.Vals[1])
require.NoError(t, err)
if cmdArg.Key == "io-latency" {
prevMkfs := mkfs
mkfs = func() vfs.FS {
return errorfs.Wrap(prevMkfs(), errorfs.RandomLatency(errorfs.Randomly(p, 0), mean, 0))
}
} else if cmdArg.Key == "step-latency" {
beforeStep = func() {
if rand.Float64() < p {
time.Sleep(time.Duration(min(rand.ExpFloat64(), 20.0) * float64(mean)))
}
}
}
}
}
ordering := parseOrdering(td.Input)

var sb strings.Builder
rerunSequence := func() string {
sb.Reset()
fs = mkfs()
output := make([]string, len(sequence))
ordering.visit(func(i int) {
beforeStep()
output[i] = strings.TrimSpace(sequence[i].fn(&sequence[i].td))
})
for i := range output {
if output[i] != sequence[i].output {
fmt.Fprintf(&sb, "# step %d: %s\n", i, sequence[i].td.Cmd)
fmt.Fprintf(&sb, "expected:\n%s\ngot:\n%s", sequence[i].output, output[i])
fmt.Fprintln(&sb)
}
}
return sb.String()
}
retries := 10
td.MaybeScanArgs(t, "count", &retries)
for i := 0; i < retries; i++ {
if diff := rerunSequence(); len(diff) > 0 {
return diff
}
}
return "ok"
default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
}
})
}

type orderingNode interface {
visit(func(int))
}

type sequential []orderingNode

func (s sequential) visit(fn func(int)) {
for _, n := range s {
n.visit(fn)
}
}

type reorder []orderingNode

func (r reorder) visit(fn func(int)) {
for _, i := range rand.Perm(len(r)) {
r[i].visit(fn)
}
}

type parallel []orderingNode

func (p parallel) visit(fn func(int)) {
var wg sync.WaitGroup
wg.Add(len(p))
for i := range p {
go func(i int) {
defer wg.Done()
p[i].visit(fn)
}(i)
}
wg.Wait()
}

type leaf int

func (l leaf) visit(fn func(int)) { fn(int(l)) }

func parseOrdering(s string) orderingNode {
n, _ := parseOrderingTokens(strings.Fields(s))
return n
}

func parseOrderingTokens(tokens []string) (orderingNode, int) {
if len(tokens) == 0 {
return nil, 0
}
switch tokens[0] {
case ")":
panic("unexpected )")
case "sequential(", "reorder(", "parallel(":
var nodes []orderingNode
i := 1
for i < len(tokens) {
if tokens[i] == ")" {
i++
break
}
n, m := parseOrderingTokens(tokens[i:])
nodes = append(nodes, n)
i += m
}
switch tokens[0] {
case "sequential(":
return sequential(nodes), i
case "reorder(":
return reorder(nodes), i
case "parallel(":
return parallel(nodes), i
default:
panic("unreachable")
}
default:
n := strings.IndexByte(tokens[0], ':')
if n == -1 {
n = len(tokens[0])
}
v, err := strconv.Atoi(tokens[0][:n])
if err != nil {
panic(err)
}
return leaf(v), 1
}
}
78 changes: 78 additions & 0 deletions testdata/determinism
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# A simple ingest-and-excise test that ensures invariants are respected in the
# presence of overlapping writes to the memtable, flushes, and compactions. The
# individual commands don't assert much themselves, depending on Pebble's
# invariant violations to trigger if a sequence violates invariants.

define
L0
a.SET.3:v
apple.SET.3:v
b.SET.3:v
L0
a.SET.2:v
avocado.SET.3:v
----
L0.1:
000004:[a#3,SET-b#3,SET]
L0.0:
000005:[a#2,SET-avocado#3,SET]
0:define

build ext-ab
set a a
set anchovy anchovy
del-range a b
range-key-del a b
----
1:build

build ext-bc
set b b
set banana banana
del-range b c
range-key-del b c
----
2:build

batch
set apple 4
----
3:batch

batch
set banana 5
----
4:batch

flush
----
5:flush

maybe-compact
----
6:maybe-compact

ingest-and-excise contains-excise-tombstone excise=a-b ext-ab
----
7:ingest-and-excise

ingest-and-excise contains-excise-tombstone excise=b-c ext-bc
----
8:ingest-and-excise

# Re-run the same sequence of operations 10 times, injecting random latency between
# some steps and within some IO operations. Randomize the ordering of the last four
# steps.

run io-latency=(.1,100µs) step-latency=(.2,5ms) count=10
sequential( 0:define 1:build 2:build reorder( 3:batch 4:batch 5:flush 6:maybe-compact 7:ingest-and-excise 8:ingest-and-excise ) )
----
ok

# Re-run the same sequence of operations 10 times, injecting random latency between
# some steps and within some IO operations. Run the last four steps in parallel.

run io-latency=(.1,100µs) step-latency=(.2,5ms) count=10
sequential( 0:define 1:build 2:build parallel( 3:batch 4:batch 5:flush 6:maybe-compact 7:ingest-and-excise 8:ingest-and-excise ) )
----
ok

0 comments on commit a306d7e

Please sign in to comment.