From b1012423356b22c755575e8f2965cbdb8a166e6c Mon Sep 17 00:00:00 2001
From: sumeerbhola
Date: Tue, 22 Dec 2020 17:39:30 -0500
Subject: [PATCH] db: add an Iterator.Clone to iterate over the same db state

Needed for https://github.com/cockroachdb/cockroach/issues/41720
and https://github.com/cockroachdb/cockroach/issues/55461
---
 batch.go                    |  3 +--
 batch_test.go               | 38 ++++++++++++++++++++-------
 compaction_test.go          |  3 +--
 data_test.go                | 19 +++++++++++++-
 db.go                       | 39 ++++++++++++++++++++--------
 ingest_test.go              |  3 +--
 iterator.go                 | 52 +++++++++++++++++++++++++++++++++++++
 iterator_test.go            | 13 +++++-----
 options.go                  |  3 ++-
 range_del_test.go           |  3 +--
 snapshot.go                 |  2 +-
 testdata/iterator_next_prev | 52 +++++++++++++++++++++++++++++++++++++
 12 files changed, 191 insertions(+), 39 deletions(-)

diff --git a/batch.go b/batch.go
index e4f89126c3..fe87aa6224 100644
--- a/batch.go
+++ b/batch.go
@@ -666,8 +666,7 @@ func (b *Batch) NewIter(o *IterOptions) *Iterator {
     if b.index == nil {
         return &Iterator{err: ErrNotIndexed}
     }
-    return b.db.newIterInternal(b.newInternalIter(o),
-        b.newRangeDelIter(o), nil /* snapshot */, o)
+    return b.db.newIterInternal(b, nil /* snapshot */, o)
 }
 
 // newInternalIter creates a new internalIterator that iterates over the
diff --git a/batch_test.go b/batch_test.go
index c5feedce23..f0f7b85a33 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -183,7 +183,13 @@ func TestBatchEmpty(t *testing.T) {
     ib := newIndexedBatch(d, DefaultComparer)
     iter := ib.NewIter(nil)
     require.False(t, iter.First())
+    iter2, err := iter.Clone()
+    require.NoError(t, err)
     require.NoError(t, iter.Close())
+    _, err = iter.Clone()
+    require.True(t, err != nil)
+    require.False(t, iter2.First())
+    require.NoError(t, iter2.Close())
 }
 
 func TestBatchReset(t *testing.T) {
@@ -271,25 +277,37 @@ func TestIndexedBatchReset(t *testing.T) {
     require.Nil(t, b.rangeDelIndex)
 
     count := func(ib *Batch) int {
-        count := 0
         iter := ib.NewIter(nil)
         defer iter.Close()
-        for iter.First(); iter.Valid(); iter.Next() {
-            count++
+        iter2, err := iter.Clone()
+        require.NoError(t, err)
+        defer iter2.Close()
+        var count [2]int
+        for i, it := range []*Iterator{iter, iter2} {
+            for it.First(); it.Valid(); it.Next() {
+                count[i]++
+            }
         }
-        return count
+        require.Equal(t, count[0], count[1])
+        return count[0]
     }
     contains := func(ib *Batch, key, value string) bool {
-        found := false
         iter := ib.NewIter(nil)
         defer iter.Close()
-        for iter.First(); iter.Valid(); iter.Next() {
-            if string(iter.Key()) == key &&
-                string(iter.Value()) == value {
-                found = true
+        iter2, err := iter.Clone()
+        require.NoError(t, err)
+        defer iter2.Close()
+        var found [2]bool
+        for i, it := range []*Iterator{iter, iter2} {
+            for it.First(); it.Valid(); it.Next() {
+                if string(it.Key()) == key &&
+                    string(it.Value()) == value {
+                    found[i] = true
+                }
             }
         }
-        return found
+        require.Equal(t, found[0], found[1])
+        return found[0]
     }
     // Set a key and check whether the key-value pair is visible.
     b.Set([]byte(key), []byte(value), nil)
diff --git a/compaction_test.go b/compaction_test.go
index eebf2b04b4..87963c343e 100644
--- a/compaction_test.go
+++ b/compaction_test.go
@@ -1025,8 +1025,7 @@ func TestManualCompaction(t *testing.T) {
                 seqNum: InternalKeySeqNumMax,
             }
             iter := snap.NewIter(nil)
-            defer iter.Close()
-            return runIterCmd(td, iter)
+            return runIterCmd(td, iter, true)
 
         case "async-compact":
             var s string
diff --git a/data_test.go b/data_test.go
index 27e9992d4c..e0a6ba73e1 100644
--- a/data_test.go
+++ b/data_test.go
@@ -59,7 +59,14 @@ func runGetCmd(td *datadriven.TestData, d *DB) string {
     return buf.String()
 }
 
-func runIterCmd(d *datadriven.TestData, iter *Iterator) string {
+func runIterCmd(d *datadriven.TestData, iter *Iterator, closeIter bool) string {
+    if closeIter {
+        defer func() {
+            if iter != nil {
+                iter.Close()
+            }
+        }()
+    }
     var b bytes.Buffer
     for _, line := range strings.Split(d.Input, "\n") {
         parts := strings.Fields(line)
@@ -110,6 +117,16 @@
             }
             iter.SetBounds(lower, upper)
             valid = iter.Valid()
+        case "clone":
+            clonedIter, err := iter.Clone()
+            if err != nil {
+                fmt.Fprintf(&b, "error in clone, skipping rest of input: err=%v\n", err)
+                return b.String()
+            }
+            if err = iter.Close(); err != nil {
+                fmt.Fprintf(&b, "err=%v\n", err)
+            }
+            iter = clonedIter
         default:
             return fmt.Sprintf("unknown op: %s", parts[0])
         }
diff --git a/db.go b/db.go
index 877dc8c28c..e5ecbc41e8 100644
--- a/db.go
+++ b/db.go
@@ -675,11 +675,9 @@ var iterAllocPool = sync.Pool{
     },
 }
 
-// newIterInternal constructs a new iterator, merging in batchIter as an extra
+// newIterInternal constructs a new iterator, merging in batch iterators as an extra
 // level.
-func (d *DB) newIterInternal(
-    batchIter internalIterator, batchRangeDelIter internalIterator, s *Snapshot, o *IterOptions,
-) *Iterator {
+func (d *DB) newIterInternal(batch *Batch, s *Snapshot, o *IterOptions) *Iterator {
     if err := d.closed.Load(); err != nil {
         panic(err)
     }
@@ -711,20 +709,38 @@
         split:     d.split,
         readState: readState,
         keyBuf:    buf.keyBuf,
+        batch:     batch,
+        newIters:  d.newIters,
+        seqNum:    seqNum,
     }
     if o != nil {
         dbi.opts = *o
     }
     dbi.opts.logger = d.opts.Logger
+    return finishInitializingIter(buf)
+}
+
+// finishInitializingIter is a helper for doing the non-trivial initialization
+// of an Iterator.
+func finishInitializingIter(buf *iterAlloc) *Iterator {
+    // Short-hand.
+    dbi := &buf.dbi
+    readState := dbi.readState
+    batch := dbi.batch
+    seqNum := dbi.seqNum
 
+    // Merging levels.
     mlevels := buf.mlevels[:0]
-    if batchIter != nil {
+
+    // Top-level is the batch, if any.
+    if batch != nil {
         mlevels = append(mlevels, mergingIterLevel{
-            iter:         batchIter,
-            rangeDelIter: batchRangeDelIter,
+            iter:         batch.newInternalIter(&dbi.opts),
+            rangeDelIter: batch.newRangeDelIter(&dbi.opts),
         })
     }
 
+    // Next are the memtables.
     memtables := readState.memtables
     for i := len(memtables) - 1; i >= 0; i-- {
         mem := memtables[i]
@@ -739,6 +755,8 @@
         })
     }
 
+    // Next are the file levels: L0 sub-levels followed by lower levels.
+
     // Determine the final size for mlevels so that we can avoid any more
     // reallocations. This is important because each levelIter will hold a
     // reference to elements in mlevels.
@@ -766,7 +784,7 @@ func (d *DB) newIterInternal(
             li = &levelIter{}
         }
 
-        li.init(dbi.opts, d.cmp, d.newIters, files, level, nil)
+        li.init(dbi.opts, dbi.cmp, dbi.newIters, files, level, nil)
         li.initRangeDel(&mlevels[0].rangeDelIter)
         li.initSmallestLargestUserKey(&mlevels[0].smallestUserKey,
             &mlevels[0].largestUserKey, &mlevels[0].isLargestUserKeyRangeDelSentinel)
@@ -789,7 +807,7 @@
         addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
     }
 
-    buf.merging.init(&dbi.opts, d.cmp, finalMLevels...)
+    buf.merging.init(&dbi.opts, dbi.cmp, finalMLevels...)
     buf.merging.snapshot = seqNum
     buf.merging.elideRangeTombstones = true
     return dbi
@@ -819,8 +837,7 @@ func (d *DB) NewIndexedBatch() *Batch {
 // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
 // point-in-time snapshots which avoids these problems.
 func (d *DB) NewIter(o *IterOptions) *Iterator {
-    return d.newIterInternal(nil, /* batchIter */
-        nil /* batchRangeDelIter */, nil /* snapshot */, o)
+    return d.newIterInternal(nil /* batch */, nil /* snapshot */, o)
 }
 
 // NewSnapshot returns a point-in-time view of the current DB state. Iterators
diff --git a/ingest_test.go b/ingest_test.go
index cb78858914..0df3b3f23a 100644
--- a/ingest_test.go
+++ b/ingest_test.go
@@ -555,8 +555,7 @@ func TestIngest(t *testing.T) {
 
         case "iter":
             iter := d.NewIter(nil)
-            defer iter.Close()
-            return runIterCmd(td, iter)
+            return runIterCmd(td, iter, true)
 
         case "lsm":
             return runLSMCmd(td, d)
diff --git a/iterator.go b/iterator.go
index 3ff100aec5..88f33e200c 100644
--- a/iterator.go
+++ b/iterator.go
@@ -85,6 +85,12 @@ type Iterator struct {
     alloc        *iterAlloc
     prefix       []byte
     readSampling readSampling
+
+    // Following fields are only used in Clone.
+    // Non-nil if this Iterator includes a Batch.
+    batch    *Batch
+    newIters tableNewIters
+    seqNum   uint64
 }
 
 // readSampling stores variables used to sample a read to trigger a read
@@ -758,3 +764,49 @@ func (i *Iterator) Metrics() IteratorMetrics {
     }
     return m
 }
+
+// Clone creates a new Iterator over the same underlying data, i.e., over the
+// same {batch, memtables, sstables}. It starts with the same IterOptions but
+// is not positioned. Note that IterOptions is not deep-copied, so the
+// LowerBound and UpperBound slices will share memory with the original
+// Iterator. Iterators assume that these bound slices are not mutated by the
+// callers, for the lifetime of use by an Iterator. The lifetime of use spans
+// from the Iterator creation/SetBounds call to the next SetBounds call. If
+// the caller is tracking this lifetime in order to reuse memory of these
+// slices, it must remember that now the lifetime of use is due to multiple
+// Iterators. The simplest behavior the caller can adopt to decouple lifetimes
+// is to call SetBounds on the new Iterator, immediately after Clone returns,
+// with different bounds slices.
+//
+// Callers can use Clone if they need multiple iterators that need to see
+// exactly the same underlying state of the DB. This should not be used to
+// extend the lifetime of the data backing the original Iterator since that
+// will cause an increase in memory and disk usage (use NewSnapshot for that
+// purpose).
+func (i *Iterator) Clone() (*Iterator, error) {
+    readState := i.readState
+    if readState == nil {
+        return nil, errors.Errorf("cannot Clone a closed Iterator")
+    }
+    // i is already holding a ref, so there is no race with unref here.
+    readState.ref()
+    // Bundle various structures under a single umbrella in order to allocate
+    // them together.
+    buf := iterAllocPool.Get().(*iterAlloc)
+    dbi := &buf.dbi
+    *dbi = Iterator{
+        opts:      i.opts,
+        alloc:     buf,
+        cmp:       i.cmp,
+        equal:     i.equal,
+        iter:      &buf.merging,
+        merge:     i.merge,
+        split:     i.split,
+        readState: readState,
+        keyBuf:    buf.keyBuf,
+        batch:     i.batch,
+        newIters:  i.newIters,
+        seqNum:    i.seqNum,
+    }
+    return finishInitializingIter(buf), nil
+}
diff --git a/iterator_test.go b/iterator_test.go
index ea6174f4f5..36ffb5554c 100644
--- a/iterator_test.go
+++ b/iterator_test.go
@@ -418,6 +418,8 @@ func TestIterator(t *testing.T) {
         })
         iter.snapshot = seqNum
         iter.elideRangeTombstones = true
+        // NB: This Iterator cannot be cloned since it is not constructed
+        // with a readState. It suffices for this test.
         return &Iterator{
             opts: opts,
             cmp:  cmp,
@@ -465,8 +467,7 @@
             }
 
             iter := newIter(uint64(seqNum), opts)
-            defer iter.Close()
-            return runIterCmd(d, iter)
+            return runIterCmd(d, iter, true)
 
         default:
             return fmt.Sprintf("unknown command: %s", d.Cmd)
@@ -584,7 +585,7 @@ func TestReadSampling(t *testing.T) {
             iter = snap.NewIter(nil)
             iter.readSampling.forceReadSampling = true
         }
-        return runIterCmd(td, iter)
+        return runIterCmd(td, iter, false)
 
     case "read-compactions":
         if d == nil {
@@ -700,8 +701,7 @@ func TestIteratorTableFilter(t *testing.T) {
                 seqNum: InternalKeySeqNumMax,
             }
             iter := snap.NewIter(iterOpts)
-            defer iter.Close()
-            return runIterCmd(td, iter)
+            return runIterCmd(td, iter, true)
 
         default:
             return fmt.Sprintf("unknown command: %s", td.Cmd)
@@ -774,8 +774,7 @@ func TestIteratorNextPrev(t *testing.T) {
                 seqNum: seqNum,
             }
             iter := snap.NewIter(nil)
-            defer iter.Close()
-            return runIterCmd(td, iter)
+            return runIterCmd(td, iter, true)
 
         default:
             return fmt.Sprintf("unknown command: %s", td.Cmd)
diff --git a/options.go b/options.go
index 1a3b4b4d73..ca42e1e57f 100644
--- a/options.go
+++ b/options.go
@@ -67,7 +67,8 @@ type IterOptions struct {
     UpperBound []byte
     // TableFilter can be used to filter the tables that are scanned during
     // iteration based on the user properties. Return true to scan the table and
-    // false to skip scanning.
+    // false to skip scanning. This function must be thread-safe since the same
+    // function can be used by multiple iterators, if the iterator is cloned.
     TableFilter func(userProps map[string]string) bool
 
     // Internal options.
diff --git a/range_del_test.go b/range_del_test.go
index a60fa3ebcb..f6410fdca4 100644
--- a/range_del_test.go
+++ b/range_del_test.go
@@ -90,8 +90,7 @@ func TestRangeDel(t *testing.T) {
             }
 
             iter := snap.NewIter(nil)
-            defer iter.Close()
-            return runIterCmd(td, iter)
+            return runIterCmd(td, iter, true)
 
         default:
             return fmt.Sprintf("unknown command: %s", td.Cmd)
diff --git a/snapshot.go b/snapshot.go
index 76aae30c6b..dba4cded2f 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -45,7 +45,7 @@ func (s *Snapshot) NewIter(o *IterOptions) *Iterator {
     if s.db == nil {
         panic(ErrClosed)
     }
-    return s.db.newIterInternal(nil /* batchIter */, nil /* batchRangeDelIter */, s, o)
+    return s.db.newIterInternal(nil /* batch */, s, o)
 }
 
 // Close closes the snapshot, releasing its resources. Close must be called.
diff --git a/testdata/iterator_next_prev b/testdata/iterator_next_prev
index d0dd0b87a2..5e45bcce26 100644
--- a/testdata/iterator_next_prev
+++ b/testdata/iterator_next_prev
@@ -139,6 +139,58 @@ next
 e:e
 .
 
+# Test that the cloned iterator sees all the keys.
+iter
+set-bounds lower=a upper=f
+first
+next
+next
+clone
+seek-ge a
+next
+next
+----
+.
+b:b
+e:e
+.
+.
+b:b
+e:e
+.
+
+# Test that the cloned iterator respects the original bounds.
+iter
+set-bounds lower=a upper=d
+first
+next
+clone
+seek-ge a
+next
+----
+.
+b:b
+.
+.
+b:b
+.
+
+# Test that the cloned iterator respects the seq num.
+iter seq=2
+set-bounds lower=a upper=f
+first
+next
+clone
+last
+prev
+----
+.
+e:e
+.
+.
+e:e
+.
+
 # Verify that switching from forward iteration to reverse iteration
 # properly skips over range tombstones at the end of reverse
 # iteration.
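
A minimal usage sketch of the new Clone API, assuming the patch is applied and a store is opened at a throwaway path; the path, keys, and pebble.Sync write option below are illustrative choices, not part of the change. The cloned iterator observes the same {batch, memtables, sstables} as the iterator it was cloned from, and each iterator must still be closed independently.

package main

import (
    "fmt"
    "log"

    "github.com/cockroachdb/pebble"
)

func main() {
    // Open a throwaway store; any *pebble.DB works the same way.
    db, err := pebble.Open("/tmp/iter-clone-demo", &pebble.Options{})
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    _ = db.Set([]byte("b"), []byte("b"), pebble.Sync)
    _ = db.Set([]byte("e"), []byte("e"), pebble.Sync)

    iter := db.NewIter(nil)
    // The clone starts unpositioned but reads the same DB state as iter.
    clone, err := iter.Clone()
    if err != nil {
        log.Fatal(err)
    }
    // This write is invisible to both iterators: they share the read state
    // that iter pinned when it was created.
    _ = db.Set([]byte("z"), []byte("z"), pebble.Sync)

    for _, it := range []*pebble.Iterator{iter, clone} {
        for it.First(); it.Valid(); it.Next() {
            fmt.Printf("%s => %s\n", it.Key(), it.Value())
        }
        // Close each iterator independently; closing one does not affect
        // the other.
        if err := it.Close(); err != nil {
            log.Fatal(err)
        }
    }
}

Since Clone does not deep-copy IterOptions, a caller that reuses its bound slices would call SetBounds on the clone with fresh slices immediately after Clone returns, as the doc comment above recommends.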