db: fix bug in interaction between Clone and indexed batches
In e32e94d the semantics of Iterators reading through indexed batches were
updated so that a Clone'd Iterator observes exactly the same batch state as
the Iterator being cloned. There was a bug in this implementation: a cloned
iterator would use the batch's cached tombstone/range key fragments whenever
they were available, even if the cache had been built at a more recent batch
state and contained keys the clone should not observe.

This commit updates the fragment caching so that the cached range deletion
and range key fragments are only consulted and refreshed when reading at the
most-recent state of the batch.

A new randomized test exercises the visibility of batch range deletions and
range keys in the presence of Clone.
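
To make the failure mode concrete, here is a minimal sketch of the buggy
scenario against pebble's public API (NewIndexedBatch, Batch.NewIter,
Iterator.Clone). It is illustrative only, not part of the commit, and its
expected output assumes the post-fix Clone semantics described above:

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	b := db.NewIndexedBatch()
	_ = b.Set([]byte("a"), []byte("1"), nil)

	// iter reads at the batch's current sequence number and sees "a".
	iter := b.NewIter(nil)

	// This range deletion is added after iter was created, so it must stay
	// invisible to iter and to any clone of iter.
	_ = b.DeleteRange([]byte("a"), []byte("z"), nil)

	// A fresh iterator fragments the batch's tombstones and caches them at
	// the batch's most-recent sequence number.
	newIter := b.NewIter(nil)
	fmt.Println(newIter.First()) // false: "a" is range-deleted

	// Before this fix, the clone could consult the newer cached tombstones
	// and wrongly hide "a"; with the fix it observes iter's original state.
	clone, _ := iter.Clone()
	fmt.Println(clone.First(), string(clone.Key())) // true "a"

	_ = iter.Close()
	_ = newIter.Close()
	_ = clone.Close()
}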
jbowens committed May 19, 2022
1 parent b14ad70 commit 4e33626
Showing 3 changed files with 266 additions and 45 deletions.
batch.go (134 changes: 90 additions & 44 deletions)
@@ -233,13 +233,21 @@ type Batch struct {

// Fragmented range deletion tombstones. Cached the first time a range
// deletion iterator is requested. The cache is invalidated whenever a new
// range deletion is added to the batch.
tombstones []keyspan.Span
// range deletion is added to the batch. This cache can only be used when
// opening an iterator to read at a batch sequence number >=
// tombstonesSeqNum. This is the case for all new iterators created over a
// batch but it's not the case for all cloned iterators.
tombstones []keyspan.Span
tombstonesSeqNum uint64

// Fragmented range key spans. Cached the first time a range key iterator is
// requested. The cache is invalidated whenever a new range key
// (RangeKey{Set,Unset,Del}) is added to the batch.
rangeKeys []keyspan.Span
// (RangeKey{Set,Unset,Del}) is added to the batch. This cache can only be
// used when opening an iterator to read at a batch sequence number >=
// rangeKeysSeqNum. This is the case for all new iterators created over a
// batch but it's not the case for all cloned iterators.
rangeKeys []keyspan.Span
rangeKeysSeqNum uint64

// The flushableBatch wrapper if the batch is too large to fit in the
// memtable.
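
Restating the invariant these fields encode: a surviving cache is exact for
any read at or above the sequence number it was built at, because any later
range mutation clears it. A hypothetical helper (not in the commit) makes the
reuse condition explicit:

package sketch

// cacheUsable reports whether fragments cached when the batch was at
// cachedSeqNum may serve a read at batchSnapshot. Any range mutation after
// cachedSeqNum would have cleared the cache, so a non-empty cache holds
// exactly the spans visible at cachedSeqNum and at every later snapshot.
// The cachedSeqNum != 0 test stands in for the nil-slice check the real
// code performs.
func cacheUsable(cachedSeqNum, batchSnapshot uint64) bool {
	return cachedSeqNum != 0 && cachedSeqNum <= batchSnapshot
}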
@@ -388,12 +396,14 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
switch kind {
case InternalKeyKindRangeDelete:
b.tombstones = nil
b.tombstonesSeqNum = 0
if b.rangeDelIndex == nil {
b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
}
err = b.rangeDelIndex.Add(uint32(offset))
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
b.rangeKeys = nil
b.rangeKeysSeqNum = 0
if b.rangeKeyIndex == nil {
b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
}
@@ -648,6 +658,7 @@ func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
b.countRangeDels++
if b.index != nil {
b.tombstones = nil
b.tombstonesSeqNum = 0
// Range deletions are rare, so we lazily allocate the index for them.
if b.rangeDelIndex == nil {
b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
@@ -697,6 +708,7 @@ func (b *Batch) incrementRangeKeysCount() {
b.countRangeKeys++
if b.index != nil {
b.rangeKeys = nil
b.rangeKeysSeqNum = 0
// Range keys are rare, so we lazily allocate the index for them.
if b.rangeKeyIndex == nil {
b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
@@ -850,28 +862,45 @@ func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) {
iter.Init(b.cmp, nil)
return
}
// TODO(jackson): Add a fast path for when the number of tombstones is
// unchanged.

// Fragment the range tombstones the first time a range deletion iterator is
// requested. The cached tombstones are invalidated if another range deletion
// tombstone is added to the batch.
if b.tombstones == nil {
frag := &keyspan.Fragmenter{
Cmp: b.cmp,
Format: b.formatKey,
Emit: func(s keyspan.Span) {
b.tombstones = append(b.tombstones, s)
},
}
it := &batchIter{
cmp: b.cmp,
batch: b,
iter: b.rangeDelIndex.NewIter(nil, nil),
snapshot: batchSnapshot,
}
fragmentRangeDels(frag, it, int(b.countRangeDels))
// requested. The cached tombstones are invalidated if another range
// deletion tombstone is added to the batch. This cache is only guaranteed
// to be correct if we're opening an iterator to read at a batch sequence
// number at least as high as tombstonesSeqNum. The cache is guaranteed to
// include all tombstones up to tombstonesSeqNum, and if any additional
// tombstones were added after that sequence number the cache would've been
// cleared.
nextSeqNum := b.nextSeqNum()
if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot {
iter.Init(b.cmp, b.tombstones)
return
}

tombstones := make([]keyspan.Span, 0, b.countRangeDels)
frag := &keyspan.Fragmenter{
Cmp: b.cmp,
Format: b.formatKey,
Emit: func(s keyspan.Span) {
tombstones = append(tombstones, s)
},
}
it := &batchIter{
cmp: b.cmp,
batch: b,
iter: b.rangeDelIndex.NewIter(nil, nil),
snapshot: batchSnapshot,
}
fragmentRangeDels(frag, it, int(b.countRangeDels))
iter.Init(b.cmp, tombstones)

// If we just read all the tombstones in the batch (eg, batchSnapshot was
// set to b.nextSeqNum()), then cache the tombstones so that a subsequent
// call to initRangeDelIter may use them without refragmenting.
if nextSeqNum == batchSnapshot {
b.tombstones = tombstones
b.tombstonesSeqNum = nextSeqNum
}
iter.Init(b.cmp, b.tombstones)
}

func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) {
@@ -909,29 +938,44 @@ func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) {
iter.Init(b.cmp, nil)
return
}
// TODO(jackson): Add a fast path for when the number of range keys is
// unchanged.

// Fragment the range keys the first time a range key iterator is requested.
// The cached range keys are invalidated if another range key is added to
// the batch.
if b.rangeKeys == nil {
frag := &keyspan.Fragmenter{
Cmp: b.cmp,
Format: b.formatKey,
Emit: func(s keyspan.Span) {
b.rangeKeys = append(b.rangeKeys, s)
},
}
it := &batchIter{
cmp: b.cmp,
batch: b,
iter: b.rangeKeyIndex.NewIter(nil, nil),
snapshot: batchSnapshot,
}
fragmentRangeKeys(frag, it, int(b.countRangeKeys))
// The cached spans are invalidated if another range key is added to the
// batch. This cache is only guaranteed to be correct if we're opening an
// iterator to read at a batch sequence number at least as high as
// rangeKeysSeqNum. The cache is guaranteed to include all range keys up to
// rangeKeysSeqNum, and if any additional range keys were added after that
// sequence number the cache would've been cleared.
nextSeqNum := b.nextSeqNum()
if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot {
iter.Init(b.cmp, b.rangeKeys)
return
}

rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys)
frag := &keyspan.Fragmenter{
Cmp: b.cmp,
Format: b.formatKey,
Emit: func(s keyspan.Span) {
rangeKeys = append(rangeKeys, s)
},
}
it := &batchIter{
cmp: b.cmp,
batch: b,
iter: b.rangeKeyIndex.NewIter(nil, nil),
snapshot: batchSnapshot,
}
fragmentRangeKeys(frag, it, int(b.countRangeKeys))
iter.Init(b.cmp, rangeKeys)

// If we just read all the range keys in the batch (eg, batchSnapshot was
// set to b.nextSeqNum()), then cache the range keys so that a subsequent
// call to initRangeKeyIter may use them without refragmenting.
if nextSeqNum == batchSnapshot {
b.rangeKeys = rangeKeys
b.rangeKeysSeqNum = nextSeqNum
}
iter.Init(b.cmp, b.rangeKeys)
}

func fragmentRangeKeys(frag *keyspan.Fragmenter, it internalIterator, count int) error {
@@ -1000,7 +1044,9 @@ func (b *Batch) Reset() {
b.memTableSize = 0
b.deferredOp = DeferredBatchOp{}
b.tombstones = nil
b.tombstonesSeqNum = 0
b.rangeKeys = nil
b.rangeKeysSeqNum = 0
b.flushable = nil
b.commit = sync.WaitGroup{}
b.commitErr = nil
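A design note on the caching above: initRangeDelIter and initRangeKeyIter
always fragment for the requested snapshot, but they only write the result
back when the snapshot equals b.nextSeqNum(), so cloned iterators reading at
older batch states refragment on each call instead of overwriting the shared
cache. The comparison is meaningful because a batch's next sequence number is
derived from its operation count; the following sketch shows the assumed
relationship (the real nextSeqNum lives elsewhere in batch.go, and
base.InternalKeySeqNumBatch marks batch-internal sequence numbers):

package pebble

import "github.com/cockroachdb/pebble/internal/base"

// nextSeqNumSketch is a hypothetical stand-in for Batch.nextSeqNum: batch
// sequence numbers count the operations applied so far, tagged with the
// batch bit so they sort above committed sequence numbers. An iterator
// opened now reads at exactly this value, which is why
// nextSeqNum() == batchSnapshot identifies a read of the latest batch state.
func (b *Batch) nextSeqNumSketch() uint64 {
	return uint64(b.Count()) | base.InternalKeySeqNumBatch
}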
batch_test.go (119 changes: 118 additions & 1 deletion)
@@ -10,9 +10,11 @@ import (
"fmt"
"io"
"math"
"math/rand"
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
@@ -347,7 +349,7 @@ func TestIndexedBatchMutation(t *testing.T) {
opts.Experimental.RangeKeys = new(RangeKeysArena)
d, err := Open("", opts)
require.NoError(t, err)
defer d.Close()
defer func() { d.Close() }()

b := newIndexedBatch(d, DefaultComparer)
iters := map[string]*Iterator{}
@@ -406,6 +408,22 @@
return err.Error()
}
return ""
case "reset":
for key, iter := range iters {
if err := iter.Close(); err != nil {
return err.Error()
}
delete(iters, key)
}
if d != nil {
if err := d.Close(); err != nil {
return err.Error()
}
}
opts.FS = vfs.NewMem()
d, err = Open("", opts)
require.NoError(t, err)
return ""
default:
return fmt.Sprintf("unrecognized command %q", td.Cmd)
}
@@ -1158,3 +1176,102 @@ func TestBatchMemTableSizeOverflow(t *testing.T) {
require.NoError(t, b.Close())
require.NoError(t, d.Close())
}

// TestBatchSpanCaching stress tests the caching of keyspan.Spans for range
// tombstones and range keys.
func TestBatchSpanCaching(t *testing.T) {
opts := &Options{
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
FormatMajorVersion: FormatNewest,
}
opts.Experimental.RangeKeys = new(RangeKeysArena)
d, err := Open("", opts)
require.NoError(t, err)
defer d.Close()

ks := testkeys.Alpha(1)
b := d.NewIndexedBatch()
for i := 0; i < ks.Count(); i++ {
k := testkeys.Key(ks, i)
require.NoError(t, b.Set(k, k, nil))
}

seed := int64(time.Now().UnixNano())
t.Logf("seed = %d", seed)
rng := rand.New(rand.NewSource(seed))
iters := make([][]*Iterator, ks.Count())
defer func() {
for _, keyIters := range iters {
for _, iter := range keyIters {
_ = iter.Close()
}
}
}()

// This test begins with one point key for every letter of the alphabet.
// Over the course of the test, point keys are 'replaced' with range keys
// with narrow bounds from left to right. Iterators are created at random,
// sometimes from the batch and sometimes by cloning existing iterators.

checkIter := func(iter *Iterator, nextKey int) {
var i int
for valid := iter.First(); valid; valid = iter.Next() {
hasPoint, hasRange := iter.HasPointAndRange()
require.Equal(t, testkeys.Key(ks, i), iter.Key())
if i < nextKey {
// This key should not exist as a point key, just a range key.
require.False(t, hasPoint)
require.True(t, hasRange)
} else {
require.True(t, hasPoint)
require.False(t, hasRange)
}
i++
}
require.Equal(t, ks.Count(), i)
}

// Each iteration of the below loop either reads or writes.
//
// A write iteration writes a new RANGEDEL and RANGEKEYSET into the batch,
// covering a single point key seeded above. Writing these two span keys
// together 'replaces' the point key with a range key. Each write iteration
// ratchets nextWriteKey so the next write iteration will write the next
// key.
//
// A read iteration creates a new iterator and ensures its state is
// expected: some prefix of only point keys, followed by a suffix of only
// range keys. Iterators created through Clone should observe the point keys
// that existed when the cloned iterator was created.
for nextWriteKey := 0; nextWriteKey < ks.Count(); {
p := rng.Float64()
switch {
case p < .10: /* 10 % */
// Write a new range deletion and range key.
start := testkeys.Key(ks, nextWriteKey)
end := append(start, 0x00)
require.NoError(t, b.DeleteRange(start, end, nil))
require.NoError(t, b.Experimental().RangeKeySet(start, end, nil, []byte("foo"), nil))
nextWriteKey++
case p < .55: /* 45 % */
// Create a new iterator directly from the batch and check that it
// observes the correct state.
iter := b.NewIter(&IterOptions{KeyTypes: IterKeyTypePointsAndRanges})
checkIter(iter, nextWriteKey)
iters[nextWriteKey] = append(iters[nextWriteKey], iter)
default: /* 45 % */
// Create a new iterator through cloning a random existing iterator
// and check that it observes the right state.
readKey := rng.Intn(nextWriteKey + 1)
itersForReadKey := iters[readKey]
if len(itersForReadKey) == 0 {
continue
}
iter, err := itersForReadKey[rng.Intn(len(itersForReadKey))].Clone()
require.NoError(t, err)
checkIter(iter, readKey)
iters[readKey] = append(iters[readKey], iter)
}
}
}
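
Because the test seeds its RNG from the clock and logs the seed, a failure
can be replayed by pinning that value; a hypothetical variant of the seeding
lines above (the literal is a placeholder for the logged value):

	// Placeholder seed: substitute the "seed = ..." value from the failing run.
	seed := int64(1652976000000000000)
	rng := rand.New(rand.NewSource(seed))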