db,sstable: block property collectors and filters
Block property collectors (and the corresponding filters) are an optional
user-facing feature that can be used to ignore data in the context of
an Iterator. The finest granularity of operation is a data block; properties
are aggregated into the upper-level index block and into the sstable, so an
entire lower-level index block (and its data blocks), or an entire sstable,
can be ignored.
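
To make the aggregation concrete, consider a property expressed as an
interval [lower, upper) over uint64s (the helper described below). A minimal
sketch of the upward aggregation, not the sstable package's actual
implementation:

    type interval struct {
        lower, upper uint64 // the property is the interval [lower, upper)
    }

    // union returns the smallest interval containing both a and b, treating
    // lower >= upper as the empty interval.
    func union(a, b interval) interval {
        if a.lower >= a.upper {
            return b
        }
        if b.lower >= b.upper {
            return a
        }
        if b.lower < a.lower {
            a.lower = b.lower
        }
        if b.upper > a.upper {
            a.upper = b.upper
        }
        return a
    }

    // An index block's interval is the union of its data blocks' intervals,
    // and the sstable's interval is the union of its index blocks' intervals.
    func aggregate(blocks []interval) interval {
        var agg interval
        for _, b := range blocks {
            agg = union(agg, b)
        }
        return agg
    }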

Multiple collectors can be configured in a DB, with the recommendation that
the total size of a block's properties be kept under 50-100 bytes. A
collector can extract properties from the key or the value. The first use
case for this will be to extract the MVCC timestamp in CockroachDB and use
it for fine-grained time-bound iteration. One can imagine extracting
min-max values of other columns either in the key or value, when the
key-value represents tabular data.
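
Wiring this up looks roughly like the following sketch (mirroring the test
added to iterator_test.go below; the collector name "test", the short ID 0,
and the interval bounds 10 and 15 are illustrative):

    opts := &Options{
        FS:                 vfs.NewMem(),
        FormatMajorVersion: FormatNewest,
        BlockPropertyCollectors: []func() BlockPropertyCollector{
            func() BlockPropertyCollector {
                // testBlockIntervalCollector (defined in the test below)
                // derives a [lower, upper) interval from a suffix of each
                // user key.
                return sstable.NewBlockIntervalCollector("test", 0, &testBlockIntervalCollector{})
            },
        },
    }
    d, err := Open("", opts)
    if err != nil {
        return err
    }
    // The iterator only surfaces blocks whose "test" interval overlaps [10, 15).
    iter := d.NewIter(&IterOptions{
        BlockPropertyFilters: []BlockPropertyFilter{
            sstable.NewBlockIntervalFilter("test", 0, 10, 15),
        },
    })
    defer iter.Close()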

To efficiently handle multiple collectors, a collector needs to be
identified with a short integer id, in addition to a unique name. The
id is encoded in index entries. The name is encoded in the properties
block for the sstable.
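
Purely as an illustration of why a one-byte ID keeps per-entry overhead
small (this is not the actual on-disk layout, which is defined by the
sstable writer), an index entry's block handle could carry each collector's
property as a (shortID, length, bytes) triple, using encoding/binary:

    // appendProperty appends one collector's property to an encoded block
    // handle. Illustrative only; the real encoding lives in the sstable
    // package. The one-byte ID avoids repeating the collector name in every
    // index entry; the name is written once, in the properties block.
    func appendProperty(dst []byte, shortID uint8, prop []byte) []byte {
        var lenBuf [binary.MaxVarintLen64]byte
        n := binary.PutUvarint(lenBuf[:], uint64(len(prop)))
        dst = append(dst, shortID)
        dst = append(dst, lenBuf[:n]...)
        return append(dst, prop...)
    }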

Helper collector and filter implementations are provided for properties
represented as an interval of the form [lower, upper), where both lower and
upper are uint64.
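
The matching filter then only needs an overlap test against its own
[lower, upper) range; a minimal sketch of that check (the helper's actual
implementation may differ in details such as empty-interval handling):

    // intersects reports whether a block's interval [bLower, bUpper) overlaps
    // the filter's interval [fLower, fUpper). When it does not, the data
    // block, index block, or sstable carrying that property can be skipped.
    func intersects(bLower, bUpper, fLower, fUpper uint64) bool {
        if bLower >= bUpper || fLower >= fUpper {
            return false // empty intervals never intersect
        }
        return bLower < fUpper && fLower < bUpper
    }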

The feature is incompatible with older versions of Pebble: block handles can
now be longer, and older versions check that a block handle contains no
extra bytes, a check that these longer handles will fail.

Fixes cockroachdb#1190
sumeerbhola committed Oct 5, 2021
1 parent b2eb88a commit 42c2ea1
Showing 17 changed files with 1,398 additions and 188 deletions.
6 changes: 5 additions & 1 deletion compaction.go
@@ -2032,7 +2032,7 @@ func (d *DB) runCompaction(
}()

snapshots := d.mu.snapshots.toSlice()

formatVers := d.mu.formatVers.vers
// Release the d.mu lock while doing I/O.
// Note the unusual order: Unlock and then Lock.
d.mu.Unlock()
@@ -2085,6 +2085,10 @@ func (d *DB) runCompaction(
}

writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level)
if formatVers < FormatBlockPropertyCollector {
// Cannot yet write block properties.
writerOpts.BlockPropertyCollectors = nil
}

newOutput := func() error {
fileMeta := &fileMetadata{}
8 changes: 7 additions & 1 deletion format_major_version.go
@@ -66,8 +66,11 @@ const (
// kind, base.InternalKeyKindSetWithDelete. Previous Pebble versions will be
// unable to open this database.
FormatSetWithDelete
// FormatBlockPropertyCollector is a format major version that introduces
// BlockPropertyCollectors.
FormatBlockPropertyCollector
// FormatNewest always contains the most recent format major version.
FormatNewest FormatMajorVersion = FormatSetWithDelete
FormatNewest FormatMajorVersion = FormatBlockPropertyCollector
)

// formatMajorVersionMigrations defines the migrations from one format
@@ -143,6 +146,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatSetWithDelete: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatSetWithDelete)
},
FormatBlockPropertyCollector: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector)
},
}

const formatVersionMarkerName = `format-version`
2 changes: 2 additions & 0 deletions format_major_version_test.go
@@ -33,6 +33,8 @@ func TestRatchetFormat(t *testing.T) {
require.Equal(t, FormatVersioned, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatSetWithDelete))
require.Equal(t, FormatSetWithDelete, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatBlockPropertyCollector))
require.Equal(t, FormatBlockPropertyCollector, d.FormatMajorVersion())
require.NoError(t, d.Close())

// If we Open the database again, leaving the default format, the
116 changes: 116 additions & 0 deletions iterator_test.go
@@ -959,6 +959,122 @@ func TestIteratorSeekOptErrors(t *testing.T) {
})
}

type testBlockIntervalCollector struct {
initialized bool
lower, upper uint64
}

func (bi *testBlockIntervalCollector) Add(key InternalKey, value []byte) error {
k := key.UserKey
if len(k) < 2 {
return nil
}
val, err := strconv.Atoi(string(k[len(k)-2:]))
if err != nil {
return err
}
if val < 0 {
panic("testBlockIntervalCollector expects values >= 0")
}
uval := uint64(val)
if !bi.initialized {
bi.lower, bi.upper = uval, uval+1
bi.initialized = true
return nil
}
if bi.lower > uval {
bi.lower = uval
}
if uval >= bi.upper {
bi.upper = uval+1
}
return nil
}

func (bi *testBlockIntervalCollector) FinishDataBlock() (lower uint64, upper uint64, err error) {
bi.initialized = false
l, u := bi.lower, bi.upper
bi.lower, bi.upper = 0, 0
return l, u, nil
}

func TestIteratorBlockIntervalFilter(t *testing.T) {
var mem vfs.FS
var d *DB
defer func() {
require.NoError(t, d.Close())
}()

reset := func() {
if d != nil {
require.NoError(t, d.Close())
}

mem = vfs.NewMem()
require.NoError(t, mem.MkdirAll("ext", 0755))

opts := &Options{
FS: mem,
FormatMajorVersion: FormatNewest,
BlockPropertyCollectors: []func() BlockPropertyCollector{
func() BlockPropertyCollector {
return sstable.NewBlockIntervalCollector("test", 0, &testBlockIntervalCollector{})
},
},
}
lo := LevelOptions{BlockSize: 1, IndexBlockSize: 1}
opts.Levels = append(opts.Levels, lo)

// Automatic compactions may compact away tombstones from L6, making
// some testcases non-deterministic.
opts.private.disableAutomaticCompactions = true
var err error
d, err = Open("", opts)
require.NoError(t, err)
}
reset()

datadriven.RunTest(
t, "testdata/iterator_block_interval_filter", func(td *datadriven.TestData) string {
switch td.Cmd {
case "reset":
reset()
return ""

case "build":
b := d.NewBatch()
if err := runBatchDefineCmd(td, b); err != nil {
return err.Error()
}
if err := b.Commit(nil); err != nil {
return err.Error()
}
if err := d.Flush(); err != nil {
return err.Error()
}
return runLSMCmd(td, d)

case "iter":
var opts IterOptions
if len(td.CmdArgs) == 2 {
var lower, upper uint64
td.ScanArgs(t, "lower", &lower)
td.ScanArgs(t, "upper", &upper)
opts.BlockPropertyFilters = []BlockPropertyFilter{
sstable.NewBlockIntervalFilter("test", 0, lower, upper),
}
}
iter := d.NewIter(&opts)
return runIterCmd(td, iter, true)

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
}
})
}

func BenchmarkIteratorSeekGE(b *testing.B) {
m, keys := buildMemTable(b)
iter := &Iterator{
1 change: 1 addition & 0 deletions level_iter.go
@@ -185,6 +185,7 @@ func (l *levelIter) init(
l.lower = opts.LowerBound
l.upper = opts.UpperBound
l.tableOpts.TableFilter = opts.TableFilter
l.tableOpts.BlockPropertyFilters = opts.BlockPropertyFilters
l.cmp = cmp
l.split = split
l.iterFile = nil
2 changes: 1 addition & 1 deletion open_test.go
@@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) {
"LOCK",
"MANIFEST-000001",
"OPTIONS-000003",
"marker.format-version.000003.004",
"marker.format-version.000004.005",
"marker.manifest.000001.MANIFEST-000001",
},
}
19 changes: 18 additions & 1 deletion options.go
@@ -52,6 +52,12 @@ type FilterPolicy = base.FilterPolicy
// TablePropertyCollector exports the sstable.TablePropertyCollector type.
type TablePropertyCollector = sstable.TablePropertyCollector

// BlockPropertyCollector exports the sstable.BlockPropertyCollector type.
type BlockPropertyCollector = sstable.BlockPropertyCollector

// BlockPropertyFilter exports the sstable.BlockPropertyFilter type.
type BlockPropertyFilter = sstable.BlockPropertyFilter

// IterOptions hold the optional per-query parameters for NewIter.
//
// Like Options, a nil *IterOptions is valid and means to use the default
@@ -72,7 +78,12 @@ type IterOptions struct {
// false to skip scanning. This function must be thread-safe since the same
// function can be used by multiple iterators, if the iterator is cloned.
TableFilter func(userProps map[string]string) bool

// BlockPropertyFilters can be used to avoid scanning tables and blocks in
// tables. It is required that this slice is sorted in increasing order of
// the BlockPropertyFilter.ShortID. This slice represents an intersection
// across all filters, i.e., all filters must indicate that the block is
// relevant.
BlockPropertyFilters []BlockPropertyFilter
// Internal options.
logger Logger
}
@@ -491,6 +502,11 @@ type Options struct {
// and lives for the lifetime of the table.
TablePropertyCollectors []func() TablePropertyCollector

// BlockPropertyCollectors is a list of BlockPropertyCollector creation
// functions. A new BlockPropertyCollector is created for each sstable
// built and lives for the lifetime of writing that table.
BlockPropertyCollectors []func() BlockPropertyCollector

// WALBytesPerSync sets the number of bytes to write to a WAL before calling
// Sync on it in the background. Just like with BytesPerSync above, this
// helps smooth out disk write latencies, and avoids cases where the OS
@@ -1157,6 +1173,7 @@ func (o *Options) MakeWriterOptions(level int) sstable.WriterOptions {
}
writerOpts.TableFormat = sstable.TableFormatRocksDBv2
writerOpts.TablePropertyCollectors = o.TablePropertyCollectors
writerOpts.BlockPropertyCollectors = o.BlockPropertyCollectors
}
levelOpts := o.Level(level)
writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval
