diff --git a/internal/testkeys/testkeys_test.go b/internal/testkeys/testkeys_test.go index 2312088c89..7573eeb75f 100644 --- a/internal/testkeys/testkeys_test.go +++ b/internal/testkeys/testkeys_test.go @@ -65,6 +65,10 @@ func TestKeyCount(t *testing.T) { } testCases := map[params]int64{ {26, 1}: 26, + {26, 2}: 702, + {26, 3}: 18278, + {26, 4}: 475254, + {26, 5}: 12356630, {52, 1}: 52, {2, 2}: 6, {2, 3}: 14, diff --git a/iterator_test.go b/iterator_test.go index 740a0143b6..23765cb1e9 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -15,6 +15,7 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -30,6 +31,7 @@ import ( "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" + "golang.org/x/sync/errgroup" ) var testKeyValuePairs = []string{ @@ -2712,6 +2714,180 @@ func BenchmarkSeekPrefixTombstones(b *testing.B) { } } +func waitForCompactionsAndTableStats(d *DB) { + d.mu.Lock() + // NB: Wait for table stats because some compaction types rely + // on table stats to be collected. + d.waitTableStats() + for d.mu.compact.compactingCount > 0 { + d.mu.compact.cond.Wait() + d.waitTableStats() + } + d.mu.Unlock() +} + +// BenchmarkPointDeletedSwath benchmarks iterator operations on large-ish +// (hundreds of MBs) databases containing broad swaths of keys removed by point +// tombstones. +func BenchmarkPointDeletedSwath(b *testing.B) { + const maxKeyLen = 5 + ks := testkeys.Alpha(maxKeyLen) + + opts := func() *Options { + return (&Options{ + FS: vfs.NewMem(), + Comparer: testkeys.Comparer, + FormatMajorVersion: FormatNewest, + }).EnsureDefaults() + } + type iteratorOp struct { + name string + fn func(*Iterator, testkeys.Keyspace, *rand.Rand) + } + var iterKeyBuf [maxKeyLen]byte + + iterOps := []iteratorOp{ + { + name: "seek-prefix-ge", fn: func(iter *Iterator, ks testkeys.Keyspace, rng *rand.Rand) { + n := testkeys.WriteKey(iterKeyBuf[:], ks, int64(rng.Intn(int(ks.Count())))) + _ = iter.SeekPrefixGE(iterKeyBuf[:n]) + }, + }, + { + name: "seek-ge", fn: func(iter *Iterator, ks testkeys.Keyspace, rng *rand.Rand) { + n := testkeys.WriteKey(iterKeyBuf[:], ks, int64(rng.Intn(int(ks.Count())))) + _ = iter.SeekGE(iterKeyBuf[:n]) + }, + }, + { + name: "iterate", fn: func(iter *Iterator, ks testkeys.Keyspace, rng *rand.Rand) { + valid := iter.Next() + if !valid { + iter.First() + } + }, + }, + } + + // Populate an initial database with point keys at every key in the `ks` + // keyspace. + populated := withStateSetup(b, vfs.NewMem(), opts(), populateKeyspaceSetup(ks)) + for _, gapLength := range []int{100, 1_000, 10_000, 100_000, 200_000, 400_000, 1_000_000, 2_000_000, 5_000_000, 10_000_000} { + b.Run(fmt.Sprintf("gap=%d", gapLength), func(b *testing.B) { + // Extend the `populated` initial database with DELs deleting all + // the middle keys in the keyspace in a contiguous swath of + // `gapLength` keys. + gapDeleted := withStateSetup(b, populated, opts(), deleteGapSetup(ks, gapLength)) + + for _, op := range iterOps { + b.Run(op.name, func(b *testing.B) { + // Run each instance of the test in a fresh DB constructed + // from `compacted`. This ensures background compactions + // from one iterator operation don't affect another iterator + // option. + withStateSetup(b, gapDeleted, opts(), func(_ testing.TB, d *DB) { + rng := rand.New(rand.NewSource(1 /* fixed seed */)) + iter, err := d.NewIter(nil) + require.NoError(b, err) + b.ResetTimer() + for i := 0; i < b.N; i++ { + op.fn(iter, ks, rng) + } + b.StopTimer() + require.NoError(b, iter.Close()) + }) + }) + } + }) + } +} + +func withStateSetup( + t testing.TB, initial vfs.FS, opts *Options, setup func(testing.TB, *DB), +) vfs.FS { + ok, err := vfs.Clone(initial, opts.FS, "", "", vfs.CloneSync) + require.NoError(t, err) + require.True(t, ok) + d, err := Open("", opts) + require.NoError(t, err) + defer func() { require.NoError(t, d.Close()) }() + setup(t, d) + return opts.FS +} + +func populateKeyspaceSetup(ks testkeys.Keyspace) func(testing.TB, *DB) { + const valSize = 256 + return func(t testing.TB, d *DB) { + t.Logf("Populating keyspace with %d keys, each with %d-byte values", ks.Count(), valSize) + // Parallelize population by divvying up the keyspace. + var grp errgroup.Group + loadKeyspaces := testkeys.Divvy(ks, 20) + var progress atomic.Uint64 + for l := 0; l < len(loadKeyspaces); l++ { + l := l + grp.Go(func() error { + rng := rand.New(rand.NewSource(1)) + batch := d.NewBatch() + key := make([]byte, ks.MaxLen()) + var val [valSize]byte + for i := int64(0); i < loadKeyspaces[l].Count(); i++ { + rng.Read(val[:]) + n := testkeys.WriteKey(key, loadKeyspaces[l], i) + if err := batch.Set(key[:n], val[:], nil); err != nil { + return err + } + if batch.Len() >= 10<<10 /* 10 kib */ { + count := batch.Count() + require.NoError(t, batch.Commit(NoSync)) + if newTotal := progress.Add(uint64(count)); (newTotal / (uint64(ks.Count()) / 100)) != (newTotal-uint64(count))/uint64(ks.Count()/100) { + t.Logf("%.1f%% populated", 100.0*(float64(newTotal)/float64(ks.Count()))) + } + batch = d.NewBatch() + d.AsyncFlush() + } + } + if !batch.Empty() { + return batch.Commit(NoSync) + } + return nil + }) + } + require.NoError(t, grp.Wait()) + } +} + +func deleteGapSetup(ks testkeys.Keyspace, gapLength int) func(testing.TB, *DB) { + return func(t testing.TB, d *DB) { + midpoint := ks.Count() / 2 + gapStart := midpoint - int64(gapLength/2) + gapEnd := midpoint + int64(gapLength/2+(gapLength%2)) + + batch := d.NewBatch() + key := make([]byte, ks.MaxLen()) + for i := gapStart; i <= gapEnd; i++ { + n := testkeys.WriteKey(key, ks, i) + if err := batch.Delete(key[:n], nil); err != nil { + t.Fatal(err) + } + if batch.Len() >= 10<<10 /* 10 kib */ { + if err := batch.Commit(NoSync); err != nil { + t.Fatal(err) + } + batch = d.NewBatch() + } + } + if !batch.Empty() { + if err := batch.Commit(NoSync); err != nil { + t.Fatal(err) + } + } + if err := d.Flush(); err != nil { + t.Fatal(err) + } + waitForCompactionsAndTableStats(d) + } +} + func runBenchmarkQueueWorkload(b *testing.B, deleteRatio float32, initOps int, valueSize int) { const queueCount = 8 // These should be large enough to assign a unique key to each item in the @@ -2792,19 +2968,7 @@ func runBenchmarkQueueWorkload(b *testing.B, deleteRatio float32, initOps int, v _, err = d.AsyncFlush() require.NoError(b, err) - waitForCompactions := func() { - d.mu.Lock() - // NB: Wait for table stats because some compaction types rely - // on table stats to be collected. - d.waitTableStats() - for d.mu.compact.compactingCount > 0 { - d.mu.compact.cond.Wait() - d.waitTableStats() - } - d.mu.Unlock() - } - - waitForCompactions() + waitForCompactionsAndTableStats(d) // Log the number of tombstones and live keys in each level after // background compactions are complete.