diff --git a/batch.go b/batch.go index 10c20842b5..9ac36f14b8 100644 --- a/batch.go +++ b/batch.go @@ -233,13 +233,21 @@ type Batch struct { // Fragmented range deletion tombstones. Cached the first time a range // deletion iterator is requested. The cache is invalidated whenever a new - // range deletion is added to the batch. - tombstones []keyspan.Span + // range deletion is added to the batch. This cache can only be used when + // opening an iterator to read at a batch sequence number >= + // tombstonesSeqNum. This is the case for all new iterators created over a + // batch but it's not the case for all cloned iterators. + tombstones []keyspan.Span + tombstonesSeqNum uint64 // Fragmented range key spans. Cached the first time a range key iterator is // requested. The cache is invalidated whenever a new range key - // (RangeKey{Set,Unset,Del}) is added to the batch. - rangeKeys []keyspan.Span + // (RangeKey{Set,Unset,Del}) is added to the batch. This cache can only be + // used when opening an iterator to read at a batch sequence number >= + // tombstonesSeqNum. This is the case for all new iterators created over a + // batch but it's not the case for all cloned iterators. + rangeKeys []keyspan.Span + rangeKeysSeqNum uint64 // The flushableBatch wrapper if the batch is too large to fit in the // memtable. @@ -388,12 +396,14 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error { switch kind { case InternalKeyKindRangeDelete: b.tombstones = nil + b.tombstonesSeqNum = 0 if b.rangeDelIndex == nil { b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey) } err = b.rangeDelIndex.Add(uint32(offset)) case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete: b.rangeKeys = nil + b.rangeKeysSeqNum = 0 if b.rangeKeyIndex == nil { b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey) } @@ -648,6 +658,7 @@ func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp { b.countRangeDels++ if b.index != nil { b.tombstones = nil + b.tombstonesSeqNum = 0 // Range deletions are rare, so we lazily allocate the index for them. if b.rangeDelIndex == nil { b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey) @@ -697,6 +708,7 @@ func (b *Batch) incrementRangeKeysCount() { b.countRangeKeys++ if b.index != nil { b.rangeKeys = nil + b.rangeKeysSeqNum = 0 // Range keys are rare, so we lazily allocate the index for them. if b.rangeKeyIndex == nil { b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey) @@ -850,28 +862,45 @@ func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapsh iter.Init(b.cmp, nil) return } - // TODO(jackson): Add a fast path for when the number of tombstones is - // unchanged. + // Fragment the range tombstones the first time a range deletion iterator is - // requested. The cached tombstones are invalidated if another range deletion - // tombstone is added to the batch. - if b.tombstones == nil { - frag := &keyspan.Fragmenter{ - Cmp: b.cmp, - Format: b.formatKey, - Emit: func(s keyspan.Span) { - b.tombstones = append(b.tombstones, s) - }, - } - it := &batchIter{ - cmp: b.cmp, - batch: b, - iter: b.rangeDelIndex.NewIter(nil, nil), - snapshot: batchSnapshot, - } - fragmentRangeDels(frag, it, int(b.countRangeDels)) + // requested. The cached tombstones are invalidated if another range + // deletion tombstone is added to the batch. This cache is only guaranteed + // to be correct if we're opening an iterator to read at a batch sequence + // number at least as high as tombstonesSeqNum. The cache is guaranteed to + // include all tombstones up to tombstonesSeqNum, and if any additional + // tombstones were added after that sequence number the cache would've been + // cleared. + nextSeqNum := b.nextSeqNum() + if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot { + iter.Init(b.cmp, b.tombstones) + return + } + + tombstones := make([]keyspan.Span, 0, b.countRangeDels) + frag := &keyspan.Fragmenter{ + Cmp: b.cmp, + Format: b.formatKey, + Emit: func(s keyspan.Span) { + tombstones = append(tombstones, s) + }, + } + it := &batchIter{ + cmp: b.cmp, + batch: b, + iter: b.rangeDelIndex.NewIter(nil, nil), + snapshot: batchSnapshot, + } + fragmentRangeDels(frag, it, int(b.countRangeDels)) + iter.Init(b.cmp, tombstones) + + // If we just read all the tombstones in the batch (eg, batchSnapshot was + // set to b.nextSeqNum()), then cache the tombstones so that a subsequent + // call to initRangeDelIter may use them without refragmenting. + if nextSeqNum == batchSnapshot { + b.tombstones = tombstones + b.tombstonesSeqNum = nextSeqNum } - iter.Init(b.cmp, b.tombstones) } func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) { @@ -909,29 +938,44 @@ func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapsh iter.Init(b.cmp, nil) return } - // TODO(jackson): Add a fast path for when the number of range keys is - // unchanged. // Fragment the range keys the first time a range key iterator is requested. - // The cached range keys are invalidated if another range key is added to - // the batch. - if b.rangeKeys == nil { - frag := &keyspan.Fragmenter{ - Cmp: b.cmp, - Format: b.formatKey, - Emit: func(s keyspan.Span) { - b.rangeKeys = append(b.rangeKeys, s) - }, - } - it := &batchIter{ - cmp: b.cmp, - batch: b, - iter: b.rangeKeyIndex.NewIter(nil, nil), - snapshot: batchSnapshot, - } - fragmentRangeKeys(frag, it, int(b.countRangeKeys)) + // The cached spans are invalidated if another range key is added to the + // batch. This cache is only guaranteed to be correct if we're opening an + // iterator to read at a batch sequence number at least as high as + // rangeKeysSeqNum. The cache is guaranteed to include all range keys up to + // rangeKeysSeqNum, and if any additional range keys were added after that + // sequence number the cache would've been cleared. + nextSeqNum := b.nextSeqNum() + if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot { + iter.Init(b.cmp, b.rangeKeys) + return + } + + rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys) + frag := &keyspan.Fragmenter{ + Cmp: b.cmp, + Format: b.formatKey, + Emit: func(s keyspan.Span) { + rangeKeys = append(rangeKeys, s) + }, + } + it := &batchIter{ + cmp: b.cmp, + batch: b, + iter: b.rangeKeyIndex.NewIter(nil, nil), + snapshot: batchSnapshot, + } + fragmentRangeKeys(frag, it, int(b.countRangeKeys)) + iter.Init(b.cmp, rangeKeys) + + // If we just read all the range keys in the batch (eg, batchSnapshot was + // set to b.nextSeqNum()), then cache the range keys so that a subsequent + // call to initRangeKeyIter may use them without refragmenting. + if nextSeqNum == batchSnapshot { + b.rangeKeys = rangeKeys + b.rangeKeysSeqNum = nextSeqNum } - iter.Init(b.cmp, b.rangeKeys) } func fragmentRangeKeys(frag *keyspan.Fragmenter, it internalIterator, count int) error { @@ -1000,7 +1044,9 @@ func (b *Batch) Reset() { b.memTableSize = 0 b.deferredOp = DeferredBatchOp{} b.tombstones = nil + b.tombstonesSeqNum = 0 b.rangeKeys = nil + b.rangeKeysSeqNum = 0 b.flushable = nil b.commit = sync.WaitGroup{} b.commitErr = nil diff --git a/batch_test.go b/batch_test.go index 3840bbce22..d39da7ed69 100644 --- a/batch_test.go +++ b/batch_test.go @@ -10,9 +10,11 @@ import ( "fmt" "io" "math" + "math/rand" "strconv" "strings" "testing" + "time" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -347,7 +349,7 @@ func TestIndexedBatchMutation(t *testing.T) { opts.Experimental.RangeKeys = new(RangeKeysArena) d, err := Open("", opts) require.NoError(t, err) - defer d.Close() + defer func() { d.Close() }() b := newIndexedBatch(d, DefaultComparer) iters := map[string]*Iterator{} @@ -406,6 +408,22 @@ func TestIndexedBatchMutation(t *testing.T) { return err.Error() } return "" + case "reset": + for key, iter := range iters { + if err := iter.Close(); err != nil { + return err.Error() + } + delete(iters, key) + } + if d != nil { + if err := d.Close(); err != nil { + return err.Error() + } + } + opts.FS = vfs.NewMem() + d, err = Open("", opts) + require.NoError(t, err) + return "" default: return fmt.Sprintf("unrecognized command %q", td.Cmd) } @@ -1158,3 +1176,102 @@ func TestBatchMemTableSizeOverflow(t *testing.T) { require.NoError(t, b.Close()) require.NoError(t, d.Close()) } + +// TestBatchSpanCaching stress tests the caching of keyspan.Spans for range +// tombstones and range keys. +func TestBatchSpanCaching(t *testing.T) { + opts := &Options{ + Comparer: testkeys.Comparer, + FS: vfs.NewMem(), + FormatMajorVersion: FormatNewest, + } + opts.Experimental.RangeKeys = new(RangeKeysArena) + d, err := Open("", opts) + require.NoError(t, err) + defer d.Close() + + ks := testkeys.Alpha(1) + b := d.NewIndexedBatch() + for i := 0; i < ks.Count(); i++ { + k := testkeys.Key(ks, i) + require.NoError(t, b.Set(k, k, nil)) + } + + seed := int64(time.Now().UnixNano()) + t.Logf("seed = %d", seed) + rng := rand.New(rand.NewSource(seed)) + iters := make([][]*Iterator, ks.Count()) + defer func() { + for _, keyIters := range iters { + for _, iter := range keyIters { + _ = iter.Close() + } + } + }() + + // This test begins with one point key for every letter of the alphabet. + // Over the course of the test, point keys are 'replaced' with range keys + // with narrow bounds from left to right. Iterators are created at random, + // sometimes from the batch and sometimes by cloning existing iterators. + + checkIter := func(iter *Iterator, nextKey int) { + var i int + for valid := iter.First(); valid; valid = iter.Next() { + hasPoint, hasRange := iter.HasPointAndRange() + require.Equal(t, testkeys.Key(ks, i), iter.Key()) + if i < nextKey { + // This key should not exist as a point key, just a range key. + require.False(t, hasPoint) + require.True(t, hasRange) + } else { + require.True(t, hasPoint) + require.False(t, hasRange) + } + i++ + } + require.Equal(t, ks.Count(), i) + } + + // Each iteration of the below loop either reads or writes. + // + // A write iteration writes a new RANGEDEL and RANGEKEYSET into the batch, + // covering a single point key seeded above. Writing these two span keys + // together 'replaces' the point key with a range key. Each write iteration + // ratchets nextWriteKey so the next write iteration will write the next + // key. + // + // A read iteration creates a new iterator and ensures its state is + // expected: some prefix of only point keys, followed by a suffix of only + // range keys. Iterators created through Clone should observe the point keys + // that existed when the cloned iterator was created. + for nextWriteKey := 0; nextWriteKey < ks.Count(); { + p := rng.Float64() + switch { + case p < .10: /* 10 % */ + // Write a new range deletion and range key. + start := testkeys.Key(ks, nextWriteKey) + end := append(start, 0x00) + require.NoError(t, b.DeleteRange(start, end, nil)) + require.NoError(t, b.Experimental().RangeKeySet(start, end, nil, []byte("foo"), nil)) + nextWriteKey++ + case p < .55: /* 45 % */ + // Create a new iterator directly from the batch and check that it + // observes the correct state. + iter := b.NewIter(&IterOptions{KeyTypes: IterKeyTypePointsAndRanges}) + checkIter(iter, nextWriteKey) + iters[nextWriteKey] = append(iters[nextWriteKey], iter) + default: /* 45 % */ + // Create a new iterator through cloning a random existing iterator + // and check that it observes the right state. + readKey := rng.Intn(nextWriteKey + 1) + itersForReadKey := iters[readKey] + if len(itersForReadKey) == 0 { + continue + } + iter, err := itersForReadKey[rng.Intn(len(itersForReadKey))].Clone() + require.NoError(t, err) + checkIter(iter, readKey) + iters[readKey] = append(iters[readKey], iter) + } + } +} diff --git a/testdata/indexed_batch_mutation b/testdata/indexed_batch_mutation index a3e25e8e6e..0995dcf85b 100644 --- a/testdata/indexed_batch_mutation +++ b/testdata/indexed_batch_mutation @@ -345,3 +345,61 @@ foo: (foo, [a-z) @2=bax, @1=boop) a: (., [a-z) @6=yaya, @2=bax, @1=boop) c: (c, [a-z) @6=yaya, @2=bax, @1=boop) foo: (foo, [a-z) @6=yaya, @2=bax, @1=boop) + +# Test a scenario where constructing an Iterator should NOT use the cached +# fragmented tombstones / range keys, because the new Iterator is a Clone which +# must read at an earlier batch sequence number. + +# Reset and start a new batch. + +reset +---- + +new-batch +set foo foo +---- + +new-iter i1 +---- + +iter iter=i1 +first +next +---- +foo: (foo, .) +. + +# Apply a range deletion and a range key. + +mutate +del-range a z +range-key-set a z @1 foo +---- + +# Create a new iterator which will see both the range deletion and the range +# key, and cache both on the batch so that future iterators constructed over the +# batch do not need to. + +new-iter i2 +---- + +iter iter=i2 +first +next +---- +a: (., [a-z) @1=foo) +. + +# Clone the original iterator from before the delete range and the range key +# were created. It should not use the cached fragments of range deletions or +# range keys, and should not see the effects of either. + +clone from=i1 to=i3 +---- + +iter iter=i3 +first +next +---- +foo: (foo, .) +.