Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: add an Iterator.Clone to iterate over the same db state #1033

Merged
merged 1 commit into from
Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,7 @@ func (b *Batch) NewIter(o *IterOptions) *Iterator {
if b.index == nil {
return &Iterator{err: ErrNotIndexed}
}
return b.db.newIterInternal(b.newInternalIter(o),
b.newRangeDelIter(o), nil /* snapshot */, o)
return b.db.newIterInternal(b, nil /* snapshot */, o)
}

// newInternalIter creates a new internalIterator that iterates over the
Expand Down
38 changes: 28 additions & 10 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,13 @@ func TestBatchEmpty(t *testing.T) {
ib := newIndexedBatch(d, DefaultComparer)
iter := ib.NewIter(nil)
require.False(t, iter.First())
iter2, err := iter.Clone()
require.NoError(t, err)
require.NoError(t, iter.Close())
_, err = iter.Clone()
require.True(t, err != nil)
require.False(t, iter2.First())
require.NoError(t, iter2.Close())
}

func TestBatchReset(t *testing.T) {
Expand Down Expand Up @@ -271,25 +277,37 @@ func TestIndexedBatchReset(t *testing.T) {
require.Nil(t, b.rangeDelIndex)

count := func(ib *Batch) int {
count := 0
iter := ib.NewIter(nil)
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
count++
iter2, err := iter.Clone()
require.NoError(t, err)
defer iter2.Close()
var count [2]int
for i, it := range []*Iterator{iter, iter2} {
for it.First(); it.Valid(); it.Next() {
count[i]++
}
}
return count
require.Equal(t, count[0], count[1])
return count[0]
}
contains := func(ib *Batch, key, value string) bool {
found := false
iter := ib.NewIter(nil)
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
if string(iter.Key()) == key &&
string(iter.Value()) == value {
found = true
iter2, err := iter.Clone()
require.NoError(t, err)
defer iter2.Close()
var found [2]bool
for i, it := range []*Iterator{iter, iter2} {
for it.First(); it.Valid(); it.Next() {
if string(it.Key()) == key &&
string(it.Value()) == value {
found[i] = true
}
}
}
return found
require.Equal(t, found[0], found[1])
return found[0]
}
// Set a key and check whether the key-value pair is visible.
b.Set([]byte(key), []byte(value), nil)
Expand Down
3 changes: 1 addition & 2 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,8 +1025,7 @@ func TestManualCompaction(t *testing.T) {
seqNum: InternalKeySeqNumMax,
}
iter := snap.NewIter(nil)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

case "async-compact":
var s string
Expand Down
19 changes: 18 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ func runGetCmd(td *datadriven.TestData, d *DB) string {
return buf.String()
}

func runIterCmd(d *datadriven.TestData, iter *Iterator) string {
func runIterCmd(d *datadriven.TestData, iter *Iterator, closeIter bool) string {
if closeIter {
defer func() {
if iter != nil {
iter.Close()
}
}()
}
var b bytes.Buffer
for _, line := range strings.Split(d.Input, "\n") {
parts := strings.Fields(line)
Expand Down Expand Up @@ -110,6 +117,16 @@ func runIterCmd(d *datadriven.TestData, iter *Iterator) string {
}
iter.SetBounds(lower, upper)
valid = iter.Valid()
case "clone":
clonedIter, err := iter.Clone()
if err != nil {
fmt.Fprintf(&b, "error in clone, skipping rest of input: err=%v\n", err)
return b.String()
}
if err = iter.Close(); err != nil {
fmt.Fprintf(&b, "err=%v\n", err)
}
iter = clonedIter
default:
return fmt.Sprintf("unknown op: %s", parts[0])
}
Expand Down
39 changes: 28 additions & 11 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,11 +675,9 @@ var iterAllocPool = sync.Pool{
},
}

// newIterInternal constructs a new iterator, merging in batchIter as an extra
// newIterInternal constructs a new iterator, merging in batch iterators as an extra
// level.
func (d *DB) newIterInternal(
batchIter internalIterator, batchRangeDelIter internalIterator, s *Snapshot, o *IterOptions,
) *Iterator {
func (d *DB) newIterInternal(batch *Batch, s *Snapshot, o *IterOptions) *Iterator {
if err := d.closed.Load(); err != nil {
panic(err)
}
Expand Down Expand Up @@ -711,20 +709,38 @@ func (d *DB) newIterInternal(
split: d.split,
readState: readState,
keyBuf: buf.keyBuf,
batch: batch,
newIters: d.newIters,
seqNum: seqNum,
}
if o != nil {
dbi.opts = *o
}
dbi.opts.logger = d.opts.Logger
return finishInitializingIter(buf)
}

// finishInitializingIter is a helper for doing the non-trivial initialization
// of an Iterator.
func finishInitializingIter(buf *iterAlloc) *Iterator {
// Short-hand.
dbi := &buf.dbi
readState := dbi.readState
batch := dbi.batch
seqNum := dbi.seqNum

// Merging levels.
mlevels := buf.mlevels[:0]
if batchIter != nil {

// Top-level is the batch, if any.
if batch != nil {
mlevels = append(mlevels, mergingIterLevel{
iter: batchIter,
rangeDelIter: batchRangeDelIter,
iter: batch.newInternalIter(&dbi.opts),
rangeDelIter: batch.newRangeDelIter(&dbi.opts),
})
}

// Next are the memtables.
memtables := readState.memtables
for i := len(memtables) - 1; i >= 0; i-- {
mem := memtables[i]
Expand All @@ -739,6 +755,8 @@ func (d *DB) newIterInternal(
})
}

// Next are the file levels: L0 sub-levels followed by lower levels.

// Determine the final size for mlevels so that we can avoid any more
// reallocations. This is important because each levelIter will hold a
// reference to elements in mlevels.
Expand Down Expand Up @@ -766,7 +784,7 @@ func (d *DB) newIterInternal(
li = &levelIter{}
}

li.init(dbi.opts, d.cmp, d.newIters, files, level, nil)
li.init(dbi.opts, dbi.cmp, dbi.newIters, files, level, nil)
li.initRangeDel(&mlevels[0].rangeDelIter)
li.initSmallestLargestUserKey(&mlevels[0].smallestUserKey, &mlevels[0].largestUserKey,
&mlevels[0].isLargestUserKeyRangeDelSentinel)
Expand All @@ -789,7 +807,7 @@ func (d *DB) newIterInternal(
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}

buf.merging.init(&dbi.opts, d.cmp, finalMLevels...)
buf.merging.init(&dbi.opts, dbi.cmp, finalMLevels...)
buf.merging.snapshot = seqNum
buf.merging.elideRangeTombstones = true
return dbi
Expand Down Expand Up @@ -819,8 +837,7 @@ func (d *DB) NewIndexedBatch() *Batch {
// apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
// point-in-time snapshots which avoids these problems.
func (d *DB) NewIter(o *IterOptions) *Iterator {
return d.newIterInternal(nil, /* batchIter */
nil /* batchRangeDelIter */, nil /* snapshot */, o)
return d.newIterInternal(nil /* batch */, nil /* snapshot */, o)
}

// NewSnapshot returns a point-in-time view of the current DB state. Iterators
Expand Down
3 changes: 1 addition & 2 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,7 @@ func TestIngest(t *testing.T) {

case "iter":
iter := d.NewIter(nil)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

case "lsm":
return runLSMCmd(td, d)
Expand Down
52 changes: 52 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ type Iterator struct {
alloc *iterAlloc
prefix []byte
readSampling readSampling

// The following fields are retained so that Clone can construct an
// equivalent Iterator; they are read by finishInitializingIter.
// batch is non-nil if this Iterator includes a Batch.
batch *Batch
newIters tableNewIters
seqNum uint64
}

// readSampling stores variables used to sample a read to trigger a read
Expand Down Expand Up @@ -758,3 +764,49 @@ func (i *Iterator) Metrics() IteratorMetrics {
}
return m
}

// Clone returns a new Iterator over the same underlying data as the receiver,
// i.e., the same {batch, memtables, sstables}. The clone starts with the same
// IterOptions as the receiver but is not positioned.
//
// IterOptions is copied shallowly, so the LowerBound and UpperBound slices
// share memory with the original Iterator. Iterators assume that callers do
// not mutate bound slices while they are in use, where "in use" spans from
// Iterator creation (or a SetBounds call) until the next SetBounds call. A
// caller that tracks this lifetime in order to reuse the slices' memory must
// account for the slices now being in use by multiple Iterators. The simplest
// way to decouple the lifetimes is to call SetBounds on the new Iterator, with
// different bounds slices, immediately after Clone returns.
//
// Use Clone when multiple iterators must observe exactly the same underlying
// state of the DB. Do not use it to extend the lifetime of the data backing
// the original Iterator, since that increases memory and disk usage; use
// NewSnapshot for that purpose.
//
// Clone returns an error if the receiver has no readState, which is the case
// once it has been closed.
func (i *Iterator) Clone() (*Iterator, error) {
	rs := i.readState
	if rs == nil {
		return nil, errors.Errorf("cannot Clone a closed Iterator")
	}
	// The receiver still holds its own reference on the read state, so taking
	// an additional one here cannot race with an unref.
	rs.ref()

	// A single pooled iterAlloc bundles the Iterator together with the
	// structures it needs, so that they are allocated as one unit.
	alloc := iterAllocPool.Get().(*iterAlloc)
	alloc.dbi = Iterator{
		// State tied to this particular allocation.
		alloc:  alloc,
		iter:   &alloc.merging,
		keyBuf: alloc.keyBuf,
		// State carried over unchanged from the receiver.
		opts:      i.opts,
		cmp:       i.cmp,
		equal:     i.equal,
		merge:     i.merge,
		split:     i.split,
		readState: rs,
		batch:     i.batch,
		newIters:  i.newIters,
		seqNum:    i.seqNum,
	}
	return finishInitializingIter(alloc), nil
}
13 changes: 6 additions & 7 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ func TestIterator(t *testing.T) {
})
iter.snapshot = seqNum
iter.elideRangeTombstones = true
// NB: This Iterator cannot be cloned since it is not constructed
// with a readState. It suffices for this test.
return &Iterator{
opts: opts,
cmp: cmp,
Expand Down Expand Up @@ -465,8 +467,7 @@ func TestIterator(t *testing.T) {
}

iter := newIter(uint64(seqNum), opts)
defer iter.Close()
return runIterCmd(d, iter)
return runIterCmd(d, iter, true)

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
Expand Down Expand Up @@ -584,7 +585,7 @@ func TestReadSampling(t *testing.T) {
iter = snap.NewIter(nil)
iter.readSampling.forceReadSampling = true
}
return runIterCmd(td, iter)
return runIterCmd(td, iter, false)

case "read-compactions":
if d == nil {
Expand Down Expand Up @@ -700,8 +701,7 @@ func TestIteratorTableFilter(t *testing.T) {
seqNum: InternalKeySeqNumMax,
}
iter := snap.NewIter(iterOpts)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
Expand Down Expand Up @@ -774,8 +774,7 @@ func TestIteratorNextPrev(t *testing.T) {
seqNum: seqNum,
}
iter := snap.NewIter(nil)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
Expand Down
3 changes: 2 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type IterOptions struct {
UpperBound []byte
// TableFilter can be used to filter the tables that are scanned during
// iteration based on the user properties. Return true to scan the table and
// false to skip scanning.
// false to skip scanning. This function must be thread-safe since the same
// function can be used by multiple iterators, if the iterator is cloned.
TableFilter func(userProps map[string]string) bool

// Internal options.
Expand Down
3 changes: 1 addition & 2 deletions range_del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ func TestRangeDel(t *testing.T) {
}

iter := snap.NewIter(nil)
defer iter.Close()
return runIterCmd(td, iter)
return runIterCmd(td, iter, true)

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
Expand Down
2 changes: 1 addition & 1 deletion snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Snapshot) NewIter(o *IterOptions) *Iterator {
if s.db == nil {
panic(ErrClosed)
}
return s.db.newIterInternal(nil /* batchIter */, nil /* batchRangeDelIter */, s, o)
return s.db.newIterInternal(nil /* batch */, s, o)
}

// Close closes the snapshot, releasing its resources. Close must be called.
Expand Down
Loading