db: Add BenchmarkPointDeletedSwath
Adds a new benchmark that measures the performance
of seeks and full-table scans after a swath of point
tombstones is created in the middle of the keyspace.

This is more or less a copy of #2657 with
a few fixes from the comments there. Initial results
can be found at [this comment](#918 (comment)).
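The benchmark crosses gap sizes from 100 to 10,000,000
deleted keys with three iterator operations: SeekPrefixGE,
SeekGE, and forward iteration via Next.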
anish-shanbhag committed Jul 30, 2024
1 parent e326a01 commit 9cbad0b
Showing 2 changed files with 181 additions and 13 deletions.
4 changes: 4 additions & 0 deletions internal/testkeys/testkeys_test.go
@@ -65,6 +65,10 @@ func TestKeyCount(t *testing.T) {
}
testCases := map[params]int64{
{26, 1}: 26,
{26, 2}: 702,
{26, 3}: 18278,
{26, 4}: 475254,
{26, 5}: 12356630,
{52, 1}: 52,
{2, 2}: 6,
{2, 3}: 14,
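
The new entries are cumulative key counts: an alphabet of a letters admits a^i keys of length exactly i, so a keyspace capped at length n holds a + a^2 + ... + a^n keys. A minimal sketch of that computation (the helper name is illustrative, not part of the change):

func cumulativeKeyCount(a, n int64) int64 {
	total, pow := int64(0), int64(1)
	for i := int64(0); i < n; i++ {
		pow *= a
		total += pow // a^(i+1) keys of length i+1
	}
	return total
}

For example, cumulativeKeyCount(26, 5) == 12356630, matching the {26, 5} entry above and the size of the keyspace used by BenchmarkPointDeletedSwath below.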
190 changes: 177 additions & 13 deletions iterator_test.go
@@ -15,6 +15,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

@@ -30,6 +31,7 @@ import (
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
"golang.org/x/sync/errgroup"
)

var testKeyValuePairs = []string{
@@ -2712,6 +2714,180 @@ func BenchmarkSeekPrefixTombstones(b *testing.B) {
}
}

func waitForCompactionsAndTableStats(d *DB) {
d.mu.Lock()
// NB: Wait for table stats because some compaction types rely
// on table stats to be collected.
d.waitTableStats()
for d.mu.compact.compactingCount > 0 {
d.mu.compact.cond.Wait()
d.waitTableStats()
}
d.mu.Unlock()
}

// BenchmarkPointDeletedSwath benchmarks iterator operations on large-ish
// (hundreds of MBs) databases containing broad swaths of keys removed by point
// tombstones.
func BenchmarkPointDeletedSwath(b *testing.B) {
const maxKeyLen = 5
ks := testkeys.Alpha(maxKeyLen)

opts := func() *Options {
return (&Options{
FS: vfs.NewMem(),
Comparer: testkeys.Comparer,
FormatMajorVersion: FormatNewest,
}).EnsureDefaults()
}
type iteratorOp struct {
name string
fn func(*Iterator, testkeys.Keyspace, *rand.Rand)
}
var iterKeyBuf [maxKeyLen]byte

iterOps := []iteratorOp{
{
name: "seek-prefix-ge", fn: func(iter *Iterator, ks testkeys.Keyspace, rng *rand.Rand) {
n := testkeys.WriteKey(iterKeyBuf[:], ks, int64(rng.Intn(int(ks.Count()))))
_ = iter.SeekPrefixGE(iterKeyBuf[:n])
},
},
{
name: "seek-ge", fn: func(iter *Iterator, ks testkeys.Keyspace, rng *rand.Rand) {
n := testkeys.WriteKey(iterKeyBuf[:], ks, int64(rng.Intn(int(ks.Count()))))
_ = iter.SeekGE(iterKeyBuf[:n])
},
},
{
name: "iterate", fn: func(iter *Iterator, ks testkeys.Keyspace, rng *rand.Rand) {
valid := iter.Next()
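// Wrap around to the first key once the iterator is exhausted.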
if !valid {
iter.First()
}
},
},
}

// Populate an initial database with point keys at every key in the `ks`
// keyspace.
populated := withStateSetup(b, vfs.NewMem(), opts(), populateKeyspaceSetup(ks))
for _, gapLength := range []int{100, 1_000, 10_000, 100_000, 200_000, 400_000, 1_000_000, 2_000_000, 5_000_000, 10_000_000} {
b.Run(fmt.Sprintf("gap=%d", gapLength), func(b *testing.B) {
// Extend the `populated` initial database with DELs deleting all
// the middle keys in the keyspace in a contiguous swath of
// `gapLength` keys.
gapDeleted := withStateSetup(b, populated, opts(), deleteGapSetup(ks, gapLength))

for _, op := range iterOps {
b.Run(op.name, func(b *testing.B) {
// Run each instance of the test in a fresh DB constructed
// from `gapDeleted`. This ensures background compactions
// from one iterator operation don't affect another iterator
// operation.
withStateSetup(b, gapDeleted, opts(), func(_ testing.TB, d *DB) {
rng := rand.New(rand.NewSource(1 /* fixed seed */))
iter, err := d.NewIter(nil)
require.NoError(b, err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
op.fn(iter, ks, rng)
}
b.StopTimer()
require.NoError(b, iter.Close())
})
})
}
})
}
}

func withStateSetup(
t testing.TB, initial vfs.FS, opts *Options, setup func(testing.TB, *DB),
) vfs.FS {
ok, err := vfs.Clone(initial, opts.FS, "", "", vfs.CloneSync)
require.NoError(t, err)
require.True(t, ok)
d, err := Open("", opts)
require.NoError(t, err)
defer func() { require.NoError(t, d.Close()) }()
setup(t, d)
return opts.FS
}

func populateKeyspaceSetup(ks testkeys.Keyspace) func(testing.TB, *DB) {
const valSize = 256
return func(t testing.TB, d *DB) {
t.Logf("Populating keyspace with %d keys, each with %d-byte values", ks.Count(), valSize)
// Parallelize population by divvying up the keyspace.
var grp errgroup.Group
loadKeyspaces := testkeys.Divvy(ks, 20)
var progress atomic.Uint64
for l := 0; l < len(loadKeyspaces); l++ {
l := l
grp.Go(func() error {
rng := rand.New(rand.NewSource(1))
batch := d.NewBatch()
key := make([]byte, ks.MaxLen())
var val [valSize]byte
for i := int64(0); i < loadKeyspaces[l].Count(); i++ {
rng.Read(val[:])
n := testkeys.WriteKey(key, loadKeyspaces[l], i)
if err := batch.Set(key[:n], val[:], nil); err != nil {
return err
}
if batch.Len() >= 10<<10 /* 10 KiB */ {
count := batch.Count()
require.NoError(t, batch.Commit(NoSync))
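// Log progress whenever the cumulative count crosses another whole percent of the keyspace.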
if newTotal := progress.Add(uint64(count)); (newTotal / (uint64(ks.Count()) / 100)) != (newTotal-uint64(count))/uint64(ks.Count()/100) {
t.Logf("%.1f%% populated", 100.0*(float64(newTotal)/float64(ks.Count())))
}
batch = d.NewBatch()
d.AsyncFlush()
}
}
if !batch.Empty() {
return batch.Commit(NoSync)
}
return nil
})
}
require.NoError(t, grp.Wait())
}
}

func deleteGapSetup(ks testkeys.Keyspace, gapLength int) func(testing.TB, *DB) {
return func(t testing.TB, d *DB) {
midpoint := ks.Count() / 2
gapStart := midpoint - int64(gapLength/2)
gapEnd := midpoint + int64(gapLength/2+(gapLength%2))

batch := d.NewBatch()
key := make([]byte, ks.MaxLen())
// Delete the half-open range [gapStart, gapEnd), i.e. exactly gapLength keys.
for i := gapStart; i < gapEnd; i++ {
n := testkeys.WriteKey(key, ks, i)
if err := batch.Delete(key[:n], nil); err != nil {
t.Fatal(err)
}
if batch.Len() >= 10<<10 /* 10 KiB */ {
if err := batch.Commit(NoSync); err != nil {
t.Fatal(err)
}
batch = d.NewBatch()
}
}
if !batch.Empty() {
if err := batch.Commit(NoSync); err != nil {
t.Fatal(err)
}
}
if err := d.Flush(); err != nil {
t.Fatal(err)
}
waitForCompactionsAndTableStats(d)
}
}
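
For example, with the maxKeyLen=5 keyspace above (12,356,630 keys), the gap=1000000 variant centers the swath on key index 6,178,315 and deletes the half-open index range [5,678,315, 6,678,315).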

func runBenchmarkQueueWorkload(b *testing.B, deleteRatio float32, initOps int, valueSize int) {
const queueCount = 8
// These should be large enough to assign a unique key to each item in the
@@ -2792,19 +2968,7 @@ func runBenchmarkQueueWorkload(b *testing.B, deleteRatio float32, initOps int, valueSize int) {
_, err = d.AsyncFlush()
require.NoError(b, err)

waitForCompactions := func() {
d.mu.Lock()
// NB: Wait for table stats because some compaction types rely
// on table stats to be collected.
d.waitTableStats()
for d.mu.compact.compactingCount > 0 {
d.mu.compact.cond.Wait()
d.waitTableStats()
}
d.mu.Unlock()
}

waitForCompactions()
waitForCompactionsAndTableStats(d)

// Log the number of tombstones and live keys in each level after
// background compactions are complete.
