Skip to content

Commit

Permalink
db: add an Iterator.Clone to iterate over the same db state
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeerbhola committed Dec 23, 2020
1 parent f614b5a commit 108cd02
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 39 deletions.
3 changes: 1 addition & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,7 @@ func (b *Batch) NewIter(o *IterOptions) *Iterator {
if b.index == nil {
return &Iterator{err: ErrNotIndexed}
}
return b.db.newIterInternal(b.newInternalIter(o),
b.newRangeDelIter(o), nil /* snapshot */, o)
return b.db.newIterInternal(b, nil /* snapshot */, o)
}

// newInternalIter creates a new internalIterator that iterates over the
Expand Down
38 changes: 28 additions & 10 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,13 @@ func TestBatchEmpty(t *testing.T) {
ib := newIndexedBatch(d, DefaultComparer)
iter := ib.NewIter(nil)
require.False(t, iter.First())
iter2, err := iter.Clone()
require.NoError(t, err)
require.NoError(t, iter.Close())
_, err = iter.Clone()
require.True(t, err != nil)
require.False(t, iter2.First())
require.NoError(t, iter2.Close())
}

func TestBatchReset(t *testing.T) {
Expand Down Expand Up @@ -271,25 +277,37 @@ func TestIndexedBatchReset(t *testing.T) {
require.Nil(t, b.rangeDelIndex)

count := func(ib *Batch) int {
count := 0
iter := ib.NewIter(nil)
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
count++
iter2, err := iter.Clone()
require.NoError(t, err)
defer iter2.Close()
var count [2]int
for i, it := range []*Iterator{iter, iter2} {
for it.First(); it.Valid(); it.Next() {
count[i]++
}
}
return count
require.Equal(t, count[0], count[1])
return count[0]
}
contains := func(ib *Batch, key, value string) bool {
found := false
iter := ib.NewIter(nil)
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
if string(iter.Key()) == key &&
string(iter.Value()) == value {
found = true
iter2, err := iter.Clone()
require.NoError(t, err)
defer iter2.Close()
var found [2]bool
for i, it := range []*Iterator{iter, iter2} {
for it.First(); it.Valid(); it.Next() {
if string(it.Key()) == key &&
string(it.Value()) == value {
found[i] = true
}
}
}
return found
require.Equal(t, found[0], found[1])
return found[0]
}
// Set a key and check whether the key-value pair is visible.
b.Set([]byte(key), []byte(value), nil)
Expand Down
3 changes: 1 addition & 2 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,8 +1025,7 @@ func TestManualCompaction(t *testing.T) {
seqNum: InternalKeySeqNumMax,
}
iter := snap.NewIter(nil)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

case "async-compact":
var s string
Expand Down
19 changes: 18 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ func runGetCmd(td *datadriven.TestData, d *DB) string {
return buf.String()
}

func runIterCmd(d *datadriven.TestData, iter *Iterator) string {
func runIterCmd(d *datadriven.TestData, iter *Iterator, closeIter bool) string {
if closeIter {
defer func() {
if iter != nil {
iter.Close()
}
}()
}
var b bytes.Buffer
for _, line := range strings.Split(d.Input, "\n") {
parts := strings.Fields(line)
Expand Down Expand Up @@ -110,6 +117,16 @@ func runIterCmd(d *datadriven.TestData, iter *Iterator) string {
}
iter.SetBounds(lower, upper)
valid = iter.Valid()
case "clone":
clonedIter, err := iter.Clone()
if err != nil {
fmt.Fprintf(&b, "error in clone, skipping rest of input: err=%v\n", err)
return b.String()
}
if err = iter.Close(); err != nil {
fmt.Fprintf(&b, "err=%v\n", err)
}
iter = clonedIter
default:
return fmt.Sprintf("unknown op: %s", parts[0])
}
Expand Down
39 changes: 28 additions & 11 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,11 +675,9 @@ var iterAllocPool = sync.Pool{
},
}

// newIterInternal constructs a new iterator, merging in batchIter as an extra
// newIterInternal constructs a new iterator, merging in batch iterators as an extra
// level.
func (d *DB) newIterInternal(
batchIter internalIterator, batchRangeDelIter internalIterator, s *Snapshot, o *IterOptions,
) *Iterator {
func (d *DB) newIterInternal(batch *Batch, s *Snapshot, o *IterOptions) *Iterator {
if err := d.closed.Load(); err != nil {
panic(err)
}
Expand Down Expand Up @@ -711,20 +709,38 @@ func (d *DB) newIterInternal(
split: d.split,
readState: readState,
keyBuf: buf.keyBuf,
batch: batch,
newIters: d.newIters,
seqNum: seqNum,
}
if o != nil {
dbi.opts = *o
}
dbi.opts.logger = d.opts.Logger
return finishInitializingIter(buf)
}

// finishInitializingIter is a helper for doing the non-trivial initialization
// of an Iterator.
func finishInitializingIter(buf *iterAlloc) *Iterator {
// Short-hand.
dbi := &buf.dbi
readState := dbi.readState
batch := dbi.batch
seqNum := dbi.seqNum

// Merging levels.
mlevels := buf.mlevels[:0]
if batchIter != nil {

// Top-level is the batch, if any.
if batch != nil {
mlevels = append(mlevels, mergingIterLevel{
iter: batchIter,
rangeDelIter: batchRangeDelIter,
iter: batch.newInternalIter(&dbi.opts),
rangeDelIter: batch.newRangeDelIter(&dbi.opts),
})
}

// Next are the memtables.
memtables := readState.memtables
for i := len(memtables) - 1; i >= 0; i-- {
mem := memtables[i]
Expand All @@ -739,6 +755,8 @@ func (d *DB) newIterInternal(
})
}

// Next are the file levels: L0 sub-levels followed by lower levels.

// Determine the final size for mlevels so that we can avoid any more
// reallocations. This is important because each levelIter will hold a
// reference to elements in mlevels.
Expand Down Expand Up @@ -766,7 +784,7 @@ func (d *DB) newIterInternal(
li = &levelIter{}
}

li.init(dbi.opts, d.cmp, d.newIters, files, level, nil)
li.init(dbi.opts, dbi.cmp, dbi.newIters, files, level, nil)
li.initRangeDel(&mlevels[0].rangeDelIter)
li.initSmallestLargestUserKey(&mlevels[0].smallestUserKey, &mlevels[0].largestUserKey,
&mlevels[0].isLargestUserKeyRangeDelSentinel)
Expand All @@ -789,7 +807,7 @@ func (d *DB) newIterInternal(
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}

buf.merging.init(&dbi.opts, d.cmp, finalMLevels...)
buf.merging.init(&dbi.opts, dbi.cmp, finalMLevels...)
buf.merging.snapshot = seqNum
buf.merging.elideRangeTombstones = true
return dbi
Expand Down Expand Up @@ -819,8 +837,7 @@ func (d *DB) NewIndexedBatch() *Batch {
// apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
// point-in-time snapshots which avoids these problems.
func (d *DB) NewIter(o *IterOptions) *Iterator {
return d.newIterInternal(nil, /* batchIter */
nil /* batchRangeDelIter */, nil /* snapshot */, o)
return d.newIterInternal(nil /* batch */, nil /* snapshot */, o)
}

// NewSnapshot returns a point-in-time view of the current DB state. Iterators
Expand Down
3 changes: 1 addition & 2 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,7 @@ func TestIngest(t *testing.T) {

case "iter":
iter := d.NewIter(nil)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

case "lsm":
return runLSMCmd(td, d)
Expand Down
52 changes: 52 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ type Iterator struct {
alloc *iterAlloc
prefix []byte
readSampling readSampling

// Following fields are only used in Clone.
// Non-nil if this Iterator includes a Batch.
batch *Batch
newIters tableNewIters
seqNum uint64
}

// readSampling stores variables used to sample a read to trigger a read
Expand Down Expand Up @@ -758,3 +764,49 @@ func (i *Iterator) Metrics() IteratorMetrics {
}
return m
}

// Clone creates a new Iterator over the same underlying data, i.e., over the
// same {batch, memtables, sstables}). It starts with the same IterOptions but
// is not positioned. Note that IterOptions is not deep-copied, so the
// LowerBound and UpperBound slices will share memory with the original
// Iterator. Iterators assume that these bound slices are not mutated by the
// callers, for the lifetime of use by an Iterator. The lifetime of use spans
// from the Iterator creation/SetBounds call to the next SetBounds call. If
// the caller is tracking this lifetime in order to reuse memory of these
// slices, it must remember that now the lifetime of use is due to multiple
// Iterators. The simplest behavior the caller can adopt to decouple lifetimes
// is to call SetBounds on the new Iterator, immediately after Clone returns,
// with different bounds slices.
//
// Callers can use Clone if they need multiple iterators that need to see
// exactly the same underlying state of the DB. This should not be used to
// extend the lifetime of the data backing the original Iterator since that
// will cause an increase in memory and disk usage (use NewSnapshot for that
// purpose).
func (i *Iterator) Clone() (*Iterator, error) {
readState := i.readState
if readState == nil {
return nil, errors.Errorf("cannot Clone a closed Iterator")
}
// i is already holding a ref, so there is no race with unref here.
readState.ref()
// Bundle various structures under a single umbrella in order to allocate
// them together.
buf := iterAllocPool.Get().(*iterAlloc)
dbi := &buf.dbi
*dbi = Iterator{
opts: i.opts,
alloc: buf,
cmp: i.cmp,
equal: i.equal,
iter: &buf.merging,
merge: i.merge,
split: i.split,
readState: readState,
keyBuf: buf.keyBuf,
batch: i.batch,
newIters: i.newIters,
seqNum: i.seqNum,
}
return finishInitializingIter(buf), nil
}
13 changes: 6 additions & 7 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ func TestIterator(t *testing.T) {
})
iter.snapshot = seqNum
iter.elideRangeTombstones = true
// NB: This Iterator cannot be cloned since it is not constructed
// with a readState. It suffices for this test.
return &Iterator{
opts: opts,
cmp: cmp,
Expand Down Expand Up @@ -465,8 +467,7 @@ func TestIterator(t *testing.T) {
}

iter := newIter(uint64(seqNum), opts)
defer iter.Close()
return runIterCmd(d, iter)
return runIterCmd(d, iter, true)

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
Expand Down Expand Up @@ -584,7 +585,7 @@ func TestReadSampling(t *testing.T) {
iter = snap.NewIter(nil)
iter.readSampling.forceReadSampling = true
}
return runIterCmd(td, iter)
return runIterCmd(td, iter, false)

case "read-compactions":
if d == nil {
Expand Down Expand Up @@ -700,8 +701,7 @@ func TestIteratorTableFilter(t *testing.T) {
seqNum: InternalKeySeqNumMax,
}
iter := snap.NewIter(iterOpts)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
Expand Down Expand Up @@ -774,8 +774,7 @@ func TestIteratorNextPrev(t *testing.T) {
seqNum: seqNum,
}
iter := snap.NewIter(nil)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
Expand Down
3 changes: 2 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type IterOptions struct {
UpperBound []byte
// TableFilter can be used to filter the tables that are scanned during
// iteration based on the user properties. Return true to scan the table and
// false to skip scanning.
// false to skip scanning. This function must be thread-safe since the same
// function can be used by multiple iterators, if the iterator is cloned.
TableFilter func(userProps map[string]string) bool

// Internal options.
Expand Down
3 changes: 1 addition & 2 deletions range_del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ func TestRangeDel(t *testing.T) {
}

iter := snap.NewIter(nil)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
Expand Down
2 changes: 1 addition & 1 deletion snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Snapshot) NewIter(o *IterOptions) *Iterator {
if s.db == nil {
panic(ErrClosed)
}
return s.db.newIterInternal(nil /* batchIter */, nil /* batchRangeDelIter */, s, o)
return s.db.newIterInternal(nil /* batch */, s, o)
}

// Close closes the snapshot, releasing its resources. Close must be called.
Expand Down
Loading

0 comments on commit 108cd02

Please sign in to comment.