From 42c2ea157bcf807e0455ff0220d927db62a1cfbe Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Thu, 12 Aug 2021 16:38:58 -0400 Subject: [PATCH] db,sstable: block property collectors and filters Block property collectors (and the corresponding filters) are an optional user-facing feature that can be used to ignore data in the context of an Iterator. The finest granularity of operation is a data block, and the properties are aggregated into the upper level index block, and the sstable, so an entire lower level index block (and its data blocks), or an entire sstable can be ignored. Multiple collectors can be configured in a DB, with recommendations to keep the total size of properties of a block to < 50-100 bytes. The collector can extract properties from the key or value. The first use case for this will be to extract the MVCC timestamp in CockroachDB, and use it for fine grained time-bound iteration. One can imagine extracting min-max values of other columns either in the key or value, when the key-value represents tabular data. To efficiently handle multiple collectors, a collector needs to be identified with a short integer id, in addition to a unique name. The id is encoded in index entries. The name is encoded in the properties block for the sstable. Helper collector and filter implementations are provided for properties represented as a set of the form [lower,upper), where both lower and upper are uint64. The feature is incompatible with older versions of Pebble since block handles can be longer and older versions have checks regarding no extra bytes in a block handle, which will fail. Fixes #1190 --- compaction.go | 6 +- format_major_version.go | 8 +- format_major_version_test.go | 2 + iterator_test.go | 116 +++++ level_iter.go | 1 + open_test.go | 2 +- options.go | 19 +- sstable/block_property.go | 472 ++++++++++++++++++ sstable/filter.go | 11 +- sstable/options.go | 5 + sstable/reader.go | 614 ++++++++++++++++++------ sstable/table.go | 21 +- sstable/writer.go | 174 +++++-- table_cache.go | 42 +- testdata/checkpoint | 13 +- testdata/event_listener | 10 +- testdata/iterator_block_interval_filter | 70 +++ 17 files changed, 1398 insertions(+), 188 deletions(-) create mode 100644 sstable/block_property.go create mode 100644 testdata/iterator_block_interval_filter diff --git a/compaction.go b/compaction.go index 008b7a154cd..9cbdb86f82b 100644 --- a/compaction.go +++ b/compaction.go @@ -2032,7 +2032,7 @@ func (d *DB) runCompaction( }() snapshots := d.mu.snapshots.toSlice() - + formatVers := d.mu.formatVers.vers // Release the d.mu lock while doing I/O. // Note the unusual order: Unlock and then Lock. d.mu.Unlock() @@ -2085,6 +2085,10 @@ func (d *DB) runCompaction( } writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level) + if formatVers < FormatBlockPropertyCollector { + // Cannot yet write block properties. + writerOpts.BlockPropertyCollectors = nil + } newOutput := func() error { fileMeta := &fileMetadata{} diff --git a/format_major_version.go b/format_major_version.go index 2d87cf24c5d..01ae650cbe1 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -66,8 +66,11 @@ const ( // kind, base.InternalKeyKindSetWithDelete. Previous Pebble versions will be // unable to open this database. FormatSetWithDelete + // FormatBlockPropertyCollector is a format major version that introduces + // BlockPropertyCollectors. + FormatBlockPropertyCollector // FormatNewest always contains the most recent format major version. 
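// Illustrative sketch, not part of this patch: how a DB opts in. The format
// major version must be at least FormatBlockPropertyCollector (here via
// FormatNewest), and collector constructors are supplied through the new
// Options field. The "interval" name, shortID 0, the "demo" directory, and
// myCollector (a hypothetical sstable.DataBlockIntervalCollector) are
// placeholders.
//
//	opts := &pebble.Options{
//		FormatMajorVersion: pebble.FormatNewest, // >= FormatBlockPropertyCollector
//		BlockPropertyCollectors: []func() pebble.BlockPropertyCollector{
//			func() pebble.BlockPropertyCollector {
//				return sstable.NewBlockIntervalCollector("interval", 0, &myCollector{})
//			},
//		},
//	}
//	db, err := pebble.Open("demo", opts)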
- FormatNewest FormatMajorVersion = FormatSetWithDelete + FormatNewest FormatMajorVersion = FormatBlockPropertyCollector ) // formatMajorVersionMigrations defines the migrations from one format @@ -143,6 +146,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatSetWithDelete: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatSetWithDelete) }, + FormatBlockPropertyCollector: func(d *DB) error { + return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector) + }, } const formatVersionMarkerName = `format-version` diff --git a/format_major_version_test.go b/format_major_version_test.go index 275bd600470..9293bbfd2f7 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -33,6 +33,8 @@ func TestRatchetFormat(t *testing.T) { require.Equal(t, FormatVersioned, d.FormatMajorVersion()) require.NoError(t, d.RatchetFormatMajorVersion(FormatSetWithDelete)) require.Equal(t, FormatSetWithDelete, d.FormatMajorVersion()) + require.NoError(t, d.RatchetFormatMajorVersion(FormatBlockPropertyCollector)) + require.Equal(t, FormatBlockPropertyCollector, d.FormatMajorVersion()) require.NoError(t, d.Close()) // If we Open the database again, leaving the default format, the diff --git a/iterator_test.go b/iterator_test.go index 62e1c004274..f1e6337cc79 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -959,6 +959,122 @@ func TestIteratorSeekOptErrors(t *testing.T) { }) } +type testBlockIntervalCollector struct { + initialized bool + lower, upper uint64 +} + +func (bi *testBlockIntervalCollector) Add(key InternalKey, value []byte) error { + k := key.UserKey + if len(k) < 2 { + return nil + } + val, err := strconv.Atoi(string(k[len(k)-2:])) + if err != nil { + return err + } + if val < 0 { + panic("testBlockIntervalCollector expects values >= 0") + } + uval := uint64(val) + if !bi.initialized { + bi.lower, bi.upper = uval, uval+1 + bi.initialized = true + return nil + } + if bi.lower > uval { + bi.lower = uval + } + if uval >= bi.upper { + bi.upper = uval+1 + } + return nil +} + +func (bi *testBlockIntervalCollector) FinishDataBlock() (lower uint64, upper uint64, err error) { + // TODO: remove printf + fmt.Printf("tbia: %t [%d,%d)\n", bi.initialized, bi.lower, bi.upper) + bi.initialized = false + l, u := bi.lower, bi.upper + bi.lower, bi.upper = 0, 0 + return l, u, nil +} + +func TestIteratorBlockIntervalFilter(t *testing.T) { + var mem vfs.FS + var d *DB + defer func() { + require.NoError(t, d.Close()) + }() + + reset := func() { + if d != nil { + require.NoError(t, d.Close()) + } + + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + + opts := &Options{ + FS: mem, + FormatMajorVersion: FormatNewest, + BlockPropertyCollectors: []func() BlockPropertyCollector{ + func() BlockPropertyCollector { + return sstable.NewBlockIntervalCollector("test", 0, &testBlockIntervalCollector{}) + }, + }, + } + lo := LevelOptions{BlockSize: 1, IndexBlockSize: 1} + opts.Levels = append(opts.Levels, lo) + + // Automatic compactions may compact away tombstones from L6, making + // some testcases non-deterministic. 
+ opts.private.disableAutomaticCompactions = true + var err error + d, err = Open("", opts) + require.NoError(t, err) + } + reset() + + datadriven.RunTest( + t, "testdata/iterator_block_interval_filter", func(td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + reset() + return "" + + case "build": + b := d.NewBatch() + if err := runBatchDefineCmd(td, b); err != nil { + return err.Error() + } + if err := b.Commit(nil); err != nil { + return err.Error() + } + if err := d.Flush(); err != nil { + return err.Error() + } + return runLSMCmd(td, d) + + case "iter": + var opts IterOptions + if len(td.CmdArgs) == 2 { + var lower, upper uint64 + td.ScanArgs(t, "lower", &lower) + td.ScanArgs(t, "upper", &upper) + opts.BlockPropertyFilters = []BlockPropertyFilter{ + sstable.NewBlockIntervalFilter("test", 0, lower, upper), + } + } + iter := d.NewIter(&opts) + return runIterCmd(td, iter, true) + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} + func BenchmarkIteratorSeekGE(b *testing.B) { m, keys := buildMemTable(b) iter := &Iterator{ diff --git a/level_iter.go b/level_iter.go index 35e8814ab96..04253b23054 100644 --- a/level_iter.go +++ b/level_iter.go @@ -185,6 +185,7 @@ func (l *levelIter) init( l.lower = opts.LowerBound l.upper = opts.UpperBound l.tableOpts.TableFilter = opts.TableFilter + l.tableOpts.BlockPropertyFilters = opts.BlockPropertyFilters l.cmp = cmp l.split = split l.iterFile = nil diff --git a/open_test.go b/open_test.go index eff8490b8b7..2949b225725 100644 --- a/open_test.go +++ b/open_test.go @@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) { "LOCK", "MANIFEST-000001", "OPTIONS-000003", - "marker.format-version.000003.004", + "marker.format-version.000004.005", "marker.manifest.000001.MANIFEST-000001", }, } diff --git a/options.go b/options.go index 1c66f61e2d5..383a4962de0 100644 --- a/options.go +++ b/options.go @@ -52,6 +52,12 @@ type FilterPolicy = base.FilterPolicy // TablePropertyCollector exports the sstable.TablePropertyCollector type. type TablePropertyCollector = sstable.TablePropertyCollector +// BlockPropertyCollector exports the sstable.BlockPropertyCollector type. +type BlockPropertyCollector = sstable.BlockPropertyCollector + +// BlockPropertyFilter exports the sstable.BlockPropertyFilter type. +type BlockPropertyFilter = sstable.BlockPropertyFilter + // IterOptions hold the optional per-query parameters for NewIter. // // Like Options, a nil *IterOptions is valid and means to use the default @@ -72,7 +78,12 @@ type IterOptions struct { // false to skip scanning. This function must be thread-safe since the same // function can be used by multiple iterators, if the iterator is cloned. TableFilter func(userProps map[string]string) bool - + // BlockPropertyFilters can be used to avoid scanning tables and blocks in + // tables. It is requires that this slice is sorted in increasing order of + // the BlockPropertyFilter.ShortID. This slice represents an intersection + // across all filters, i.e., all filters must indicate that the block is + // relevant. + BlockPropertyFilters []BlockPropertyFilter // Internal options. logger Logger } @@ -491,6 +502,11 @@ type Options struct { // and lives for the lifetime of the table. TablePropertyCollectors []func() TablePropertyCollector + // BlockPropertyCollectors is a list of BlockPropertyCollector creation + // functions. A new BlockPropertyCollector is created for each sstable + // built and lives for the lifetime of writing that table. 
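// Illustrative read-side counterpart to the IterOptions.BlockPropertyFilters
// field above, not part of this patch. It assumes db is an open *pebble.DB
// and that sstables were written with a collector named "test" at shortID 0;
// the bounds 10 and 20 are arbitrary.
//
//	iterOpts := pebble.IterOptions{
//		BlockPropertyFilters: []pebble.BlockPropertyFilter{
//			sstable.NewBlockIntervalFilter("test", 0, 10, 20),
//		},
//	}
//	iter := db.NewIter(&iterOpts)
//	for iter.First(); iter.Valid(); iter.Next() {
//		// Only blocks whose encoded interval intersects [10,20) are loaded;
//		// other data blocks, index blocks, and sstables are skipped.
//	}
//	_ = iter.Close()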
+ BlockPropertyCollectors []func() BlockPropertyCollector + // WALBytesPerSync sets the number of bytes to write to a WAL before calling // Sync on it in the background. Just like with BytesPerSync above, this // helps smooth out disk write latencies, and avoids cases where the OS @@ -1157,6 +1173,7 @@ func (o *Options) MakeWriterOptions(level int) sstable.WriterOptions { } writerOpts.TableFormat = sstable.TableFormatRocksDBv2 writerOpts.TablePropertyCollectors = o.TablePropertyCollectors + writerOpts.BlockPropertyCollectors = o.BlockPropertyCollectors } levelOpts := o.Level(level) writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval diff --git a/sstable/block_property.go b/sstable/block_property.go new file mode 100644 index 00000000000..5fa42aae3aa --- /dev/null +++ b/sstable/block_property.go @@ -0,0 +1,472 @@ +// Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package sstable + +import ( + "bytes" + "encoding/binary" + "fmt" + "github.com/cockroachdb/errors" + "math" +) + +// Block properties are an optional user-facing feature that can be used to +// filter data blocks (and whole sstables) from an Iterator before they are +// loaded. They do not apply to range delete blocks. These are expected to +// very concisely represent a set of some attribute value contained within the +// key or value, such that the set includes all the attribute values in the +// block. This has some similarities with OLAP pruning approaches that +// maintain min-max attribute values for some column (which concisely +// represent a set), that is then used to prune at query time. In our case, +// data blocks are small, typically 50-100KB, so these properties should +// reduce their precision in order to be concise -- a good rule of thumb is to +// not consume more than 50-100 bytes across all properties maintained for a +// block, i.e., a 1000x reduction compared to loading the data block. +// +// A block property is assigned two unique identifiers: +// - A name, that is encoded and stored once in the sstable. This name must +// be unique among all user-properties encoded in an sstable. +// - A small integer index, that is encoded and stored once per data block and +// lower level index block. +// The maintenance of both identifiers is the responsibility of the DB user. +// +// A property is represented as a []byte. A nil value or empty byte slice are +// considered semantically identical. The caller is free to choose the +// semantics of an empty byte slice e.g. they could use it to represent the +// empty set or the universal set, whichever they think is more common and +// therefore better to encode more concisely. The serialization of the +// property for the various Finish*() calls in a BlockPropertyCollector +// implementation should be identical, since the corresponding +// BlockPropertyFilter implementation is not told the context in which it is +// deserializing the property. +// +// Block properties are more general than table properties and should be +// preferred over using table properties. A BlockPropertyCollector can achieve +// identical behavior to table properties by returning the nil slice from +// FinishDataBlock and FinishIndexBlock, and interpret them as the universal +// set in BlockPropertyFilter, and return a non-universal set in FinishTable. + +// BlockPropertyCollector is used when writing a sstable. 
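// Illustrative sketch of the table-property-equivalent behavior described
// above, not part of this patch. The type, its name, and its prop field are
// hypothetical; only FinishTable returns a non-empty set, so per-block and
// per-index-block filtering is a no-op.
//
//	type tableOnlyCollector struct {
//		prop []byte // folded over every key/value by Add
//	}
//
//	func (c *tableOnlyCollector) Name() string    { return "table-only" }
//	func (c *tableOnlyCollector) ShortID() uint16 { return 0 }
//	func (c *tableOnlyCollector) Add(key InternalKey, value []byte) error {
//		// Update c.prop from key/value here.
//		return nil
//	}
//	// nil is interpreted as the universal set by the filter.
//	func (c *tableOnlyCollector) FinishDataBlock(buf []byte) ([]byte, error)  { return nil, nil }
//	func (c *tableOnlyCollector) AddLastDataBlockToIndexBlock()               {}
//	func (c *tableOnlyCollector) FinishIndexBlock(buf []byte) ([]byte, error) { return nil, nil }
//	func (c *tableOnlyCollector) FinishTable(buf []byte) ([]byte, error) {
//		return append(buf, c.prop...), nil
//	}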
+// - All calls to Add are included in the next FinishDataBlock, after which +// the next data block is expected to start. +// +// - The index entry generated for the data block, which contains the return +// value from FinishDataBlock, is not immediately included in the current +// index block. It is included when AddLastDataBlockToIndexBlock. An +// alternative would be to return an opaque handle from FinishDataBlock and +// pass it to a new AddToIndexBlock method, which requires more plumbing, +// and passing of an interface{} results in a undesirable heap allocation. +// AddLastDataBlockToIndexBlock must be called before keys are added to the +// new data block. +type BlockPropertyCollector interface { + // Name returns the name of the block property collector. + Name() string + // ShortID returns the integer identifier. + ShortID() uint16 + // Add is called with each new entry added to a data block in the sstable. + // The callee can assume that these are in sorted order. + Add(key InternalKey, value []byte) error + // FinishDataBlock is called when all the entries have been added to a + // data block. Subsequent Add calls will be for the next data block. It + // returns the property value for the finished block. + FinishDataBlock(buf []byte) ([]byte, error) + // AddLastDataBlockToIndexBlock adds the entry corresponding to the last + // FinishDataBlock to the current index block. + AddLastDataBlockToIndexBlock() + // FinishIndexBlock is called when an index block, containing all the + // key-value pairs since the last FinishIndexBlock, will no longer see new + // entries. It returns the property value for the index block. + FinishIndexBlock(buf []byte) ([]byte, error) + // FinishTable is called when the sstable is finished, and returns the + // property value for the sstable. + FinishTable(buf []byte) ([]byte, error) +} + +// BlockPropertyFilter is used in an Iterator to filter sstables and blocks +// within the sstable. It should not maintain any per-sstable state, and must +// be thread-safe. +type BlockPropertyFilter interface { + // Name returns the name of the block property collector. + Name() string + // ShortID returns the integer identifier. + ShortID() uint16 + // Intersects returns true if the set represented by prop intersects with + // the set in the filter. + Intersects(prop []byte) (bool, error) +} + +// BlockIntervalCollector is a helper implementation of BlockPropertyCollector +// for users who want to represent a set of the form [lower,upper) where both +// lower and upper are uint64, and lower <= upper. +// +// The set is encoded as: +// - Two varint integers, (lower,upper-lower), when upper-lower > 0 +// - Nil, when upper-lower=0 +type BlockIntervalCollector struct { + name string + shortID uint16 + blockAttributeCollector DataBlockIntervalCollector + + blockInterval interval + indexInterval interval + tableInterval interval +} + +var _ BlockPropertyCollector = &BlockIntervalCollector{} + +// DataBlockIntervalCollector is the interface used by BlockIntervalCollector +// that contains the actual logic pertaining to the property. It only +// maintains state for the current data block, and resets that state in +// FinishDataBlock. This interface can be used to reduce parsing costs. +type DataBlockIntervalCollector interface { + // Add is called with each new entry added to a data block in the sstable. + // The callee can assume that these are in sorted order. 
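// Worked example of the interval encoding described above (illustrative, not
// part of this patch): [100, 164) is written as Uvarint(100) followed by
// Uvarint(164-100), i.e. the two bytes 0x64 0x40; an empty interval such as
// [7, 7) encodes as zero bytes and is interpreted as the empty set.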
+ Add(key InternalKey, value []byte) error + // FinishDataBlock is called when all the entries have been added to a + // data block. Subsequent Add calls will be for the next data block. It + // returns the [lower, upper) for the finished block. + FinishDataBlock() (lower uint64, upper uint64, err error) +} + +// NewBlockIntervalCollector constructs a BlockIntervalCollector, with the +// given name, shortID, and data block collector. +func NewBlockIntervalCollector( + name string, shortID uint16, blockAttributeCollector DataBlockIntervalCollector) *BlockIntervalCollector { + return &BlockIntervalCollector{ + name: name, shortID: shortID, blockAttributeCollector: blockAttributeCollector} +} + +// Name implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) Name() string { + return b.name +} + +// ShortID implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) ShortID() uint16 { + return b.shortID +} + +// Add implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) Add(key InternalKey, value []byte) error { + return b.blockAttributeCollector.Add(key, value) +} + +// FinishDataBlock implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) FinishDataBlock(buf []byte) ([]byte, error) { + var err error + b.blockInterval.lower, b.blockInterval.upper, err = b.blockAttributeCollector.FinishDataBlock() + if err != nil { + return buf, err + } + buf = b.blockInterval.encode(buf) + b.tableInterval.union(b.blockInterval) + return buf, nil +} + +// AddLastDataBlockToIndexBlock implements the BlockPropertyCollector +// interface. +func (b *BlockIntervalCollector) AddLastDataBlockToIndexBlock() { + b.indexInterval.union(b.blockInterval) + b.blockInterval = interval{} +} + +// FinishIndexBlock implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) FinishIndexBlock(buf []byte) ([]byte, error) { + buf = b.indexInterval.encode(buf) + b.indexInterval = interval{} + return buf, nil +} + +// FinishTable implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) FinishTable(buf []byte) ([]byte, error) { + return b.tableInterval.encode(buf), nil +} + +type interval struct { + lower uint64 + upper uint64 +} + +func (i interval) encode(buf []byte) []byte { + if i.lower < i.upper { + var encoded [binary.MaxVarintLen64*2]byte + n := binary.PutUvarint(encoded[:], i.lower) + n += binary.PutUvarint(encoded[n:], i.upper-i.lower) + buf = append(buf, encoded[:n]...) + } + return buf +} + +func (i *interval) decode(buf []byte) error { + if len(buf) == 0 { + *i = interval{} + return nil + } + var n int + i.lower, n = binary.Uvarint(buf) + if n <= 0 || n >= len(buf) { + return errors.Errorf("cannot decode interval from buf %x", buf) + } + pos := n + i.upper, n = binary.Uvarint(buf[pos:]) + pos += n + if pos != len(buf) || n <= 0 { + return errors.Errorf("cannot decode interval from buf %x", buf) + } + // Delta decode. + i.upper += i.lower + if i.upper < i.lower { + return errors.Errorf("unexpected overflow, upper %d < lower %d", i.upper, i.lower) + } + return nil +} + +func (i *interval) union(x interval) { + if x.lower == x.upper { + // x is the empty set. + return + } + if i.lower == i.upper { + // i is the empty set. + *i = x + return + } + // Both sets are non-empty. 
+ if x.lower < i.lower { + i.lower = x.lower + } + if x.upper > i.upper { + i.upper = x.upper + } +} + +func (i interval) intersects(x interval) bool { + if i.lower == i.upper || x.lower == x.upper { + // At least one of the sets is empty. + return false + } + // Neither set is empty. + return i.upper > x.lower && i.lower < x.upper +} + +// BlockIntervalFilter is an implementation of BlockPropertyFilter when the +// corresponding collector is a BlockIntervalCollector. That is, the set is of +// the form [lower, upper). +type BlockIntervalFilter struct { + name string + shortID uint16 + filterInterval interval +} + +// NewBlockIntervalFilter constructs a BlockIntervalFilter with the given +// name, shortID, and [lower, upper) bounds. +func NewBlockIntervalFilter( + name string, shortID uint16, lower uint64, upper uint64) *BlockIntervalFilter { + return &BlockIntervalFilter{ + name: name, + shortID: shortID, + filterInterval: interval{lower: lower, upper: upper}, + } +} + +// Name implements the BlockPropertyFilter interface. +func (b *BlockIntervalFilter) Name() string { + return b.name +} + +// ShortID implements the BlockPropertyFilter interface. +func (b *BlockIntervalFilter) ShortID() uint16 { + return b.shortID +} + +// Intersects implements the BlockPropertyFilter interface. +func (b *BlockIntervalFilter) Intersects(prop []byte) (bool, error) { + var i interval + if err := i.decode(prop); err != nil { + return false, err + } + return i.intersects(b.filterInterval), nil +} + +type blockPropertiesEncoder struct { + propsBuf []byte + scratch []byte +} + +func (e *blockPropertiesEncoder) getScratchForProp() []byte { + return e.scratch[:0] +} + +func (e *blockPropertiesEncoder) resetProps() { + e.propsBuf = e.propsBuf[:0] +} + +func (e *blockPropertiesEncoder) addProp(id uint16, scratch []byte) { + lenID := uvarintLen(uint32(id)) + lenProp := uvarintLen(uint32(len(scratch))) + n := lenID + lenProp + len(scratch) + if cap(e.propsBuf) - len(e.propsBuf) < n { + size := len(e.propsBuf) + 2*n + if size < 2*cap(e.propsBuf) { + size = 2*cap(e.propsBuf) + } + buf := make([]byte, len(e.propsBuf), size) + copy(buf, e.propsBuf) + e.propsBuf = buf + } + pos := len(e.propsBuf) + b := e.propsBuf[pos:pos+lenID] + n = binary.PutUvarint(b, uint64(id)) + if n != lenID { + panic(fmt.Sprintf("unexpected length %d is not equal to %d", n, lenID)) + } + pos += n + b = e.propsBuf[pos:pos+lenProp] + n = binary.PutUvarint(b, uint64(len(scratch))) + pos += n + b = e.propsBuf[pos:pos+len(scratch)] + pos += len(scratch) + copy(b, scratch) + e.propsBuf = e.propsBuf[0:pos] + e.scratch = scratch +} + +func (e *blockPropertiesEncoder) unsafeProps() []byte { + return e.propsBuf +} + +func (e *blockPropertiesEncoder) props() []byte { + buf := make([]byte, len(e.propsBuf)) + copy(buf, e.propsBuf) + return buf +} + +func (e *blockPropertiesEncoder) reset() { + e.propsBuf = e.propsBuf[:] +} + +// Must be sorted in increasing order of shortID(). +type blockPropertiesFilterer []BlockPropertyFilter + +func (f blockPropertiesFilterer) intersects(props []byte) (bool, error) { + i := 0 + for len(props) > 0 && i < len(f) { + // Decode. 
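// Illustrative layout of the bytes decoded below, not part of this patch: for
// two properties with shortIDs 0 and 2, the encoded suffix of the index entry
// is
//
//	Uvarint(0) Uvarint(len(p0)) p0 Uvarint(2) Uvarint(len(p2)) p2
//
// Properties appear in increasing shortID order, and a collector that had
// nothing to encode for this block is simply absent, which is why the loop
// calls Intersects(nil) for filters whose shortID is skipped over.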
+ id, n := binary.Uvarint(props) + if n <= 0 || id > math.MaxUint16 { + return false, errors.Errorf("corrupt block property filter id") + } + shortID := uint16(id) + propLen, m := binary.Uvarint(props[n:]) + if m <= 0 || propLen == 0 || (n + m + int(propLen)) > len(props) { + return false, errors.Errorf("corrupt block property length") + } + n += m + prop := props[n:n+int(propLen)] + props = props[n+int(propLen):] + for i < len(f) && shortID > f[i].ShortID() { + // Not encoded for this block. + intersects, err := f[i].Intersects(nil) + if err != nil { + return false, err + } + if !intersects { + return false, nil + } + i++ + } + if i >= len(f) { + return true, nil + } + if shortID == f[i].ShortID() { + intersects, err := f[i].Intersects(prop) + if err != nil { + return false, err + } + if !intersects { + return false, nil + } + i++ + } + } + for i < len(f) { + // Not encoded for this block. + intersects, err := f[i].Intersects(nil) + if err != nil { + return false, err + } + if !intersects { + return false, nil + } + i++ + } + return true, nil +} + +// Following will move to the CockroachDB code. It is here only for +// illustration. + +type crdbDataBlockTimestampCollector struct { + // Keep the encoded timestamps in min, max and decode in FinishDataBlock. + min, max []byte +} + +var _ DataBlockIntervalCollector = &crdbDataBlockTimestampCollector{} + +const engineKeyVersionWallTimeLen = 8 +const engineKeyVersionWallAndLogicalTimeLen = 12 +const engineKeyVersionWallLogicalAndSyntheticTimeLen = 13 + +func (tc *crdbDataBlockTimestampCollector) Add(key InternalKey, value []byte) error { + k := key.UserKey + if len(k) == 0 { + return nil + } + // Last byte is the version length + 1 when there is a version, + // else it is 0. + versionLen := int(k[len(k)-1]) + // keyPartEnd points to the sentinel byte. + keyPartEnd := len(k) - 1 - versionLen + if keyPartEnd < 0 { + return errors.Errorf("invalid key") + } + if versionLen > 0 && (versionLen == engineKeyVersionWallTimeLen || + versionLen == engineKeyVersionWallAndLogicalTimeLen || + versionLen == engineKeyVersionWallLogicalAndSyntheticTimeLen) { + // Version consists of the bytes after the sentinel and before the length. + k = k[keyPartEnd+1 : len(k)-1] + if len(tc.min) == 0 || bytes.Compare(k, tc.min) < 0 { + tc.min = append(tc.min[:0], k...) + } + if len(tc.max) == 0 || bytes.Compare(k, tc.max) > 0 { + tc.max = append(tc.max[:0], k...) + } + } + return nil +} + +func decodeWallTime(ts []byte) uint64 { + return binary.BigEndian.Uint64(ts[0:8]) +} + +func (tc *crdbDataBlockTimestampCollector) FinishDataBlock( + ) (lower uint64, upper uint64, err error) { + if len(tc.min) == 0 { + // No calls to Add that contained a timestamped key. + return 0, 0, nil + } + lower = decodeWallTime(tc.min) + tc.min = tc.min[:0] + // The actual value encoded into walltime is an int64, so +1 will not + // overflow. + upper = decodeWallTime(tc.max) + 1 + tc.max = tc.max[:0] + if lower >= upper { + return 0, 0, + errors.Errorf("corrupt timestamps lower %d >= upper %d", lower, upper) + } + return lower, upper, nil +} + diff --git a/sstable/filter.go b/sstable/filter.go index f21303af955..f8c6d0dff61 100644 --- a/sstable/filter.go +++ b/sstable/filter.go @@ -4,7 +4,9 @@ package sstable -import "sync/atomic" +import ( + "sync/atomic" +) // FilterMetrics holds metrics for the filter policy. 
type FilterMetrics struct { @@ -31,6 +33,13 @@ type BlockHandle struct { Offset, Length uint64 } +// BlockHandleWithProperties is used for data blocks and first/lower level +// index blocks, since they can be annotated using BlockPropertyCollectors. +type BlockHandleWithProperties struct { + BlockHandle + Props []byte +} + type filterWriter interface { addKey(key []byte) finish() ([]byte, error) diff --git a/sstable/options.go b/sstable/options.go index 797331b961f..70d23c346a4 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -211,6 +211,11 @@ type WriterOptions struct { // and lives for the lifetime of the table. TablePropertyCollectors []func() TablePropertyCollector + // BlockPropertyCollectors is a list of BlockPropertyCollector creation + // functions. A new BlockPropertyCollector is created for each sstable + // built and lives for the lifetime of writing that table. + BlockPropertyCollectors []func() BlockPropertyCollector + // Checksum specifies which checksum to use. Checksum ChecksumType } diff --git a/sstable/reader.go b/sstable/reader.go index fb260cc3f3d..8e2ed9344f4 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -41,7 +41,11 @@ const ( // decodeBlockHandle returns the block handle encoded at the start of src, as // well as the number of bytes it occupies. It returns zero if given invalid -// input. +// input. A block handle for a data block or a first/lower level index block +// should not be decoded using decodeBlockHandle since the caller may validate +// that the number of bytes decoded is equal to the length of src, which will +// be false if the properties are not decoded. In those cases the caller +// should use decodeBlockHandleWithProperties. func decodeBlockHandle(src []byte) (BlockHandle, int) { offset, n := binary.Uvarint(src) length, m := binary.Uvarint(src[n:]) @@ -51,12 +55,33 @@ func decodeBlockHandle(src []byte) (BlockHandle, int) { return BlockHandle{offset, length}, n + m } +// decodeBlockHandleWithProperties returns the block handle and properties +// encoded in src. src needs to be exactly the length that was encoded. This +// method must be used for data block and first/lower level index blocks. The +// properties in the block handle point to the bytes in src. +func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) { + bh, n := decodeBlockHandle(src) + if n == 0 { + return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle") + } + return BlockHandleWithProperties{ + BlockHandle: bh, + Props: src[n:], + }, nil +} + func encodeBlockHandle(dst []byte, b BlockHandle) int { n := binary.PutUvarint(dst, b.Offset) m := binary.PutUvarint(dst[n:], b.Length) return n + m } +func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte { + n := encodeBlockHandle(dst, b.BlockHandle) + dst = append(dst[:n], b.Props...) + return dst +} + // block is a []byte that holds a sequence of key/value pairs plus an index // over those pairs. type block []byte @@ -76,6 +101,9 @@ type singleLevelIterator struct { // Global lower/upper bound for the iterator. lower []byte upper []byte + // The block property filters do not change in the lifetime of the + // singleLevelIterator or twoLevelIterator. + bpfs []BlockPropertyFilter // Per-block lower/upper bound. Nil if the bound does not apply to the block // because we determined the block lies completely within the bound. blockLower []byte @@ -145,7 +173,23 @@ type singleLevelIterator struct { // not need to do anything. 
// // Similar examples can be constructed for backward iteration. - + // + // This notion of exactly one key before or after the bounds is not quite + // true when block properties are used to ignore blocks. In that case we + // can't stop precisely at the first block that is past the bounds since + // we are using the index entries to enforce the bounds. + // + // e.g. 3 blocks with keys [b, c] [f, g], [i, j, k] with index entries d, + // h, l. And let the lower bound be k. If the block [i, j, k] is ignored + // due to the block interval annotations we do need to move the index to + // block [f, g] since the index entry for the [i, j, k] block is l which + // is not less than the lower bound of k. So we have passed the entries i, + // j. + // + // This is harmless since the block property filters are fixed for the + // lifetime of the iterator so i, j are irrelevant. In addition, the + // current code will not load the [f, g] block, so the seek optimization + // that attempts to use Next/Prev do not apply anyway. boundsCmp int positionedUsingLatestBounds bool @@ -206,7 +250,8 @@ func checkTwoLevelIterator(obj interface{}) { // init initializes a singleLevelIterator for reading from the table. It is // synonmous with Reader.NewIter, but allows for reusing of the iterator // between different Readers. -func (i *singleLevelIterator) init(r *Reader, lower, upper []byte) error { +func (i *singleLevelIterator) init( + r *Reader, lower, upper []byte, bpfs []BlockPropertyFilter) error { if r.err != nil { return r.err } @@ -217,6 +262,7 @@ func (i *singleLevelIterator) init(r *Reader, lower, upper []byte) error { i.lower = lower i.upper = upper + i.bpfs = bpfs i.reader = r i.cmp = r.Compare err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum) @@ -271,37 +317,55 @@ func (i *singleLevelIterator) initBounds() { } } +type loadBlockResult int8 +const ( + loadBlockOK loadBlockResult = iota + // Could be due to error or because no block left to load. + loadBlockFailed + loadBlockIrrelevant +) + // loadBlock loads the block at the current index position and leaves i.data // unpositioned. If unsuccessful, it sets i.err to any error encountered, which // may be nil if we have simply exhausted the entire table. -func (i *singleLevelIterator) loadBlock() bool { +func (i *singleLevelIterator) loadBlock() loadBlockResult { // Ensure the data block iterator is invalidated even if loading of the block // fails. i.data.invalidate() if !i.index.Valid() { - return false + return loadBlockFailed } // Load the next block. v := i.index.Value() - var n int - i.dataBH, n = decodeBlockHandle(v) - if n == 0 || n != len(v) { + bhp, err := decodeBlockHandleWithProperties(v) + i.dataBH = bhp.BlockHandle + if err != nil { i.err = errCorruptIndexEntry - return false + return loadBlockFailed + } + if len(i.bpfs) > 0 { + intersects, err := blockPropertiesFilterer(i.bpfs).intersects(bhp.Props) + if err != nil { + i.err = errCorruptIndexEntry + return loadBlockFailed + } + if !intersects { + return loadBlockIrrelevant + } } block, err := i.reader.readBlock(i.dataBH, nil /* transform */, &i.dataRS) if err != nil { i.err = err - return false + return loadBlockFailed } i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum) if i.err != nil { // The block is partially loaded, and we don't want it to appear valid. 
i.data.invalidate() - return false + return loadBlockFailed } i.initBounds() - return true + return loadBlockOK } func (i *singleLevelIterator) initBoundsForAlreadyLoadedBlock() { @@ -456,16 +520,31 @@ func (i *singleLevelIterator) seekGEHelper( } } // Slow-path. - if ikey, _ := i.index.SeekGE(key); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.index.SeekGE(key); ikey == nil { // The target key is greater than any key in the sstable. Invalidate the // block iterator so that a subsequent call to Prev() will return the last // key in the table. i.data.invalidate() return nil, nil } - if !i.loadBlock() { + result := i.loadBlock() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the upper bound here since don't want to bother moving + // to the next block if upper bound is already exceeded. Note that + // the next block starts with keys >= ikey.UserKey since even + // though this is the block separator, the same user key can span + // multiple blocks. Since upper is exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + return nil, nil + } + // Want to skip to the next block. + dontSeekWithinBlock = true + } } if !dontSeekWithinBlock { if ikey, val := i.data.SeekGE(key); ikey != nil { @@ -520,6 +599,9 @@ func (i *singleLevelIterator) seekPrefixGE( } i.lastBloomFilterMatched = true } + // The i.exhaustedBounds comparison indicates that the upper bound was + // reached. The i.data.isDataInvalidated() indicates that the sstable was + // exhausted. if trySeekUsingNext && (i.exhaustedBounds == +1 || i.data.isDataInvalidated()) { // Already exhausted, so return nil. return nil, nil @@ -568,12 +650,31 @@ func (i *singleLevelIterator) SeekLT(key []byte) (*InternalKey, []byte) { dontSeekWithinBlock = true } } else { - if ikey, _ := i.index.SeekGE(key); ikey == nil { - i.index.Last() + var ikey *InternalKey + if ikey, _ = i.index.SeekGE(key); ikey == nil { + ikey, _ = i.index.Last() + if ikey == nil { + return nil, nil + } } - if !i.loadBlock() { + // INVARIANT: ikey != nil. + result := i.loadBlock() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the lower bound here since don't want to bother moving + // to the previous block if lower bound is already exceeded. Note + // that the previous block starts with keys <= ikey.UserKey since + // even though this is the current block's separator, the same + // user key can span multiple blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.lower) < 0 { + i.exhaustedBounds = -1 + return nil, nil + } + // Want to skip to the previous block. + dontSeekWithinBlock = true + } } if !dontSeekWithinBlock { if ikey, val := i.data.SeekLT(key); ikey != nil { @@ -620,20 +721,38 @@ func (i *singleLevelIterator) firstInternal() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. 
i.boundsCmp = 0 - if ikey, _ := i.index.First(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.index.First(); ikey == nil { i.data.invalidate() return nil, nil } - if !i.loadBlock() { + result := i.loadBlock() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.data.First(); ikey != nil { - if i.blockUpper != nil && i.cmp(ikey.UserKey, i.blockUpper) >= 0 { + if result == loadBlockOK { + if ikey, val := i.data.First(); ikey != nil { + if i.blockUpper != nil && i.cmp(ikey.UserKey, i.blockUpper) >= 0 { + i.exhaustedBounds = +1 + return nil, nil + } + return ikey, val + } + // Else fall through to skipForward. + } else { + // result == loadBlockIrrelevant. Enforce the upper bound here since + // don't want to bother moving to the next block if upper bound is + // already exceeded. Note that the next block starts with keys >= + // ikey.UserKey since even though this is the block separator, the + // same user key can span multiple blocks. Since upper is exclusive we + // use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { i.exhaustedBounds = +1 return nil, nil } - return ikey, val + // Else fall through to skipForward. } + return i.skipForward() } @@ -659,20 +778,36 @@ func (i *singleLevelIterator) lastInternal() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 - if ikey, _ := i.index.Last(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.index.Last(); ikey == nil { i.data.invalidate() return nil, nil } - if !i.loadBlock() { + result := i.loadBlock() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.data.Last(); ikey != nil { - if i.blockLower != nil && i.cmp(ikey.UserKey, i.blockLower) < 0 { + if result == loadBlockOK { + if ikey, val := i.data.Last(); ikey != nil { + if i.blockLower != nil && i.cmp(ikey.UserKey, i.blockLower) < 0 { + i.exhaustedBounds = -1 + return nil, nil + } + return ikey, val + } + // Else fall through to skipBackward. + } else { + // result == loadBlockIrrelevant. Enforce the lower bound here since + // don't want to bother moving to the previous block if lower bound is + // already exceeded. Note that the previous block starts with keys <= + // key.UserKey since even though this is the current block's + // separator, the same user key can span multiple blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.lower) < 0 { i.exhaustedBounds = -1 return nil, nil } - return ikey, val } + return i.skipBackward() } @@ -726,14 +861,32 @@ func (i *singleLevelIterator) Prev() (*InternalKey, []byte) { func (i *singleLevelIterator) skipForward() (*InternalKey, []byte) { for { - if key, _ := i.index.Next(); key == nil { + var key *InternalKey + if key, _ = i.index.Next(); key == nil { i.data.invalidate() break } - if !i.loadBlock() { + result := i.loadBlock() + if result != loadBlockOK { if i.err != nil { break } + if result == loadBlockFailed { + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + } + // result == loadBlockIrrelevant. Enforce the upper bound here + // since don't want to bother moving to the next block if upper + // bound is already exceeded. Note that the next block starts with + // keys >= key.UserKey since even though this is the block + // separator, the same user key can span multiple blocks. Since + // upper is exclusive we use >= below. 
+ if i.upper != nil && i.cmp(key.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + return nil, nil + } continue } if key, val := i.data.First(); key != nil { @@ -749,14 +902,31 @@ func (i *singleLevelIterator) skipForward() (*InternalKey, []byte) { func (i *singleLevelIterator) skipBackward() (*InternalKey, []byte) { for { - if key, _ := i.index.Prev(); key == nil { + var key *InternalKey + if key, _ = i.index.Prev(); key == nil { i.data.invalidate() break } - if !i.loadBlock() { + result := i.loadBlock() + if result != loadBlockOK { if i.err != nil { break } + if result == loadBlockFailed { + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + } + // result == loadBlockIrrelevant. Enforce the lower bound here + // since don't want to bother moving to the previous block if lower + // bound is already exceeded. Note that the previous block starts with + // keys <= key.UserKey since even though this is the current block's + // separator, the same user key can span multiple blocks. + if i.lower != nil && i.cmp(key.UserKey, i.lower) < 0 { + i.exhaustedBounds = -1 + return nil, nil + } continue } key, val := i.data.Last() @@ -915,12 +1085,24 @@ func (i *compactionIterator) skipForward(key *InternalKey, val []byte) (*Interna if key, _ := i.index.Next(); key == nil { break } - if !i.loadBlock() { + result := i.loadBlock() + if result != loadBlockOK { if i.err != nil { break } - continue + switch result { + case loadBlockFailed: + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + case loadBlockIrrelevant: + panic("compactionIter should not be using block intervals for skipping") + default: + panic(fmt.Sprintf("unexpected case %d", result)) + } } + // result == loadBlockOK if key, val = i.data.First(); key != nil { break } @@ -945,30 +1127,45 @@ var _ base.InternalIterator = (*twoLevelIterator)(nil) // leaves i.index unpositioned. If unsuccessful, it gets i.err to any error // encountered, which may be nil if we have simply exhausted the entire table. // This is used for two level indexes. -func (i *twoLevelIterator) loadIndex() bool { +func (i *twoLevelIterator) loadIndex() loadBlockResult { // Ensure the data block iterator is invalidated even if loading of the // index fails. 
i.data.invalidate() if !i.topLevelIndex.Valid() { i.index.offset = 0 i.index.restarts = 0 - return false + return loadBlockFailed } - h, n := decodeBlockHandle(i.topLevelIndex.Value()) - if n == 0 || n != len(i.topLevelIndex.Value()) { + bhp, err := decodeBlockHandleWithProperties(i.topLevelIndex.Value()) + if err != nil { i.err = base.CorruptionErrorf("pebble/table: corrupt top level index entry") - return false + return loadBlockFailed } - indexBlock, err := i.reader.readBlock(h, nil /* transform */, nil /* readaheadState */) + if len(i.bpfs) > 0 { + intersects, err := blockPropertiesFilterer(i.bpfs).intersects(bhp.Props) + if err != nil { + i.err = errCorruptIndexEntry + return loadBlockFailed + } + if !intersects { + return loadBlockIrrelevant + } + } + indexBlock, err := i.reader.readBlock( + bhp.BlockHandle, nil /* transform */, nil /* readaheadState */) if err != nil { i.err = err - return false + return loadBlockFailed } - i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum) - return i.err == nil + if i.err = i.index.initHandle( + i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum); i.err == nil { + return loadBlockOK + } + return loadBlockFailed } -func (i *twoLevelIterator) init(r *Reader, lower, upper []byte) error { +func (i *twoLevelIterator) init( + r *Reader, lower, upper []byte, bpfs []BlockPropertyFilter) error { if r.err != nil { return r.err } @@ -979,6 +1176,7 @@ func (i *twoLevelIterator) init(r *Reader, lower, upper []byte) error { i.lower = lower i.upper = upper + i.bpfs = bpfs i.reader = r i.cmp = r.Compare err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum) @@ -1001,18 +1199,34 @@ func (i *twoLevelIterator) SeekGE(key []byte) (*InternalKey, []byte) { i.exhaustedBounds = 0 i.err = nil // clear cached iteration error + var dontSeekWithinSingleLevelIter bool if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.Valid() || i.boundsCmp <= 0 || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 { // Slow-path: need to position the topLevelIndex. - if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the upper bound here since don't want to bother moving + // to the next entry in the top level index if upper bound is + // already exceeded. Note that the next entry starts with keys >= + // ikey.UserKey since even though this is the block separator, the + // same user key can span multiple index blocks. Since upper is + // exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } + // Fall through to skipForward. + dontSeekWithinSingleLevelIter = true + } } // Else fast-path: The bounds have moved forward and this SeekGE is // respecting the lower bound (guaranteed by Iterator). We know that @@ -1023,8 +1237,10 @@ func (i *twoLevelIterator) SeekGE(key []byte) (*InternalKey, []byte) { // confirms that it is not behind. Since it is not ahead and not behind // it must be at the right position. 
- if ikey, val := i.singleLevelIterator.SeekGE(key); ikey != nil { - return ikey, val + if !dontSeekWithinSingleLevelIter { + if ikey, val := i.singleLevelIterator.SeekGE(key); ikey != nil { + return ikey, val + } } return i.skipForward() } @@ -1067,6 +1283,7 @@ func (i *twoLevelIterator) SeekPrefixGE( // Bloom filter matches. i.exhaustedBounds = 0 + var dontSeekWithinSingleLevelIter bool if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.Valid() || i.boundsCmp <= 0 || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 { // Slow-path: need to position the topLevelIndex. @@ -1079,15 +1296,30 @@ func (i *twoLevelIterator) SeekPrefixGE( // block, and in that case we don't need to invalidate and reload the // singleLevelIterator state. trySeekUsingNext = false - if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the upper bound here since don't want to bother moving + // to the next entry in the top level index if upper bound is + // already exceeded. Note that the next entry starts with keys >= + // ikey.UserKey since even though this is the block separator, the + // same user key can span multiple index blocks. Since upper is + // exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } + // Fall through to skipForward. + dontSeekWithinSingleLevelIter = true + } } // Else fast-path: The bounds have moved forward and this SeekGE is // respecting the lower bound (guaranteed by Iterator). We know that @@ -1098,20 +1330,16 @@ func (i *twoLevelIterator) SeekPrefixGE( // confirms that it is not behind. Since it is not ahead and not behind // it must be at the right position. - if ikey, val := i.singleLevelIterator.seekPrefixGE( - prefix, key, trySeekUsingNext, false /* checkFilter */); ikey != nil { - return ikey, val + if !dontSeekWithinSingleLevelIter { + if ikey, val := i.singleLevelIterator.seekPrefixGE( + prefix, key, trySeekUsingNext, false /* checkFilter */); ikey != nil { + return ikey, val + } } + // NB: skipForward checks whether exhaustedBounds is already +1. return i.skipForward() } -func (i *twoLevelIterator) SeekPrefixGEWithExhaustionIndicator( - prefix, key []byte, trySeekUsingNext bool, -) (k *base.InternalKey, value []byte, iterExhaustedAndNotBloomFilterFail bool) { - k, value = i.SeekPrefixGE(prefix, key, trySeekUsingNext) - return k, value, false -} - // SeekLT implements internalIterator.SeekLT, as documented in the pebble // package. Note that SeekLT only checks the lower bound. It is up to the // caller to ensure that key is less than the upper bound. @@ -1121,31 +1349,58 @@ func (i *twoLevelIterator) SeekLT(key []byte) (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 + var result loadBlockResult + var ikey *InternalKey // NB: Unlike SeekGE, we don't have a fast-path here since we don't know // whether the topLevelIndex is positioned after the position that would // be returned by doing i.topLevelIndex.SeekGE(). To know this we would // need to know the index key preceding the current one. 
- if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { - if ikey, _ := i.topLevelIndex.Last(); ikey == nil { + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { + if ikey, _ = i.topLevelIndex.Last(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result = i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - return i.singleLevelIterator.lastInternal() - } - - if !i.loadIndex() { - return nil, nil - } - - if ikey, val := i.singleLevelIterator.SeekLT(key); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { + return ikey, val + } + // Fall through to skipBackward since the singleLevelIterator did + // not have any blocks that satisfy the block interval + // constraints, or the lower bound was reached. + } + // Else loadBlockIrrelevant, so fall through. + } else { + result = i.loadIndex() + if result == loadBlockFailed { + return nil, nil + } + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.SeekLT(key); ikey != nil { + return ikey, val + } + // Fall through to skipBackward since the singleLevelIterator did + // not have any blocks that satisfy the block interval + // constraint, or the lower bound was reached. + } + // Else loadBlockIrrelevant, so fall through. + } + if result == loadBlockIrrelevant { + // Enforce the lower bound here since don't want to bother moving to + // the previous entry in the top level index if lower bound is already + // exceeded. Note that the previous entry starts with keys <= + // ikey.UserKey since even though this is the current block's + // separator, the same user key can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + } } + // NB: skipBackward checks whether exhaustedBounds is already -1. return i.skipBackward() } @@ -1162,17 +1417,32 @@ func (i *twoLevelIterator) First() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 - if ikey, _ := i.topLevelIndex.First(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.First(); ikey == nil { return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - if ikey, val := i.singleLevelIterator.First(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.First(); ikey != nil { + return ikey, val + } + // Else fall through to skipForward. + } else { + // result == loadBlockIrrelevant. Enforce the upper bound here since + // don't want to bother moving to the next entry in the top level + // index if upper bound is already exceeded. Note that the next entry + // starts with keys >= ikey.UserKey since even though this is the + // block separator, the same user key can span multiple index blocks. + // Since upper is exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } } + // NB: skipForward checks whether exhaustedBounds is already +1. return i.skipForward() } @@ -1189,17 +1459,32 @@ func (i *twoLevelIterator) Last() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. 
i.boundsCmp = 0 - if ikey, _ := i.topLevelIndex.Last(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Last(); ikey == nil { return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - if ikey, val := i.singleLevelIterator.Last(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.Last(); ikey != nil { + return ikey, val + } + // Else fall through to skipBackward. + } else { + // result == loadBlockIrrelevant. Enforce the lower bound here + // since don't want to bother moving to the previous entry in the + // top level index if lower bound is already exceeded. Note that + // the previous entry starts with keys <= ikey.UserKey since even + // though this is the current block's separator, the same user key + // can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + } } + // NB: skipBackward checks whether exhaustedBounds is already -1. return i.skipBackward() } @@ -1235,51 +1520,75 @@ func (i *twoLevelIterator) Prev() (*InternalKey, []byte) { func (i *twoLevelIterator) skipForward() (*InternalKey, []byte) { for { - if i.err != nil { - return nil, nil - } - if i.singleLevelIterator.valid() { - // The iterator is positioned at valid record in the current data block - // which implies the previous positioning call reached the upper bound. - // + if i.err != nil || i.exhaustedBounds > 0 { return nil, nil } i.exhaustedBounds = 0 - if ikey, _ := i.topLevelIndex.Next(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Next(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.singleLevelIterator.firstInternal(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.firstInternal(); ikey != nil { + return ikey, val + } + // Next iteration will return if singleLevelIterator set + // exhaustedBounds = +1. + } else { + // result == loadBlockIrrelevant. Enforce the upper bound here + // since don't want to bother moving to the next entry in the top + // level index if upper bound is already exceeded. Note that the + // next entry starts with keys >= ikey.UserKey since even though + // this is the block separator, the same user key can span + // multiple index blocks. Since upper is exclusive we use >= + // below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + // Next iteration will return. + } } } } func (i *twoLevelIterator) skipBackward() (*InternalKey, []byte) { for { - if i.err != nil { - return nil, nil - } - if i.singleLevelIterator.valid() { - // The iterator is positioned at valid record in the current data block - // which implies the previous positioning call reached the lower bound. 
+ if i.err != nil || i.exhaustedBounds < 0 { return nil, nil } i.exhaustedBounds = 0 - if ikey, _ := i.topLevelIndex.Prev(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Prev(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { + return ikey, val + } + // Next iteration will return if singleLevelIterator set + // exhaustedBounds = -1. + } else { + // result == loadBlockIrrelevant. Enforce the lower bound here + // since don't want to bother moving to the previous entry in the + // top level index if lower bound is already exceeded. Note that + // the previous entry starts with keys <= ikey.UserKey since even + // though this is the current block's separator, the same user key + // can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + // Next iteration will return. + } } } } @@ -1370,10 +1679,26 @@ func (i *twoLevelCompactionIterator) skipForward( if key, _ := i.topLevelIndex.Next(); key == nil { break } - if i.loadIndex() { - if key, val = i.singleLevelIterator.First(); key != nil { + result := i.loadIndex() + if result != loadBlockOK { + if i.err != nil { break } + switch result { + case loadBlockFailed: + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + case loadBlockIrrelevant: + panic("compactionIter should not be using block intervals for skipping") + default: + panic(fmt.Sprintf("unexpected case %d", result)) + } + } + // result == loadBlockOK + if key, val = i.singleLevelIterator.First(); key != nil { + break } } } @@ -1777,15 +2102,27 @@ func (r *Reader) get(key []byte) (value []byte, err error) { return newValue, nil } -// NewIter returns an iterator for the contents of the table. If an error -// occurs, NewIter cleans up after itself and returns a nil iterator. -func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { +// NewIterWithBlockPropertyFilters returns an iterator for the contents of the +// table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after +// itself and returns a nil iterator. +func (r *Reader) NewIterWithBlockPropertyFilters( + lower, upper []byte, bpfs []BlockPropertyFilter) (Iterator, error) { + // Check that sorted and no duplicates. + if len(bpfs) > 1 { + for i := range bpfs { + if i > 0 && bpfs[i-1].ShortID() >= bpfs[i].ShortID() { + return nil, errors.Errorf( + "BlockPropertyFilters %d, %d are not sorted", + bpfs[i-1].ShortID(), bpfs[i].ShortID()) + } + } + } // NB: pebble.tableCache wraps the returned iterator with one which performs // reference counting on the Reader, preventing the Reader from being closed // until the final iterator closes. 
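// Illustrative call, not part of this patch, assuming a table written with a
// collector named "test" at shortID 0 and arbitrary bounds [10, 20):
//
//	iter, err := r.NewIterWithBlockPropertyFilters(
//		nil /* lower */, nil /* upper */,
//		[]BlockPropertyFilter{NewBlockIntervalFilter("test", 0, 10, 20)},
//	)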
if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) - err := i.init(r, lower, upper) + err := i.init(r, lower, upper, bpfs) if err != nil { return nil, err } @@ -1793,20 +2130,26 @@ func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { } i := singleLevelIterPool.Get().(*singleLevelIterator) - err := i.init(r, lower, upper) + err := i.init(r, lower, upper, bpfs) if err != nil { return nil, err } return i, nil } +// NewIter returns an iterator for the contents of the table. If an error +// occurs, NewIter cleans up after itself and returns a nil iterator. +func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { + return r.NewIterWithBlockPropertyFilters(lower, upper, nil) +} + // NewCompactionIter returns an iterator similar to NewIter but it also increments // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up // after itself and returns a nil iterator. func (r *Reader) NewCompactionIter(bytesIterated *uint64) (Iterator, error) { if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) - err := i.init(r, nil /* lower */, nil /* upper */) + err := i.init(r, nil /* lower */, nil /* upper */, nil) if err != nil { return nil, err } @@ -1817,7 +2160,7 @@ func (r *Reader) NewCompactionIter(bytesIterated *uint64) (Iterator, error) { }, nil } i := singleLevelIterPool.Get().(*singleLevelIterator) - err := i.init(r, nil /* lower */, nil /* upper */) + err := i.init(r, nil /* lower */, nil /* upper */, nil) if err != nil { return nil, err } @@ -2116,33 +2459,34 @@ func (r *Reader) Layout() (*Layout, error) { l.Index = append(l.Index, r.indexBH) iter, _ := newBlockIter(r.Compare, indexH.Get()) for key, value := iter.First(); key != nil; key, value = iter.Next() { - dataBH, n := decodeBlockHandle(value) - if n == 0 || n != len(value) { + dataBH, err := decodeBlockHandleWithProperties(value) + if err != nil { return nil, errCorruptIndexEntry } - l.Data = append(l.Data, dataBH) + l.Data = append(l.Data, dataBH.BlockHandle) } } else { l.TopIndex = r.indexBH topIter, _ := newBlockIter(r.Compare, indexH.Get()) for key, value := topIter.First(); key != nil; key, value = topIter.Next() { - indexBH, n := decodeBlockHandle(value) - if n == 0 || n != len(value) { + indexBH, err := decodeBlockHandleWithProperties(value) + if err != nil { return nil, errCorruptIndexEntry } - l.Index = append(l.Index, indexBH) + l.Index = append(l.Index, indexBH.BlockHandle) - subIndex, err := r.readBlock(indexBH, nil /* transform */, nil /* readaheadState */) + subIndex, err := r.readBlock( + indexBH.BlockHandle, nil /* transform */, nil /* readaheadState */) if err != nil { return nil, err } iter, _ := newBlockIter(r.Compare, subIndex.Get()) for key, value := iter.First(); key != nil; key, value = iter.Next() { - dataBH, n := decodeBlockHandle(value) - if n == 0 || n != len(value) { + dataBH, err := decodeBlockHandleWithProperties(value) + if err != nil { return nil, errCorruptIndexEntry } - l.Data = append(l.Data, dataBH) + l.Data = append(l.Data, dataBH.BlockHandle) } subIndex.Release() } @@ -2237,11 +2581,12 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // The range falls completely after this file, or an error occurred. 
 		return 0, topIter.Error()
 	}
-	startIdxBH, n := decodeBlockHandle(val)
-	if n == 0 || n != len(val) {
+	startIdxBH, err := decodeBlockHandleWithProperties(val)
+	if err != nil {
 		return 0, errCorruptIndexEntry
 	}
-	startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, nil /* readaheadState */)
+	startIdxBlock, err := r.readBlock(
+		startIdxBH.BlockHandle, nil /* transform */, nil /* readaheadState */)
 	if err != nil {
 		return 0, err
 	}
@@ -2257,11 +2602,12 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
 			return 0, err
 		}
 	} else {
-		endIdxBH, n := decodeBlockHandle(val)
-		if n == 0 || n != len(val) {
+		endIdxBH, err := decodeBlockHandleWithProperties(val)
+		if err != nil {
 			return 0, errCorruptIndexEntry
 		}
-		endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, nil /* readaheadState */)
+		endIdxBlock, err := r.readBlock(
+			endIdxBH.BlockHandle, nil /* transform */, nil /* readaheadState */)
 		if err != nil {
 			return 0, err
 		}
@@ -2280,8 +2626,8 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
 		// The range falls completely after this file, or an error occurred.
 		return 0, startIdxIter.Error()
 	}
-	startBH, n := decodeBlockHandle(val)
-	if n == 0 || n != len(val) {
+	startBH, err := decodeBlockHandleWithProperties(val)
+	if err != nil {
 		return 0, errCorruptIndexEntry
 	}
@@ -2297,8 +2643,8 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
 		// The range spans beyond this file. Include data blocks through the last.
 		return r.Properties.DataSize - startBH.Offset, nil
 	}
-	endBH, n := decodeBlockHandle(val)
-	if n == 0 || n != len(val) {
+	endBH, err := decodeBlockHandleWithProperties(val)
+	if err != nil {
 		return 0, errCorruptIndexEntry
 	}
 	return endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset, nil
@@ -2532,8 +2878,8 @@ func (l *Layout) Describe(
 	case "index", "top-index":
 		iter, _ := newBlockIter(r.Compare, h.Get())
 		for key, value := iter.First(); key != nil; key, value = iter.Next() {
-			bh, n := decodeBlockHandle(value)
-			if n == 0 || n != len(value) {
+			bh, err := decodeBlockHandleWithProperties(value)
+			if err != nil {
 				fmt.Fprintf(w, "%10d [err: %s]\n", b.Offset+uint64(iter.offset), err)
 				continue
 			}
diff --git a/sstable/table.go b/sstable/table.go
index ab6a89080d9..1545b4fd312 100644
--- a/sstable/table.go
+++ b/sstable/table.go
@@ -123,20 +123,29 @@ is a key that is >= every key in block i and is < every key in block i+1. The
 successor for the final block is a key that is >= every key in block N-1. The
 index block restart interval is 1: every entry is a restart point.

-A block handle is an offset and a length; the length does not include the 5
-byte trailer. Both numbers are varint-encoded, with no padding between the two
-values. The maximum size of an encoded block handle is therefore 20 bytes.
+
+A block handle is an offset, a length, and optional block properties (for data
+blocks and first/lower level index blocks); the length does not include the 5
+byte trailer. All numbers are varint-encoded, with no padding between the
+values. The maximum size of an encoded block handle without properties is 20
+bytes. It is not advised to have properties that accumulate to be longer than
+100 bytes.
+

 */

 const (
-	blockTrailerLen   = 5
-	blockHandleMaxLen = 10 + 10
+	blockTrailerLen                    = 5
+	blockHandleMaxLenWithoutProperties = 10 + 10
+	// blockHandleLikelyMaxLen can be used for pre-allocating buffers to
+	// reduce memory copies. It is not guaranteed that a block handle will not
+	// exceed this length.
+ blockHandleLikelyMaxLen = blockHandleMaxLenWithoutProperties + 100 levelDBFooterLen = 48 levelDBMagic = "\x57\xfb\x80\x8b\x24\x75\x47\xdb" levelDBMagicOffset = levelDBFooterLen - len(levelDBMagic) - rocksDBFooterLen = 1 + 2*blockHandleMaxLen + 4 + 8 + rocksDBFooterLen = 1 + 2*blockHandleMaxLenWithoutProperties + 4 + 8 rocksDBMagic = "\xf7\xcf\xf4\x85\xb7\x41\xe2\x88" rocksDBMagicOffset = rocksDBFooterLen - len(rocksDBMagic) rocksDBVersionOffset = rocksDBMagicOffset - 4 diff --git a/sstable/writer.go b/sstable/writer.go index d36401e3468..2a937f37f04 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "math" + "sort" "github.com/cespare/xxhash/v2" "github.com/cockroachdb/errors" @@ -128,12 +129,14 @@ type Writer struct { // Internal flag to allow creation of range-del-v1 format blocks. Only used // for testing. Note that v2 format blocks are backwards compatible with v1 // format blocks. - rangeDelV1Format bool - block blockWriter - indexBlock blockWriter - rangeDelBlock blockWriter - props Properties - propCollectors []TablePropertyCollector + rangeDelV1Format bool + block blockWriter + indexBlock blockWriter + rangeDelBlock blockWriter + props Properties + propCollectors []TablePropertyCollector + blockPropCollectors []BlockPropertyCollector + blockPropsEncoder blockPropertiesEncoder // compressedBuf is the destination buffer for compression. It is // re-used over the lifetime of the writer, avoiding the allocation of a // temporary buffer for each block. @@ -143,13 +146,19 @@ type Writer struct { // nil, or the full keys otherwise. filter filterWriter // tmp is a scratch buffer, large enough to hold either footerLen bytes, - // blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes. - tmp [rocksDBFooterLen]byte + // blockTrailerLen bytes, (5 * binary.MaxVarintLen64) bytes, and most + // likely large enough for a block handle with properties. + tmp [blockHandleLikelyMaxLen]byte xxHasher *xxhash.Digest topLevelIndexBlock blockWriter - indexPartitions []blockWriter + indexPartitions []indexBlockWriterAndBlockProperties +} + +type indexBlockWriterAndBlockProperties struct { + writer blockWriter + properties []byte } // Set sets the value for the given key. The sequence number is set to @@ -241,6 +250,11 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error { return err } } + for i := range w.blockPropCollectors { + if err := w.blockPropCollectors[i].Add(key, value); err != nil { + return err + } + } w.maybeAddToFilter(key.UserKey) w.block.add(key, value) @@ -376,16 +390,44 @@ func (w *Writer) maybeFlush(key InternalKey, value []byte) error { w.err = err return w.err } - w.addIndexEntry(key, bh) + var bhp BlockHandleWithProperties + if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil { + return err + } + if err = w.addIndexEntry(key, bhp); err != nil { + return err + } return nil } +// The BlockHandleWithProperties returned by this method must be encoded +// before any future use of the Writer.blockPropsEncoder, since the properties +// slice will get reused by the blockPropsEncoder. 
+func (w *Writer) maybeAddBlockPropertiesToBlockHandle(
+	bh BlockHandle) (BlockHandleWithProperties, error) {
+	if len(w.blockPropCollectors) == 0 {
+		return BlockHandleWithProperties{BlockHandle: bh}, nil
+	}
+	var err error
+	w.blockPropsEncoder.resetProps()
+	for i := range w.blockPropCollectors {
+		scratch := w.blockPropsEncoder.getScratchForProp()
+		if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
+			return BlockHandleWithProperties{}, err
+		}
+		if len(scratch) > 0 {
+			w.blockPropsEncoder.addProp(w.blockPropCollectors[i].ShortID(), scratch)
+		}
+	}
+	return BlockHandleWithProperties{BlockHandle: bh, Props: w.blockPropsEncoder.unsafeProps()}, nil
+}
+
 // addIndexEntry adds an index entry for the specified key and block handle.
-func (w *Writer) addIndexEntry(key InternalKey, bh BlockHandle) {
-	if bh.Length == 0 {
+func (w *Writer) addIndexEntry(key InternalKey, bhp BlockHandleWithProperties) error {
+	if bhp.Length == 0 {
 		// A valid blockHandle must be non-zero.
 		// In particular, it must have a non-zero length.
-		return
+		return nil
 	}
 	prevKey := base.DecodeInternalKey(w.block.curKey)
 	var sep InternalKey
@@ -394,16 +436,22 @@ func (w *Writer) addIndexEntry(key InternalKey, bh BlockHandle) {
 	} else {
 		sep = prevKey.Separator(w.compare, w.separator, nil, key)
 	}
-	n := encodeBlockHandle(w.tmp[:], bh)
+	encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)

 	if supportsTwoLevelIndex(w.tableFormat) &&
-		shouldFlush(sep, w.tmp[:n], &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) {
+		shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) {
 		// Enable two level indexes if there is more than one index block.
 		w.twoLevelIndex = true
-		w.finishIndexBlock()
+		if err := w.finishIndexBlock(); err != nil {
+			return err
+		}
 	}

-	w.indexBlock.add(sep, w.tmp[:n])
+	for i := range w.blockPropCollectors {
+		w.blockPropCollectors[i].AddLastDataBlockToIndexBlock()
+	}
+	w.indexBlock.add(sep, encoded)
+	return nil
 }

 func shouldFlush(
@@ -438,29 +486,51 @@ func shouldFlush(

 // finishIndexBlock finishes the current index block and adds it to the top
 // level index block. This is only used when two level indexes are enabled.
-func (w *Writer) finishIndexBlock() {
-	w.indexPartitions = append(w.indexPartitions, w.indexBlock)
+func (w *Writer) finishIndexBlock() error {
+	w.blockPropsEncoder.resetProps()
+	for i := range w.blockPropCollectors {
+		scratch := w.blockPropsEncoder.getScratchForProp()
+		var err error
+		if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
+			return err
+		}
+		if len(scratch) > 0 {
+			w.blockPropsEncoder.addProp(w.blockPropCollectors[i].ShortID(), scratch)
+		}
+	}
+	w.indexPartitions = append(w.indexPartitions,
+		indexBlockWriterAndBlockProperties{
+			writer:     w.indexBlock,
+			properties: w.blockPropsEncoder.props(),
+		})
 	w.indexBlock = blockWriter{
 		restartInterval: 1,
 	}
+	return nil
 }

 func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
 	// Add the final unfinished index.
- w.finishIndexBlock() + if err := w.finishIndexBlock(); err != nil { + return BlockHandle{}, err + } for i := range w.indexPartitions { b := &w.indexPartitions[i] - w.props.NumDataBlocks += uint64(b.nEntries) - sep := base.DecodeInternalKey(b.curKey) - data := b.finish() + w.props.NumDataBlocks += uint64(b.writer.nEntries) + sep := base.DecodeInternalKey(b.writer.curKey) + data := b.writer.finish() w.props.IndexSize += uint64(len(data)) bh, err := w.writeBlock(data, w.compression) if err != nil { return BlockHandle{}, err } - n := encodeBlockHandle(w.tmp[:], bh) - w.topLevelIndexBlock.add(sep, w.tmp[:n]) + bhp := BlockHandleWithProperties{ + BlockHandle: bh, + Props: b.properties, + } + encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp) + w.topLevelIndexBlock.add(sep, encoded) } // NB: RocksDB includes the block trailer length in the index size @@ -506,7 +576,7 @@ func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, err return BlockHandle{}, errors.Newf("unsupported checksum type: %d", w.checksumType) } binary.LittleEndian.PutUint32(w.tmp[1:5], checksum) - bh := BlockHandle{w.meta.Size, uint64(len(b))} + bh := BlockHandle{Offset: w.meta.Size, Length: uint64(len(b))} if w.cacheID != 0 && w.fileNum != 0 { // Remove the block being written from the cache. This provides defense in @@ -557,7 +627,13 @@ func (w *Writer) Close() (err error) { w.err = err return w.err } - w.addIndexEntry(InternalKey{}, bh) + var bhp BlockHandleWithProperties + if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil { + return err + } + if err = w.addIndexEntry(InternalKey{}, bhp); err != nil { + return err + } } w.props.DataSize = w.meta.Size @@ -638,6 +714,21 @@ func (w *Writer) Close() (err error) { return err } } + for i := range w.blockPropCollectors { + buf, err := + w.blockPropCollectors[i].FinishTable(w.blockPropsEncoder.getScratchForProp()) + if err != nil { + return err + } + var prop string + if len(buf) > 0 { + prop = string(buf) + } + // NB: The property is populated in the map even if it is the + // empty string, since the presence in the map is what indicates + // that the block property collector was used when writing. 
+		userProps[w.blockPropCollectors[i].Name()] = prop
+	}
 	if len(userProps) > 0 {
 		w.props.UserProperties = userProps
 	}
@@ -823,16 +914,31 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) *
 	w.props.PropertyCollectorNames = "[]"
 	w.props.ExternalFormatVersion = rocksDBExternalFormatVersion

-	if len(o.TablePropertyCollectors) > 0 {
-		w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors))
+	if len(o.TablePropertyCollectors) > 0 || len(o.BlockPropertyCollectors) > 0 {
 		var buf bytes.Buffer
 		buf.WriteString("[")
-		for i := range o.TablePropertyCollectors {
-			w.propCollectors[i] = o.TablePropertyCollectors[i]()
-			if i > 0 {
-				buf.WriteString(",")
+		if len(o.TablePropertyCollectors) > 0 {
+			w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors))
+			for i := range o.TablePropertyCollectors {
+				w.propCollectors[i] = o.TablePropertyCollectors[i]()
+				if i > 0 {
+					buf.WriteString(",")
+				}
+				buf.WriteString(w.propCollectors[i].Name())
+			}
+		}
+		if len(o.BlockPropertyCollectors) > 0 {
+			w.blockPropCollectors = make([]BlockPropertyCollector, len(o.BlockPropertyCollectors))
+			for i := range o.BlockPropertyCollectors {
+				w.blockPropCollectors[i] = o.BlockPropertyCollectors[i]()
+				if i > 0 || len(o.TablePropertyCollectors) > 0 {
+					buf.WriteString(",")
+				}
+				buf.WriteString(w.blockPropCollectors[i].Name())
 			}
-			buf.WriteString(w.propCollectors[i].Name())
+			sort.Slice(w.blockPropCollectors, func(i, j int) bool {
+				return w.blockPropCollectors[i].ShortID() < w.blockPropCollectors[j].ShortID()
+			})
 		}
 		buf.WriteString("]")
 		w.props.PropertyCollectorNames = buf.String()
diff --git a/table_cache.go b/table_cache.go
index a6427858f1a..7bc862a4927 100644
--- a/table_cache.go
+++ b/table_cache.go
@@ -318,13 +318,54 @@ func (c *tableCacheShard) newIters(
 		c.unrefValue(v)
 		return emptyIter, nil, nil
 	}
+	var bpfs []BlockPropertyFilter
+	if opts != nil {
+		bpfs = opts.BlockPropertyFilters
+	}
+	if len(bpfs) > 0 {
+		intersects := false
+		someFiltersAbsent := false
+		for i := range bpfs {
+			props, ok := v.reader.Properties.UserProperties[bpfs[i].Name()]
+			if !ok {
+				intersects = true
+				someFiltersAbsent = true
+				continue
+			}
+			filterIntersects, err := bpfs[i].Intersects([]byte(props))
+			if err != nil {
+				return nil, nil, err
+			}
+			intersects = intersects || filterIntersects
+		}
+		if !intersects {
+			// Return the empty iterator. This iterator has no mutable state, so
+			// using a singleton is fine.
+			c.unrefValue(v)
+			return emptyIter, nil, nil
+		}
+		if someFiltersAbsent {
+			// The common case is exactly one filter, and if that is absent,
+			// we don't append to this slice. So we don't bother with trying
+			// to optimize memory allocations for this slice.
+ var customizedBpfs []BlockPropertyFilter + for i := range bpfs { + _, ok := v.reader.Properties.UserProperties[bpfs[i].Name()] + if ok { + customizedBpfs = append(customizedBpfs, bpfs[i]) + } + } + bpfs = customizedBpfs + } + } var iter sstable.Iterator var err error if bytesIterated != nil { iter, err = v.reader.NewCompactionIter(bytesIterated) } else { - iter, err = v.reader.NewIter(opts.GetLowerBound(), opts.GetUpperBound()) + iter, err = v.reader.NewIterWithBlockPropertyFilters( + opts.GetLowerBound(), opts.GetUpperBound(), bpfs) } if err != nil { c.unrefValue(v) diff --git a/testdata/checkpoint b/testdata/checkpoint index 66662f14dbc..b75fddc3660 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -28,6 +28,9 @@ sync: db create: db/marker.format-version.000003.004 close: db/marker.format-version.000003.004 sync: db +create: db/marker.format-version.000004.005 +close: db/marker.format-version.000004.005 +sync: db sync: db/MANIFEST-000001 create: db/000002.log sync: db @@ -93,9 +96,9 @@ close: open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.004 -sync: checkpoints/checkpoint1/marker.format-version.000001.004 -close: checkpoints/checkpoint1/marker.format-version.000001.004 +create: checkpoints/checkpoint1/marker.format-version.000001.005 +sync: checkpoints/checkpoint1/marker.format-version.000001.005 +close: checkpoints/checkpoint1/marker.format-version.000001.005 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 create: checkpoints/checkpoint1/MANIFEST-000001 @@ -150,7 +153,7 @@ CURRENT LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000003.004 +marker.format-version.000004.005 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -160,7 +163,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.004 +marker.format-version.000001.005 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly diff --git a/testdata/event_listener b/testdata/event_listener index 412ba1a19cb..94d0de7f5f2 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -34,6 +34,10 @@ create: db/marker.format-version.000003.004 close: db/marker.format-version.000003.004 sync: db upgraded to format version: 004 +create: db/marker.format-version.000004.005 +close: db/marker.format-version.000004.005 +sync: db +upgraded to format version: 005 create: db/MANIFEST-000003 close: db/MANIFEST-000001 sync: db/MANIFEST-000003 @@ -200,9 +204,9 @@ close: open-dir: checkpoint link: db/OPTIONS-000004 -> checkpoint/OPTIONS-000004 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.004 -sync: checkpoint/marker.format-version.000001.004 -close: checkpoint/marker.format-version.000001.004 +create: checkpoint/marker.format-version.000001.005 +sync: checkpoint/marker.format-version.000001.005 +close: checkpoint/marker.format-version.000001.005 sync: checkpoint close: checkpoint create: checkpoint/MANIFEST-000017 diff --git a/testdata/iterator_block_interval_filter b/testdata/iterator_block_interval_filter new file mode 100644 index 00000000000..763cf02d906 --- /dev/null +++ b/testdata/iterator_block_interval_filter @@ -0,0 +1,70 @@ +# Block size is 1, so each block contains one key, and two level index is used +# since even the lower index blocks have only one key. 
+build +set a01 a +set b02 b +set c03 c +set d04 d +set e05 e +set f06 f +---- +0.0: + 000005:[a01#1,SET-f06#6,SET] + +iter +first +next +next +next +next +next +next +---- +a01:a +b02:b +c03:c +d04:d +e05:e +f06:f +. + +iter lower=1 upper=2 +first +next +prev +prev +next +next +---- +a01:a +. +a01:a +. +a01:a +. + + +iter lower=3 upper=5 +first +next +next +last +prev +prev +seek-lt f +prev +next +prev +prev +---- +c03:c +d04:d +. +d04:d +c03:c +. +d04:d +c03:c +d04:d +c03:c +.
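
The properties exercised by this testdata are just short encoded byte strings attached to
block handles; a filter decides whether a block can be skipped by decoding the property and
checking it against a query interval. A minimal standalone sketch of that check follows,
assuming an illustrative two-uvarint encoding of [lower,upper); this is not necessarily the
encoding used by the helpers in sstable/block_property.go, and the function names below are
made up for the sketch.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeInterval encodes the half-open interval [lower, upper) as two
// uvarints. Illustrative layout only; the real block-interval helper may
// use a different encoding.
func encodeInterval(lower, upper uint64) []byte {
	buf := make([]byte, 2*binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, lower)
	n += binary.PutUvarint(buf[n:], upper)
	return buf[:n]
}

// intersects reports whether the encoded interval overlaps the query
// interval [qLower, qUpper). An empty property is treated as an empty
// interval, which overlaps nothing.
func intersects(prop []byte, qLower, qUpper uint64) (bool, error) {
	if len(prop) == 0 {
		return false, nil
	}
	lower, n := binary.Uvarint(prop)
	if n <= 0 {
		return false, fmt.Errorf("invalid interval property")
	}
	upper, m := binary.Uvarint(prop[n:])
	if m <= 0 {
		return false, fmt.Errorf("invalid interval property")
	}
	// Half-open intervals overlap iff each one starts before the other ends.
	return lower < qUpper && qLower < upper, nil
}

func main() {
	// With the test collector above, a data block containing only d04 gets
	// the interval [4,5).
	prop := encodeInterval(4, 5)
	for _, q := range [][2]uint64{{1, 2}, {3, 5}, {5, 7}} {
		ok, err := intersects(prop, q[0], q[1])
		fmt.Println(q, ok, err)
	}
}

With one key per block as configured in this testdata, the lower=3 upper=5 iterator admits
only the blocks holding c03 and d04, which is this overlap check applied to each data
block's [lower,upper) property.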