Skip to content

Commit

Permalink
db: add external sstable merging iterator
Browse files Browse the repository at this point in the history
Add a pebble.NewExternalIter function that may be used to construct a
*pebble.Iterator that reads from a provided slice of sstables rather than
committed database state. Input sstables are required to contain only
zero-sequence-number keys. Shadowing of keys is resolved by treating the files
as ordered in reverse chronological order.

This iterator is intended to replace the storage package's multiIterator.
  • Loading branch information
jbowens committed Feb 24, 2022
1 parent 85162b6 commit e058941
Show file tree
Hide file tree
Showing 11 changed files with 563 additions and 58 deletions.
36 changes: 35 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
Expand Down Expand Up @@ -413,11 +414,13 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
path := td.CmdArgs[0].String()

writeOpts := d.opts.MakeWriterOptions(0 /* level */, d.opts.FormatMajorVersion.MaxTableFormat())

f, err := fs.Create(path)
if err != nil {
return err
}
w := sstable.NewWriter(f, sstable.WriterOptions{})
w := sstable.NewWriter(f, writeOpts)
iters := []internalIterator{
b.newInternalIter(nil),
b.newRangeDelIter(nil),
Expand All @@ -437,6 +440,37 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
return err
}
}

// Copy any range keys from the batch into the sstable being built,
// rewriting their sequence numbers to zero as required of external
// sstables (see NewExternalIter).
if rki := b.newRangeKeyIter(nil); rki != nil {
	for key, _ := rki.First(); key != nil; key, _ = rki.Next() {
		s := rki.Current()
		s.Start.SetSeqNum(0)

		var err error
		switch s.Start.Kind() {
		case base.InternalKeyKindRangeKeySet:
			// A RangeKeySet's value encodes (suffix, value) tuples; the
			// batch should have fragmented them so exactly one remains.
			suffixValue, rest, ok := rangekey.DecodeSuffixValue(s.Value)
			if !ok || len(rest) > 0 {
				panic("expected a single suffix-value tuple")
			}
			err = w.RangeKeySet(s.Start.UserKey, s.End, suffixValue.Suffix, suffixValue.Value)
		case base.InternalKeyKindRangeKeyUnset:
			// A RangeKeyUnset's value encodes suffixes; expect exactly one.
			suffix, rest, ok := rangekey.DecodeSuffix(s.Value)
			if !ok || len(rest) > 0 {
				panic("expected a single unset suffix")
			}
			err = w.RangeKeyUnset(s.Start.UserKey, s.End, suffix)
		case base.InternalKeyKindRangeKeyDelete:
			err = w.RangeKeyDelete(s.Start.UserKey, s.End)
		default:
			panic("not a range key")
		}
		if err != nil {
			return err
		}
	}
}

return w.Close()
}

Expand Down
171 changes: 171 additions & 0 deletions external_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/sstable"
)

// NewExternalIter takes an input set of sstable files which may overlap
// arbitrarily and returns an Iterator over the merged contents of the sstables.
// Input sstables may contain point keys, range keys, range deletions, etc. The
// input files slice must be sorted in reverse chronological ordering. A key in
// a file at a lower index will shadow a key with an identical user key
// contained within a file at a higher index.
//
// Input sstables must only contain keys with the zero sequence number.
//
// iterOpts may be nil, in which case default iterator options are used. The
// returned Iterator takes ownership of the provided files and closes the
// sstable readers it opens over them when the Iterator is closed.
func NewExternalIter(
	o *Options,
	iterOpts *IterOptions,
	files []sstable.ReadableFile,
	extraReaderOpts ...sstable.ReaderOption,
) (it *Iterator, err error) {
	var readers []*sstable.Reader

	// Ensure we close all the opened readers if we error out.
	closeReaders := func() {
		for i := range readers {
			_ = readers[i].Close()
		}
	}
	defer func() {
		if err != nil {
			closeReaders()
		}
	}()
	readers, err = openExternalTables(o, files, o.MakeReaderOptions(), extraReaderOpts...)
	if err != nil {
		return nil, err
	}

	buf := iterAllocPool.Get().(*iterAlloc)
	dbi := &buf.dbi
	*dbi = Iterator{
		alloc:               buf,
		cmp:                 o.Comparer.Compare,
		equal:               o.equal(),
		iter:                &buf.merging,
		merge:               o.Merger.Merge,
		split:               o.Comparer.Split,
		readState:           nil,
		keyBuf:              buf.keyBuf,
		prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
		batch:               nil,
		newIters: func(f *manifest.FileMetadata, opts *IterOptions, bytesIterated *uint64) (internalIterator, keyspan.FragmentIterator, error) {
			// NB: External iterators are currently constructed without any
			// `levelIters`. newIters should never be called. When we support
			// organizing multiple non-overlapping files into a single level
			// (see TODO below), we'll need to adjust this tableNewIters
			// implementation to open iterators by looking up f in a map
			// of readers indexed by *fileMetadata.
			panic("unreachable")
		},
		seqNum: base.InternalKeySeqNumMax,
	}
	if iterOpts != nil {
		dbi.opts = *iterOpts
	}
	dbi.opts.logger = o.Logger

	// TODO(jackson): In some instances we could generate fewer levels by using
	// L0Sublevels code to organize nonoverlapping files into the same level.
	// This would allow us to use levelIters and keep a smaller set of data and
	// files in-memory. However, it would also require us to identify the bounds
	// of all the files upfront.

	// Ensure we close all iters if we error out early.
	mlevels := buf.mlevels[:0]
	var rangeKeyIters []keyspan.FragmentIterator
	defer func() {
		if err != nil {
			for i := range rangeKeyIters {
				_ = rangeKeyIters[i].Close()
			}
			for i := range mlevels {
				if mlevels[i].iter != nil {
					_ = mlevels[i].iter.Close()
				}
				if mlevels[i].rangeDelIter != nil {
					_ = mlevels[i].rangeDelIter.Close()
				}
			}
		}
	}()
	// NB: Consult dbi.opts (the copied, always non-nil options) rather than
	// the iterOpts pointer, which is documented to be optional and would
	// panic here when nil. This mirrors the rangeKeys() check below.
	if dbi.opts.pointKeys() {
		if len(files) > cap(mlevels) {
			mlevels = make([]mergingIterLevel, 0, len(files))
		}
		for _, r := range readers {
			pointIter, err := r.NewIter(dbi.opts.LowerBound, dbi.opts.UpperBound)
			if err != nil {
				return nil, err
			}
			rangeDelIter, err := r.NewRawRangeDelIter()
			if err != nil {
				// pointIter is not yet tracked by mlevels; close it directly.
				_ = pointIter.Close()
				return nil, err
			}
			mlevels = append(mlevels, mergingIterLevel{
				iter:         pointIter,
				rangeDelIter: rangeDelIter,
			})
		}
	}
	buf.merging.init(&dbi.opts, dbi.cmp, dbi.split, mlevels...)
	buf.merging.snapshot = base.InternalKeySeqNumMax
	// External sstables only contain zero-seqnum keys, so range tombstones
	// can be elided entirely rather than surfaced.
	buf.merging.elideRangeTombstones = true

	if dbi.opts.rangeKeys() {
		for _, r := range readers {
			rki, err := r.NewRawRangeKeyIter()
			if err != nil {
				return nil, err
			}
			if rki != nil {
				rangeKeyIters = append(rangeKeyIters, rki)
			}
		}

		// TODO(jackson): Pool range-key iterator objects.
		dbi.rangeKey = &iteratorRangeKeyState{}
		fragmentedIter := &rangekey.Iter{}
		fragmentedIter.Init(o.Comparer.Compare, o.Comparer.FormatKey, base.InternalKeySeqNumMax, rangeKeyIters...)
		iter := &rangekey.DefragmentingIter{}
		iter.Init(o.Comparer.Compare, fragmentedIter, rangekey.DefragmentLogical)
		dbi.rangeKey.rangeKeyIter = iter

		dbi.rangeKey.iter.Init(dbi.cmp, dbi.split, &buf.merging, iter, dbi.opts.RangeKeyMasking.Suffix)
		dbi.iter = &dbi.rangeKey.iter
		dbi.iter.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
	}

	// Close all the opened sstable.Readers when the Iterator is closed.
	dbi.closeHook = closeReaders
	return dbi, nil
}

// openExternalTables opens an sstable.Reader over each of the provided files.
// Each reader is assigned a global sequence number derived from the file's
// position in the slice so that files earlier in the slice shadow identical
// user keys in later files. On error, the readers opened so far are returned
// alongside the error so the caller can close them.
func openExternalTables(
	o *Options,
	files []sstable.ReadableFile,
	readerOpts sstable.ReaderOptions,
	extraReaderOpts ...sstable.ReaderOption,
) (readers []*sstable.Reader, err error) {
	readers = make([]*sstable.Reader, 0, len(files))
	for i, f := range files {
		var r *sstable.Reader
		r, err = sstable.NewReader(f, readerOpts, extraReaderOpts...)
		if err != nil {
			return readers, err
		}
		// Use the index of the file in files as the sequence number for all
		// of its keys: file i receives seqnum len(files)-i, so index 0 is
		// the most recent.
		r.Properties.GlobalSeqNum = uint64(len(files) - i)
		readers = append(readers, r)
	}
	return readers, nil
}
66 changes: 66 additions & 0 deletions external_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
"fmt"
"testing"

"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)

// TestExternalIterator exercises NewExternalIter via the datadriven cases in
// testdata/external_iterator.
func TestExternalIterator(t *testing.T) {
	mem := vfs.NewMem()
	o := &Options{
		FS:                 mem,
		Comparer:           testkeys.Comparer,
		FormatMajorVersion: FormatRangeKeys,
	}
	o.EnsureDefaults()
	d, err := Open("", o)
	require.NoError(t, err)
	defer func() { require.NoError(t, d.Close()) }()

	datadriven.RunTest(t, "testdata/external_iterator", func(td *datadriven.TestData) string {
		switch td.Cmd {
		case "reset":
			// Discard previously built sstables by swapping in a fresh
			// in-memory filesystem.
			mem = vfs.NewMem()
			return ""
		case "build":
			if buildErr := runBuildCmd(td, d, mem); buildErr != nil {
				return buildErr.Error()
			}
			return ""
		case "iter":
			iterOpts := IterOptions{KeyTypes: IterKeyTypePointsAndRanges}
			var inputs []sstable.ReadableFile
			for _, arg := range td.CmdArgs {
				switch arg.Key {
				case "mask-suffix":
					iterOpts.RangeKeyMasking.Suffix = []byte(arg.Vals[0])
				case "lower":
					iterOpts.LowerBound = []byte(arg.Vals[0])
				case "upper":
					iterOpts.UpperBound = []byte(arg.Vals[0])
				case "files":
					for _, name := range arg.Vals {
						f, openErr := mem.Open(name)
						require.NoError(t, openErr)
						inputs = append(inputs, f)
					}
				}
			}
			it, iterErr := NewExternalIter(o, &iterOpts, inputs)
			require.NoError(t, iterErr)
			return runIterCmd(td, it, true /* close iter */)
		default:
			return fmt.Sprintf("unknown command: %s", td.Cmd)
		}
	})
}
5 changes: 5 additions & 0 deletions internal/rangekey/defragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ func (i *DefragmentingIter) Error() error {
return i.iter.Error()
}

// Close closes the underlying iterators, returning any error they surface.
func (i *DefragmentingIter) Close() error {
	err := i.iter.Close()
	return err
}

// Current returns the span at the iterator's current position.
func (i *DefragmentingIter) Current() *CoalescedSpan {
return &i.curr
Expand Down
4 changes: 3 additions & 1 deletion internal/rangekey/interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,9 @@ func (i *InterleavingIter) Error() error {

// Close implements (base.InternalIterator).Close. Both the point iterator and
// the range-key iterator are closed unconditionally; the first error
// encountered (if any) is returned.
//
// NB: the stale `return i.pointIter.Close()` line that the diff view
// interleaved here has been removed — leaving it would return before the
// range-key iterator was closed, leaking it.
func (i *InterleavingIter) Close() error {
	perr := i.pointIter.Close()
	rerr := i.rangeKeyIter.Close()
	return firstError(perr, rerr)
}

// String implements (base.InternalIterator).String.
Expand Down
13 changes: 12 additions & 1 deletion internal/rangekey/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Iterator interface {
Prev() *CoalescedSpan
Current() *CoalescedSpan
Clone() Iterator
Close() error
}

// TODO(jackson): Consider modifying the interface to support returning 'empty'
Expand Down Expand Up @@ -69,7 +70,12 @@ type Iter struct {
var _ Iterator = (*Iter)(nil)

// Init initializes an iterator over a set of fragmented, coalesced spans.
func (i *Iter) Init(cmp base.Compare, formatKey base.FormatKey, visibleSeqNum uint64, iters ...keyspan.FragmentIterator) {
func (i *Iter) Init(
cmp base.Compare,
formatKey base.FormatKey,
visibleSeqNum uint64,
iters ...keyspan.FragmentIterator,
) {
*i = Iter{}
i.miter.Init(cmp, iters...)
i.coalescer.Init(cmp, formatKey, visibleSeqNum, func(span CoalescedSpan) {
Expand All @@ -96,6 +102,11 @@ func (i *Iter) Error() error {
return i.err
}

// Close closes all underlying iterators by closing the merging iterator that
// owns them, propagating its error.
func (i *Iter) Close() error {
	err := i.miter.Close()
	return err
}

func (i *Iter) coalesceForward() *CoalescedSpan {
i.dir = +1
i.valid = false
Expand Down
5 changes: 5 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ type Iterator struct {
prefixOrFullSeekKey []byte
readSampling readSampling
stats IteratorStats
closeHook func()

// Following fields are only used in Clone.
// Non-nil if this Iterator includes a Batch.
Expand Down Expand Up @@ -1509,6 +1510,10 @@ func (i *Iterator) Close() error {
i.readState = nil
}

if i.closeHook != nil {
i.closeHook()
}

// Close the closer for the current value if one was open.
if i.valueCloser != nil {
err = firstError(err, i.valueCloser.Close())
Expand Down
Loading

0 comments on commit e058941

Please sign in to comment.