Skip to content

Commit

Permalink
db,sstable: block property collectors and filters
Browse files Browse the repository at this point in the history
Block property collectors (and the corresponding filters) are an optional
user-facing feature that can be used to ignore data in the context of
an Iterator. The finest granularity of operation is a data block, and the
properties are aggregated into the upper level index block, and the
sstable, so an entire lower level index block (and its data blocks), or
an entire sstable can be ignored.

Multiple collectors can be configured in a DB, with recommendations to
keep the total size of properties of a block to < 50-100 bytes. The
collector can extract properties from the key or value. The first use case
for this will be to extract the MVCC timestamp in CockroachDB, and use
it for fine grained time-bound iteration. One can imagine extracting
min-max values of other columns either in the key or value, when the
key-value represents tabular data.

To efficiently handle multiple collectors, a collector needs to be
identified with a short integer id, in addition to a unique name. The
name => id mapping is specific to an sstable and stored in the user
properties. The id is encoded in index entries instead of the name.

Helper collector and filter implementations are provided for properties
represented as a set of the form [lower,upper), where both lower and
upper are uint64.

The feature is incompatible with older versions of Pebble: block handles
can now be longer, and older versions check that a block handle contains
no extra bytes, so those checks will fail.

Fixes #1190
  • Loading branch information
sumeerbhola committed Oct 19, 2021
1 parent f2339cc commit 7fec828
Show file tree
Hide file tree
Showing 18 changed files with 2,396 additions and 196 deletions.
6 changes: 5 additions & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2052,7 +2052,7 @@ func (d *DB) runCompaction(
}()

snapshots := d.mu.snapshots.toSlice()

formatVers := d.mu.formatVers.vers
// Release the d.mu lock while doing I/O.
// Note the unusual order: Unlock and then Lock.
d.mu.Unlock()
Expand Down Expand Up @@ -2105,6 +2105,10 @@ func (d *DB) runCompaction(
}

writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level)
if formatVers < FormatBlockPropertyCollector {
// Cannot yet write block properties.
writerOpts.BlockPropertyCollectors = nil
}

newOutput := func() error {
fileMeta := &fileMetadata{}
Expand Down
8 changes: 7 additions & 1 deletion format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ const (
// kind, base.InternalKeyKindSetWithDelete. Previous Pebble versions will be
// unable to open this database.
FormatSetWithDelete
// FormatBlockPropertyCollector is a format major version that introduces
// BlockPropertyCollectors.
FormatBlockPropertyCollector
// FormatNewest always contains the most recent format major version.
FormatNewest FormatMajorVersion = FormatSetWithDelete
FormatNewest FormatMajorVersion = FormatBlockPropertyCollector
)

// formatMajorVersionMigrations defines the migrations from one format
Expand Down Expand Up @@ -143,6 +146,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatSetWithDelete: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatSetWithDelete)
},
FormatBlockPropertyCollector: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector)
},
}

const formatVersionMarkerName = `format-version`
Expand Down
2 changes: 2 additions & 0 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func TestRatchetFormat(t *testing.T) {
require.Equal(t, FormatVersioned, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatSetWithDelete))
require.Equal(t, FormatSetWithDelete, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatBlockPropertyCollector))
require.Equal(t, FormatBlockPropertyCollector, d.FormatMajorVersion())
require.NoError(t, d.Close())

// If we Open the database again, leaving the default format, the
Expand Down
167 changes: 167 additions & 0 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,171 @@ func TestIteratorSeekOptErrors(t *testing.T) {
})
}

// testBlockIntervalCollector is a test-only collector that parses a
// two-character decimal field out of each user key and tracks the
// half-open interval [lower, upper) covering the values seen since the
// last call to FinishDataBlock.
type testBlockIntervalCollector struct {
	// offsetFromEnd is the number of bytes between the end of the
	// two-character field and the end of the user key.
	offsetFromEnd int
	// initialized is true once at least one value has been folded into
	// lower/upper for the current data block.
	initialized bool
	// lower is the smallest value seen (inclusive bound).
	lower uint64
	// upper is one more than the largest value seen (exclusive bound).
	upper uint64
}

// Add folds the value encoded in key into the interval being accumulated for
// the current data block.
//
// The value is the two-character decimal field that ends offsetFromEnd bytes
// before the end of key.UserKey. Keys too short to contain that field are
// ignored. A field that does not parse as an integer is returned as an
// error, and a negative value panics. The value argument is unused.
func (bi *testBlockIntervalCollector) Add(key InternalKey, value []byte) error {
	userKey := key.UserKey
	// end is the index just past the two-character field.
	end := len(userKey) - bi.offsetFromEnd
	if end < 2 {
		return nil
	}
	parsed, err := strconv.Atoi(string(userKey[end-2 : end]))
	if err != nil {
		return err
	}
	if parsed < 0 {
		panic("testBlockIntervalCollector expects values >= 0")
	}
	v := uint64(parsed)
	switch {
	case !bi.initialized:
		// First value of the block: the interval is exactly [v, v+1).
		bi.lower, bi.upper = v, v+1
		bi.initialized = true
	default:
		// Widen the existing interval just enough to include v.
		if v < bi.lower {
			bi.lower = v
		}
		if v >= bi.upper {
			bi.upper = v + 1
		}
	}
	return nil
}

// FinishDataBlock returns the [lower, upper) interval accumulated since the
// previous call and resets the collector so the next Add starts a fresh
// interval. The returned error is always nil.
func (bi *testBlockIntervalCollector) FinishDataBlock() (lower uint64, upper uint64, err error) {
	// Capture the interval before clearing the accumulated state.
	lower, upper = bi.lower, bi.upper
	bi.initialized, bi.lower, bi.upper = false, 0, 0
	return lower, upper, nil
}

// TestIteratorBlockIntervalFilter is a datadriven test of iteration with
// block interval filters. It supports two commands:
//
//   - build: takes zero or more id_offset=(id,offset) arguments, each of
//     which configures one block interval collector named by id that reads
//     its value at the given offset from the end of the user key. It creates
//     a fresh in-memory DB, commits the batch defined by the command input,
//     flushes it, and prints the LSM.
//   - iter: takes zero or more id_lower_upper=(id,lower,upper) arguments,
//     each of which adds a block interval filter for the collector named by
//     id, then runs the iterator commands from the input.
func TestIteratorBlockIntervalFilter(t *testing.T) {
	var mem vfs.FS
	var d *DB
	defer func() {
		// d is nil when no "build" command ran or when Open failed. Calling
		// Close on a nil DB would panic and hide the original failure.
		if d != nil {
			require.NoError(t, d.Close())
		}
	}()

	type collector struct {
		id     uint16
		offset int
	}
	createDB := func(collectors []collector) {
		if d != nil {
			// Clear d before asserting so that, if Close fails, the deferred
			// cleanup does not try to close the same DB a second time.
			err := d.Close()
			d = nil
			require.NoError(t, err)
		}

		mem = vfs.NewMem()
		require.NoError(t, mem.MkdirAll("ext", 0755))

		var bpCollectors []func() BlockPropertyCollector
		for _, c := range collectors {
			// Copy the loop variable so each closure captures its own value.
			coll := c
			bpCollectors = append(bpCollectors, func() BlockPropertyCollector {
				return sstable.NewBlockIntervalCollector(
					fmt.Sprintf("%d", coll.id),
					&testBlockIntervalCollector{offsetFromEnd: coll.offset})
			})
		}
		opts := &Options{
			FS:                      mem,
			FormatMajorVersion:      FormatNewest,
			BlockPropertyCollectors: bpCollectors,
		}
		// NOTE(review): a block size of 1 presumably forces every key into
		// its own data block and index block — confirm against the writer.
		lo := LevelOptions{BlockSize: 1, IndexBlockSize: 1}
		opts.Levels = append(opts.Levels, lo)

		// Automatic compactions may compact away tombstones from L6, making
		// some testcases non-deterministic.
		opts.private.disableAutomaticCompactions = true
		var err error
		d, err = Open("", opts)
		require.NoError(t, err)
	}

	datadriven.RunTest(
		t, "testdata/iterator_block_interval_filter", func(td *datadriven.TestData) string {
			switch td.Cmd {
			case "build":
				var collectors []collector
				for _, arg := range td.CmdArgs {
					switch arg.Key {
					case "id_offset":
						if len(arg.Vals) != 2 {
							return "id and offset not provided"
						}
						var id, offset int
						var err error
						if id, err = strconv.Atoi(arg.Vals[0]); err != nil {
							return err.Error()
						}
						if offset, err = strconv.Atoi(arg.Vals[1]); err != nil {
							return err.Error()
						}
						collectors = append(collectors, collector{id: uint16(id), offset: offset})
					default:
						return fmt.Sprintf("unknown key: %s", arg.Key)
					}
				}
				createDB(collectors)
				b := d.NewBatch()
				if err := runBatchDefineCmd(td, b); err != nil {
					return err.Error()
				}
				if err := b.Commit(nil); err != nil {
					return err.Error()
				}
				if err := d.Flush(); err != nil {
					return err.Error()
				}
				return runLSMCmd(td, d)

			case "iter":
				if d == nil {
					// Without a prior "build" there is no DB to iterate over.
					return "iter requires a preceding build command"
				}
				var opts IterOptions
				for _, arg := range td.CmdArgs {
					switch arg.Key {
					case "id_lower_upper":
						if len(arg.Vals) != 3 {
							return "id, lower, upper not provided"
						}
						var id, lower, upper int
						var err error
						if id, err = strconv.Atoi(arg.Vals[0]); err != nil {
							return err.Error()
						}
						if lower, err = strconv.Atoi(arg.Vals[1]); err != nil {
							return err.Error()
						}
						if upper, err = strconv.Atoi(arg.Vals[2]); err != nil {
							return err.Error()
						}
						opts.BlockPropertyFilters = append(opts.BlockPropertyFilters,
							sstable.NewBlockIntervalFilter(fmt.Sprintf("%d", id),
								uint64(lower), uint64(upper)))
					default:
						return fmt.Sprintf("unknown key: %s", arg.Key)
					}
				}
				// Randomize the filter order so the expected output cannot
				// depend on the order in which filters were specified.
				rand.Shuffle(len(opts.BlockPropertyFilters), func(i, j int) {
					opts.BlockPropertyFilters[i], opts.BlockPropertyFilters[j] =
						opts.BlockPropertyFilters[j], opts.BlockPropertyFilters[i]
				})
				iter := d.NewIter(&opts)
				return runIterCmd(td, iter, true)

			default:
				return fmt.Sprintf("unknown command: %s", td.Cmd)
			}
		})
}

// TODO(sumeer): randomized block interval filter test, with a single
// collector, that varies block sizes and checks subset relationship with
// source of truth computed using block size of 1.

func BenchmarkIteratorSeekGE(b *testing.B) {
m, keys := buildMemTable(b)
iter := &Iterator{
Expand Down Expand Up @@ -1233,3 +1398,5 @@ func BenchmarkIteratorSeekGENoop(b *testing.B) {
}
}
}

// TODO(sumeer): add block interval filtering benchmark
1 change: 1 addition & 0 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (l *levelIter) init(
l.lower = opts.LowerBound
l.upper = opts.UpperBound
l.tableOpts.TableFilter = opts.TableFilter
l.tableOpts.BlockPropertyFilters = opts.BlockPropertyFilters
l.cmp = cmp
l.split = split
l.iterFile = nil
Expand Down
2 changes: 1 addition & 1 deletion open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) {
"LOCK",
"MANIFEST-000001",
"OPTIONS-000003",
"marker.format-version.000003.004",
"marker.format-version.000004.005",
"marker.manifest.000001.MANIFEST-000001",
},
}
Expand Down
19 changes: 18 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type FilterPolicy = base.FilterPolicy
// TablePropertyCollector exports the sstable.TablePropertyCollector type.
type TablePropertyCollector = sstable.TablePropertyCollector

// BlockPropertyCollector exports the sstable.BlockPropertyCollector type.
// Creation functions for collectors are supplied via
// Options.BlockPropertyCollectors.
type BlockPropertyCollector = sstable.BlockPropertyCollector

// BlockPropertyFilter exports the sstable.BlockPropertyFilter type. Filters
// are supplied per iterator via IterOptions.BlockPropertyFilters.
type BlockPropertyFilter = sstable.BlockPropertyFilter

// IterOptions hold the optional per-query parameters for NewIter.
//
// Like Options, a nil *IterOptions is valid and means to use the default
Expand All @@ -72,7 +78,12 @@ type IterOptions struct {
// false to skip scanning. This function must be thread-safe since the same
// function can be used by multiple iterators, if the iterator is cloned.
TableFilter func(userProps map[string]string) bool

// BlockPropertyFilters can be used to avoid scanning tables and blocks in
// tables. It is required that this slice is sorted in increasing order of
// the BlockPropertyFilter.ShortID. This slice represents an intersection
// across all filters, i.e., all filters must indicate that the block is
// relevant.
BlockPropertyFilters []BlockPropertyFilter
// Internal options.
logger Logger
}
Expand Down Expand Up @@ -497,6 +508,11 @@ type Options struct {
// and lives for the lifetime of the table.
TablePropertyCollectors []func() TablePropertyCollector

// BlockPropertyCollectors is a list of BlockPropertyCollector creation
// functions. A new BlockPropertyCollector is created for each sstable
// built and lives for the lifetime of writing that table.
BlockPropertyCollectors []func() BlockPropertyCollector

// WALBytesPerSync sets the number of bytes to write to a WAL before calling
// Sync on it in the background. Just like with BytesPerSync above, this
// helps smooth out disk write latencies, and avoids cases where the OS
Expand Down Expand Up @@ -1166,6 +1182,7 @@ func (o *Options) MakeWriterOptions(level int) sstable.WriterOptions {
}
writerOpts.TableFormat = sstable.TableFormatRocksDBv2
writerOpts.TablePropertyCollectors = o.TablePropertyCollectors
writerOpts.BlockPropertyCollectors = o.BlockPropertyCollectors
}
levelOpts := o.Level(level)
writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval
Expand Down
Loading

0 comments on commit 7fec828

Please sign in to comment.