diff --git a/compaction.go b/compaction.go index 008b7a154cd..9cbdb86f82b 100644 --- a/compaction.go +++ b/compaction.go @@ -2032,7 +2032,7 @@ func (d *DB) runCompaction( }() snapshots := d.mu.snapshots.toSlice() - + formatVers := d.mu.formatVers.vers // Release the d.mu lock while doing I/O. // Note the unusual order: Unlock and then Lock. d.mu.Unlock() @@ -2085,6 +2085,10 @@ func (d *DB) runCompaction( } writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level) + if formatVers < FormatBlockPropertyCollector { + // Cannot yet write block properties. + writerOpts.BlockPropertyCollectors = nil + } newOutput := func() error { fileMeta := &fileMetadata{} diff --git a/format_major_version.go b/format_major_version.go index 2d87cf24c5d..01ae650cbe1 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -66,8 +66,11 @@ const ( // kind, base.InternalKeyKindSetWithDelete. Previous Pebble versions will be // unable to open this database. FormatSetWithDelete + // FormatBlockPropertyCollector is a format major version that introduces + // BlockPropertyCollectors. + FormatBlockPropertyCollector // FormatNewest always contains the most recent format major version. - FormatNewest FormatMajorVersion = FormatSetWithDelete + FormatNewest FormatMajorVersion = FormatBlockPropertyCollector ) // formatMajorVersionMigrations defines the migrations from one format @@ -143,6 +146,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatSetWithDelete: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatSetWithDelete) }, + FormatBlockPropertyCollector: func(d *DB) error { + return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector) + }, } const formatVersionMarkerName = `format-version` diff --git a/format_major_version_test.go b/format_major_version_test.go index 275bd600470..9293bbfd2f7 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -33,6 +33,8 @@ func TestRatchetFormat(t *testing.T) { require.Equal(t, FormatVersioned, d.FormatMajorVersion()) require.NoError(t, d.RatchetFormatMajorVersion(FormatSetWithDelete)) require.Equal(t, FormatSetWithDelete, d.FormatMajorVersion()) + require.NoError(t, d.RatchetFormatMajorVersion(FormatBlockPropertyCollector)) + require.Equal(t, FormatBlockPropertyCollector, d.FormatMajorVersion()) require.NoError(t, d.Close()) // If we Open the database again, leaving the default format, the diff --git a/iterator_test.go b/iterator_test.go index 62e1c004274..f1e6337cc79 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -959,6 +959,122 @@ func TestIteratorSeekOptErrors(t *testing.T) { }) } +type testBlockIntervalCollector struct { + initialized bool + lower, upper uint64 +} + +func (bi *testBlockIntervalCollector) Add(key InternalKey, value []byte) error { + k := key.UserKey + if len(k) < 2 { + return nil + } + val, err := strconv.Atoi(string(k[len(k)-2:])) + if err != nil { + return err + } + if val < 0 { + panic("testBlockIntervalCollector expects values >= 0") + } + uval := uint64(val) + if !bi.initialized { + bi.lower, bi.upper = uval, uval+1 + bi.initialized = true + return nil + } + if bi.lower > uval { + bi.lower = uval + } + if uval >= bi.upper { + bi.upper = uval+1 + } + return nil +} + +func (bi *testBlockIntervalCollector) FinishDataBlock() (lower uint64, upper uint64, err error) { + // TODO: remove printf + fmt.Printf("tbia: %t [%d,%d)\n", bi.initialized, bi.lower, bi.upper) + bi.initialized = false + l, u := bi.lower, bi.upper + bi.lower, bi.upper = 0, 0 + return l, u, nil +} + +func TestIteratorBlockIntervalFilter(t *testing.T) { + var mem vfs.FS + var d *DB + defer func() { + require.NoError(t, d.Close()) + }() + + reset := func() { + if d != nil { + require.NoError(t, d.Close()) + } + + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + + opts := &Options{ + FS: mem, + FormatMajorVersion: FormatNewest, + BlockPropertyCollectors: []func() BlockPropertyCollector{ + func() BlockPropertyCollector { + return sstable.NewBlockIntervalCollector("test", 0, &testBlockIntervalCollector{}) + }, + }, + } + lo := LevelOptions{BlockSize: 1, IndexBlockSize: 1} + opts.Levels = append(opts.Levels, lo) + + // Automatic compactions may compact away tombstones from L6, making + // some testcases non-deterministic. + opts.private.disableAutomaticCompactions = true + var err error + d, err = Open("", opts) + require.NoError(t, err) + } + reset() + + datadriven.RunTest( + t, "testdata/iterator_block_interval_filter", func(td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + reset() + return "" + + case "build": + b := d.NewBatch() + if err := runBatchDefineCmd(td, b); err != nil { + return err.Error() + } + if err := b.Commit(nil); err != nil { + return err.Error() + } + if err := d.Flush(); err != nil { + return err.Error() + } + return runLSMCmd(td, d) + + case "iter": + var opts IterOptions + if len(td.CmdArgs) == 2 { + var lower, upper uint64 + td.ScanArgs(t, "lower", &lower) + td.ScanArgs(t, "upper", &upper) + opts.BlockPropertyFilters = []BlockPropertyFilter{ + sstable.NewBlockIntervalFilter("test", 0, lower, upper), + } + } + iter := d.NewIter(&opts) + return runIterCmd(td, iter, true) + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} + func BenchmarkIteratorSeekGE(b *testing.B) { m, keys := buildMemTable(b) iter := &Iterator{ diff --git a/level_iter.go b/level_iter.go index 35e8814ab96..04253b23054 100644 --- a/level_iter.go +++ b/level_iter.go @@ -185,6 +185,7 @@ func (l *levelIter) init( l.lower = opts.LowerBound l.upper = opts.UpperBound l.tableOpts.TableFilter = opts.TableFilter + l.tableOpts.BlockPropertyFilters = opts.BlockPropertyFilters l.cmp = cmp l.split = split l.iterFile = nil diff --git a/open_test.go b/open_test.go index eff8490b8b7..2949b225725 100644 --- a/open_test.go +++ b/open_test.go @@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) { "LOCK", "MANIFEST-000001", "OPTIONS-000003", - "marker.format-version.000003.004", + "marker.format-version.000004.005", "marker.manifest.000001.MANIFEST-000001", }, } diff --git a/options.go b/options.go index 1c66f61e2d5..383a4962de0 100644 --- a/options.go +++ b/options.go @@ -52,6 +52,12 @@ type FilterPolicy = base.FilterPolicy // TablePropertyCollector exports the sstable.TablePropertyCollector type. type TablePropertyCollector = sstable.TablePropertyCollector +// BlockPropertyCollector exports the sstable.BlockPropertyCollector type. +type BlockPropertyCollector = sstable.BlockPropertyCollector + +// BlockPropertyFilter exports the sstable.BlockPropertyFilter type. +type BlockPropertyFilter = sstable.BlockPropertyFilter + // IterOptions hold the optional per-query parameters for NewIter. // // Like Options, a nil *IterOptions is valid and means to use the default @@ -72,7 +78,12 @@ type IterOptions struct { // false to skip scanning. This function must be thread-safe since the same // function can be used by multiple iterators, if the iterator is cloned. TableFilter func(userProps map[string]string) bool - + // BlockPropertyFilters can be used to avoid scanning tables and blocks in + // tables. It is requires that this slice is sorted in increasing order of + // the BlockPropertyFilter.ShortID. This slice represents an intersection + // across all filters, i.e., all filters must indicate that the block is + // relevant. + BlockPropertyFilters []BlockPropertyFilter // Internal options. logger Logger } @@ -491,6 +502,11 @@ type Options struct { // and lives for the lifetime of the table. TablePropertyCollectors []func() TablePropertyCollector + // BlockPropertyCollectors is a list of BlockPropertyCollector creation + // functions. A new BlockPropertyCollector is created for each sstable + // built and lives for the lifetime of writing that table. + BlockPropertyCollectors []func() BlockPropertyCollector + // WALBytesPerSync sets the number of bytes to write to a WAL before calling // Sync on it in the background. Just like with BytesPerSync above, this // helps smooth out disk write latencies, and avoids cases where the OS @@ -1157,6 +1173,7 @@ func (o *Options) MakeWriterOptions(level int) sstable.WriterOptions { } writerOpts.TableFormat = sstable.TableFormatRocksDBv2 writerOpts.TablePropertyCollectors = o.TablePropertyCollectors + writerOpts.BlockPropertyCollectors = o.BlockPropertyCollectors } levelOpts := o.Level(level) writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval diff --git a/sstable/block_property.go b/sstable/block_property.go new file mode 100644 index 00000000000..5fa42aae3aa --- /dev/null +++ b/sstable/block_property.go @@ -0,0 +1,472 @@ +// Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package sstable + +import ( + "bytes" + "encoding/binary" + "fmt" + "github.com/cockroachdb/errors" + "math" +) + +// Block properties are an optional user-facing feature that can be used to +// filter data blocks (and whole sstables) from an Iterator before they are +// loaded. They do not apply to range delete blocks. These are expected to +// very concisely represent a set of some attribute value contained within the +// key or value, such that the set includes all the attribute values in the +// block. This has some similarities with OLAP pruning approaches that +// maintain min-max attribute values for some column (which concisely +// represent a set), that is then used to prune at query time. In our case, +// data blocks are small, typically 50-100KB, so these properties should +// reduce their precision in order to be concise -- a good rule of thumb is to +// not consume more than 50-100 bytes across all properties maintained for a +// block, i.e., a 1000x reduction compared to loading the data block. +// +// A block property is assigned two unique identifiers: +// - A name, that is encoded and stored once in the sstable. This name must +// be unique among all user-properties encoded in an sstable. +// - A small integer index, that is encoded and stored once per data block and +// lower level index block. +// The maintenance of both identifiers is the responsibility of the DB user. +// +// A property is represented as a []byte. A nil value or empty byte slice are +// considered semantically identical. The caller is free to choose the +// semantics of an empty byte slice e.g. they could use it to represent the +// empty set or the universal set, whichever they think is more common and +// therefore better to encode more concisely. The serialization of the +// property for the various Finish*() calls in a BlockPropertyCollector +// implementation should be identical, since the corresponding +// BlockPropertyFilter implementation is not told the context in which it is +// deserializing the property. +// +// Block properties are more general than table properties and should be +// preferred over using table properties. A BlockPropertyCollector can achieve +// identical behavior to table properties by returning the nil slice from +// FinishDataBlock and FinishIndexBlock, and interpret them as the universal +// set in BlockPropertyFilter, and return a non-universal set in FinishTable. + +// BlockPropertyCollector is used when writing a sstable. +// - All calls to Add are included in the next FinishDataBlock, after which +// the next data block is expected to start. +// +// - The index entry generated for the data block, which contains the return +// value from FinishDataBlock, is not immediately included in the current +// index block. It is included when AddLastDataBlockToIndexBlock. An +// alternative would be to return an opaque handle from FinishDataBlock and +// pass it to a new AddToIndexBlock method, which requires more plumbing, +// and passing of an interface{} results in a undesirable heap allocation. +// AddLastDataBlockToIndexBlock must be called before keys are added to the +// new data block. +type BlockPropertyCollector interface { + // Name returns the name of the block property collector. + Name() string + // ShortID returns the integer identifier. + ShortID() uint16 + // Add is called with each new entry added to a data block in the sstable. + // The callee can assume that these are in sorted order. + Add(key InternalKey, value []byte) error + // FinishDataBlock is called when all the entries have been added to a + // data block. Subsequent Add calls will be for the next data block. It + // returns the property value for the finished block. + FinishDataBlock(buf []byte) ([]byte, error) + // AddLastDataBlockToIndexBlock adds the entry corresponding to the last + // FinishDataBlock to the current index block. + AddLastDataBlockToIndexBlock() + // FinishIndexBlock is called when an index block, containing all the + // key-value pairs since the last FinishIndexBlock, will no longer see new + // entries. It returns the property value for the index block. + FinishIndexBlock(buf []byte) ([]byte, error) + // FinishTable is called when the sstable is finished, and returns the + // property value for the sstable. + FinishTable(buf []byte) ([]byte, error) +} + +// BlockPropertyFilter is used in an Iterator to filter sstables and blocks +// within the sstable. It should not maintain any per-sstable state, and must +// be thread-safe. +type BlockPropertyFilter interface { + // Name returns the name of the block property collector. + Name() string + // ShortID returns the integer identifier. + ShortID() uint16 + // Intersects returns true if the set represented by prop intersects with + // the set in the filter. + Intersects(prop []byte) (bool, error) +} + +// BlockIntervalCollector is a helper implementation of BlockPropertyCollector +// for users who want to represent a set of the form [lower,upper) where both +// lower and upper are uint64, and lower <= upper. +// +// The set is encoded as: +// - Two varint integers, (lower,upper-lower), when upper-lower > 0 +// - Nil, when upper-lower=0 +type BlockIntervalCollector struct { + name string + shortID uint16 + blockAttributeCollector DataBlockIntervalCollector + + blockInterval interval + indexInterval interval + tableInterval interval +} + +var _ BlockPropertyCollector = &BlockIntervalCollector{} + +// DataBlockIntervalCollector is the interface used by BlockIntervalCollector +// that contains the actual logic pertaining to the property. It only +// maintains state for the current data block, and resets that state in +// FinishDataBlock. This interface can be used to reduce parsing costs. +type DataBlockIntervalCollector interface { + // Add is called with each new entry added to a data block in the sstable. + // The callee can assume that these are in sorted order. + Add(key InternalKey, value []byte) error + // FinishDataBlock is called when all the entries have been added to a + // data block. Subsequent Add calls will be for the next data block. It + // returns the [lower, upper) for the finished block. + FinishDataBlock() (lower uint64, upper uint64, err error) +} + +// NewBlockIntervalCollector constructs a BlockIntervalCollector, with the +// given name, shortID, and data block collector. +func NewBlockIntervalCollector( + name string, shortID uint16, blockAttributeCollector DataBlockIntervalCollector) *BlockIntervalCollector { + return &BlockIntervalCollector{ + name: name, shortID: shortID, blockAttributeCollector: blockAttributeCollector} +} + +// Name implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) Name() string { + return b.name +} + +// ShortID implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) ShortID() uint16 { + return b.shortID +} + +// Add implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) Add(key InternalKey, value []byte) error { + return b.blockAttributeCollector.Add(key, value) +} + +// FinishDataBlock implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) FinishDataBlock(buf []byte) ([]byte, error) { + var err error + b.blockInterval.lower, b.blockInterval.upper, err = b.blockAttributeCollector.FinishDataBlock() + if err != nil { + return buf, err + } + buf = b.blockInterval.encode(buf) + b.tableInterval.union(b.blockInterval) + return buf, nil +} + +// AddLastDataBlockToIndexBlock implements the BlockPropertyCollector +// interface. +func (b *BlockIntervalCollector) AddLastDataBlockToIndexBlock() { + b.indexInterval.union(b.blockInterval) + b.blockInterval = interval{} +} + +// FinishIndexBlock implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) FinishIndexBlock(buf []byte) ([]byte, error) { + buf = b.indexInterval.encode(buf) + b.indexInterval = interval{} + return buf, nil +} + +// FinishTable implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) FinishTable(buf []byte) ([]byte, error) { + return b.tableInterval.encode(buf), nil +} + +type interval struct { + lower uint64 + upper uint64 +} + +func (i interval) encode(buf []byte) []byte { + if i.lower < i.upper { + var encoded [binary.MaxVarintLen64*2]byte + n := binary.PutUvarint(encoded[:], i.lower) + n += binary.PutUvarint(encoded[n:], i.upper-i.lower) + buf = append(buf, encoded[:n]...) + } + return buf +} + +func (i *interval) decode(buf []byte) error { + if len(buf) == 0 { + *i = interval{} + return nil + } + var n int + i.lower, n = binary.Uvarint(buf) + if n <= 0 || n >= len(buf) { + return errors.Errorf("cannot decode interval from buf %x", buf) + } + pos := n + i.upper, n = binary.Uvarint(buf[pos:]) + pos += n + if pos != len(buf) || n <= 0 { + return errors.Errorf("cannot decode interval from buf %x", buf) + } + // Delta decode. + i.upper += i.lower + if i.upper < i.lower { + return errors.Errorf("unexpected overflow, upper %d < lower %d", i.upper, i.lower) + } + return nil +} + +func (i *interval) union(x interval) { + if x.lower == x.upper { + // x is the empty set. + return + } + if i.lower == i.upper { + // i is the empty set. + *i = x + return + } + // Both sets are non-empty. + if x.lower < i.lower { + i.lower = x.lower + } + if x.upper > i.upper { + i.upper = x.upper + } +} + +func (i interval) intersects(x interval) bool { + if i.lower == i.upper || x.lower == x.upper { + // At least one of the sets is empty. + return false + } + // Neither set is empty. + return i.upper > x.lower && i.lower < x.upper +} + +// BlockIntervalFilter is an implementation of BlockPropertyFilter when the +// corresponding collector is a BlockIntervalCollector. That is, the set is of +// the form [lower, upper). +type BlockIntervalFilter struct { + name string + shortID uint16 + filterInterval interval +} + +// NewBlockIntervalFilter constructs a BlockIntervalFilter with the given +// name, shortID, and [lower, upper) bounds. +func NewBlockIntervalFilter( + name string, shortID uint16, lower uint64, upper uint64) *BlockIntervalFilter { + return &BlockIntervalFilter{ + name: name, + shortID: shortID, + filterInterval: interval{lower: lower, upper: upper}, + } +} + +// Name implements the BlockPropertyFilter interface. +func (b *BlockIntervalFilter) Name() string { + return b.name +} + +// ShortID implements the BlockPropertyFilter interface. +func (b *BlockIntervalFilter) ShortID() uint16 { + return b.shortID +} + +// Intersects implements the BlockPropertyFilter interface. +func (b *BlockIntervalFilter) Intersects(prop []byte) (bool, error) { + var i interval + if err := i.decode(prop); err != nil { + return false, err + } + return i.intersects(b.filterInterval), nil +} + +type blockPropertiesEncoder struct { + propsBuf []byte + scratch []byte +} + +func (e *blockPropertiesEncoder) getScratchForProp() []byte { + return e.scratch[:0] +} + +func (e *blockPropertiesEncoder) resetProps() { + e.propsBuf = e.propsBuf[:0] +} + +func (e *blockPropertiesEncoder) addProp(id uint16, scratch []byte) { + lenID := uvarintLen(uint32(id)) + lenProp := uvarintLen(uint32(len(scratch))) + n := lenID + lenProp + len(scratch) + if cap(e.propsBuf) - len(e.propsBuf) < n { + size := len(e.propsBuf) + 2*n + if size < 2*cap(e.propsBuf) { + size = 2*cap(e.propsBuf) + } + buf := make([]byte, len(e.propsBuf), size) + copy(buf, e.propsBuf) + e.propsBuf = buf + } + pos := len(e.propsBuf) + b := e.propsBuf[pos:pos+lenID] + n = binary.PutUvarint(b, uint64(id)) + if n != lenID { + panic(fmt.Sprintf("unexpected length %d is not equal to %d", n, lenID)) + } + pos += n + b = e.propsBuf[pos:pos+lenProp] + n = binary.PutUvarint(b, uint64(len(scratch))) + pos += n + b = e.propsBuf[pos:pos+len(scratch)] + pos += len(scratch) + copy(b, scratch) + e.propsBuf = e.propsBuf[0:pos] + e.scratch = scratch +} + +func (e *blockPropertiesEncoder) unsafeProps() []byte { + return e.propsBuf +} + +func (e *blockPropertiesEncoder) props() []byte { + buf := make([]byte, len(e.propsBuf)) + copy(buf, e.propsBuf) + return buf +} + +func (e *blockPropertiesEncoder) reset() { + e.propsBuf = e.propsBuf[:] +} + +// Must be sorted in increasing order of shortID(). +type blockPropertiesFilterer []BlockPropertyFilter + +func (f blockPropertiesFilterer) intersects(props []byte) (bool, error) { + i := 0 + for len(props) > 0 && i < len(f) { + // Decode. + id, n := binary.Uvarint(props) + if n <= 0 || id > math.MaxUint16 { + return false, errors.Errorf("corrupt block property filter id") + } + shortID := uint16(id) + propLen, m := binary.Uvarint(props[n:]) + if m <= 0 || propLen == 0 || (n + m + int(propLen)) > len(props) { + return false, errors.Errorf("corrupt block property length") + } + n += m + prop := props[n:n+int(propLen)] + props = props[n+int(propLen):] + for i < len(f) && shortID > f[i].ShortID() { + // Not encoded for this block. + intersects, err := f[i].Intersects(nil) + if err != nil { + return false, err + } + if !intersects { + return false, nil + } + i++ + } + if i >= len(f) { + return true, nil + } + if shortID == f[i].ShortID() { + intersects, err := f[i].Intersects(prop) + if err != nil { + return false, err + } + if !intersects { + return false, nil + } + i++ + } + } + for i < len(f) { + // Not encoded for this block. + intersects, err := f[i].Intersects(nil) + if err != nil { + return false, err + } + if !intersects { + return false, nil + } + i++ + } + return true, nil +} + +// Following will move to the CockroachDB code. It is here only for +// illustration. + +type crdbDataBlockTimestampCollector struct { + // Keep the encoded timestamps in min, max and decode in FinishDataBlock. + min, max []byte +} + +var _ DataBlockIntervalCollector = &crdbDataBlockTimestampCollector{} + +const engineKeyVersionWallTimeLen = 8 +const engineKeyVersionWallAndLogicalTimeLen = 12 +const engineKeyVersionWallLogicalAndSyntheticTimeLen = 13 + +func (tc *crdbDataBlockTimestampCollector) Add(key InternalKey, value []byte) error { + k := key.UserKey + if len(k) == 0 { + return nil + } + // Last byte is the version length + 1 when there is a version, + // else it is 0. + versionLen := int(k[len(k)-1]) + // keyPartEnd points to the sentinel byte. + keyPartEnd := len(k) - 1 - versionLen + if keyPartEnd < 0 { + return errors.Errorf("invalid key") + } + if versionLen > 0 && (versionLen == engineKeyVersionWallTimeLen || + versionLen == engineKeyVersionWallAndLogicalTimeLen || + versionLen == engineKeyVersionWallLogicalAndSyntheticTimeLen) { + // Version consists of the bytes after the sentinel and before the length. + k = k[keyPartEnd+1 : len(k)-1] + if len(tc.min) == 0 || bytes.Compare(k, tc.min) < 0 { + tc.min = append(tc.min[:0], k...) + } + if len(tc.max) == 0 || bytes.Compare(k, tc.max) > 0 { + tc.max = append(tc.max[:0], k...) + } + } + return nil +} + +func decodeWallTime(ts []byte) uint64 { + return binary.BigEndian.Uint64(ts[0:8]) +} + +func (tc *crdbDataBlockTimestampCollector) FinishDataBlock( + ) (lower uint64, upper uint64, err error) { + if len(tc.min) == 0 { + // No calls to Add that contained a timestamped key. + return 0, 0, nil + } + lower = decodeWallTime(tc.min) + tc.min = tc.min[:0] + // The actual value encoded into walltime is an int64, so +1 will not + // overflow. + upper = decodeWallTime(tc.max) + 1 + tc.max = tc.max[:0] + if lower >= upper { + return 0, 0, + errors.Errorf("corrupt timestamps lower %d >= upper %d", lower, upper) + } + return lower, upper, nil +} + diff --git a/sstable/filter.go b/sstable/filter.go index f21303af955..f8c6d0dff61 100644 --- a/sstable/filter.go +++ b/sstable/filter.go @@ -4,7 +4,9 @@ package sstable -import "sync/atomic" +import ( + "sync/atomic" +) // FilterMetrics holds metrics for the filter policy. type FilterMetrics struct { @@ -31,6 +33,13 @@ type BlockHandle struct { Offset, Length uint64 } +// BlockHandleWithProperties is used for data blocks and first/lower level +// index blocks, since they can be annotated using BlockPropertyCollectors. +type BlockHandleWithProperties struct { + BlockHandle + Props []byte +} + type filterWriter interface { addKey(key []byte) finish() ([]byte, error) diff --git a/sstable/options.go b/sstable/options.go index 797331b961f..70d23c346a4 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -211,6 +211,11 @@ type WriterOptions struct { // and lives for the lifetime of the table. TablePropertyCollectors []func() TablePropertyCollector + // BlockPropertyCollectors is a list of BlockPropertyCollector creation + // functions. A new BlockPropertyCollector is created for each sstable + // built and lives for the lifetime of writing that table. + BlockPropertyCollectors []func() BlockPropertyCollector + // Checksum specifies which checksum to use. Checksum ChecksumType } diff --git a/sstable/reader.go b/sstable/reader.go index fb260cc3f3d..8e2ed9344f4 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -41,7 +41,11 @@ const ( // decodeBlockHandle returns the block handle encoded at the start of src, as // well as the number of bytes it occupies. It returns zero if given invalid -// input. +// input. A block handle for a data block or a first/lower level index block +// should not be decoded using decodeBlockHandle since the caller may validate +// that the number of bytes decoded is equal to the length of src, which will +// be false if the properties are not decoded. In those cases the caller +// should use decodeBlockHandleWithProperties. func decodeBlockHandle(src []byte) (BlockHandle, int) { offset, n := binary.Uvarint(src) length, m := binary.Uvarint(src[n:]) @@ -51,12 +55,33 @@ func decodeBlockHandle(src []byte) (BlockHandle, int) { return BlockHandle{offset, length}, n + m } +// decodeBlockHandleWithProperties returns the block handle and properties +// encoded in src. src needs to be exactly the length that was encoded. This +// method must be used for data block and first/lower level index blocks. The +// properties in the block handle point to the bytes in src. +func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) { + bh, n := decodeBlockHandle(src) + if n == 0 { + return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle") + } + return BlockHandleWithProperties{ + BlockHandle: bh, + Props: src[n:], + }, nil +} + func encodeBlockHandle(dst []byte, b BlockHandle) int { n := binary.PutUvarint(dst, b.Offset) m := binary.PutUvarint(dst[n:], b.Length) return n + m } +func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte { + n := encodeBlockHandle(dst, b.BlockHandle) + dst = append(dst[:n], b.Props...) + return dst +} + // block is a []byte that holds a sequence of key/value pairs plus an index // over those pairs. type block []byte @@ -76,6 +101,9 @@ type singleLevelIterator struct { // Global lower/upper bound for the iterator. lower []byte upper []byte + // The block property filters do not change in the lifetime of the + // singleLevelIterator or twoLevelIterator. + bpfs []BlockPropertyFilter // Per-block lower/upper bound. Nil if the bound does not apply to the block // because we determined the block lies completely within the bound. blockLower []byte @@ -145,7 +173,23 @@ type singleLevelIterator struct { // not need to do anything. // // Similar examples can be constructed for backward iteration. - + // + // This notion of exactly one key before or after the bounds is not quite + // true when block properties are used to ignore blocks. In that case we + // can't stop precisely at the first block that is past the bounds since + // we are using the index entries to enforce the bounds. + // + // e.g. 3 blocks with keys [b, c] [f, g], [i, j, k] with index entries d, + // h, l. And let the lower bound be k. If the block [i, j, k] is ignored + // due to the block interval annotations we do need to move the index to + // block [f, g] since the index entry for the [i, j, k] block is l which + // is not less than the lower bound of k. So we have passed the entries i, + // j. + // + // This is harmless since the block property filters are fixed for the + // lifetime of the iterator so i, j are irrelevant. In addition, the + // current code will not load the [f, g] block, so the seek optimization + // that attempts to use Next/Prev do not apply anyway. boundsCmp int positionedUsingLatestBounds bool @@ -206,7 +250,8 @@ func checkTwoLevelIterator(obj interface{}) { // init initializes a singleLevelIterator for reading from the table. It is // synonmous with Reader.NewIter, but allows for reusing of the iterator // between different Readers. -func (i *singleLevelIterator) init(r *Reader, lower, upper []byte) error { +func (i *singleLevelIterator) init( + r *Reader, lower, upper []byte, bpfs []BlockPropertyFilter) error { if r.err != nil { return r.err } @@ -217,6 +262,7 @@ func (i *singleLevelIterator) init(r *Reader, lower, upper []byte) error { i.lower = lower i.upper = upper + i.bpfs = bpfs i.reader = r i.cmp = r.Compare err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum) @@ -271,37 +317,55 @@ func (i *singleLevelIterator) initBounds() { } } +type loadBlockResult int8 +const ( + loadBlockOK loadBlockResult = iota + // Could be due to error or because no block left to load. + loadBlockFailed + loadBlockIrrelevant +) + // loadBlock loads the block at the current index position and leaves i.data // unpositioned. If unsuccessful, it sets i.err to any error encountered, which // may be nil if we have simply exhausted the entire table. -func (i *singleLevelIterator) loadBlock() bool { +func (i *singleLevelIterator) loadBlock() loadBlockResult { // Ensure the data block iterator is invalidated even if loading of the block // fails. i.data.invalidate() if !i.index.Valid() { - return false + return loadBlockFailed } // Load the next block. v := i.index.Value() - var n int - i.dataBH, n = decodeBlockHandle(v) - if n == 0 || n != len(v) { + bhp, err := decodeBlockHandleWithProperties(v) + i.dataBH = bhp.BlockHandle + if err != nil { i.err = errCorruptIndexEntry - return false + return loadBlockFailed + } + if len(i.bpfs) > 0 { + intersects, err := blockPropertiesFilterer(i.bpfs).intersects(bhp.Props) + if err != nil { + i.err = errCorruptIndexEntry + return loadBlockFailed + } + if !intersects { + return loadBlockIrrelevant + } } block, err := i.reader.readBlock(i.dataBH, nil /* transform */, &i.dataRS) if err != nil { i.err = err - return false + return loadBlockFailed } i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum) if i.err != nil { // The block is partially loaded, and we don't want it to appear valid. i.data.invalidate() - return false + return loadBlockFailed } i.initBounds() - return true + return loadBlockOK } func (i *singleLevelIterator) initBoundsForAlreadyLoadedBlock() { @@ -456,16 +520,31 @@ func (i *singleLevelIterator) seekGEHelper( } } // Slow-path. - if ikey, _ := i.index.SeekGE(key); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.index.SeekGE(key); ikey == nil { // The target key is greater than any key in the sstable. Invalidate the // block iterator so that a subsequent call to Prev() will return the last // key in the table. i.data.invalidate() return nil, nil } - if !i.loadBlock() { + result := i.loadBlock() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the upper bound here since don't want to bother moving + // to the next block if upper bound is already exceeded. Note that + // the next block starts with keys >= ikey.UserKey since even + // though this is the block separator, the same user key can span + // multiple blocks. Since upper is exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + return nil, nil + } + // Want to skip to the next block. + dontSeekWithinBlock = true + } } if !dontSeekWithinBlock { if ikey, val := i.data.SeekGE(key); ikey != nil { @@ -520,6 +599,9 @@ func (i *singleLevelIterator) seekPrefixGE( } i.lastBloomFilterMatched = true } + // The i.exhaustedBounds comparison indicates that the upper bound was + // reached. The i.data.isDataInvalidated() indicates that the sstable was + // exhausted. if trySeekUsingNext && (i.exhaustedBounds == +1 || i.data.isDataInvalidated()) { // Already exhausted, so return nil. return nil, nil @@ -568,12 +650,31 @@ func (i *singleLevelIterator) SeekLT(key []byte) (*InternalKey, []byte) { dontSeekWithinBlock = true } } else { - if ikey, _ := i.index.SeekGE(key); ikey == nil { - i.index.Last() + var ikey *InternalKey + if ikey, _ = i.index.SeekGE(key); ikey == nil { + ikey, _ = i.index.Last() + if ikey == nil { + return nil, nil + } } - if !i.loadBlock() { + // INVARIANT: ikey != nil. + result := i.loadBlock() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the lower bound here since don't want to bother moving + // to the previous block if lower bound is already exceeded. Note + // that the previous block starts with keys <= ikey.UserKey since + // even though this is the current block's separator, the same + // user key can span multiple blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.lower) < 0 { + i.exhaustedBounds = -1 + return nil, nil + } + // Want to skip to the previous block. + dontSeekWithinBlock = true + } } if !dontSeekWithinBlock { if ikey, val := i.data.SeekLT(key); ikey != nil { @@ -620,20 +721,38 @@ func (i *singleLevelIterator) firstInternal() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 - if ikey, _ := i.index.First(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.index.First(); ikey == nil { i.data.invalidate() return nil, nil } - if !i.loadBlock() { + result := i.loadBlock() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.data.First(); ikey != nil { - if i.blockUpper != nil && i.cmp(ikey.UserKey, i.blockUpper) >= 0 { + if result == loadBlockOK { + if ikey, val := i.data.First(); ikey != nil { + if i.blockUpper != nil && i.cmp(ikey.UserKey, i.blockUpper) >= 0 { + i.exhaustedBounds = +1 + return nil, nil + } + return ikey, val + } + // Else fall through to skipForward. + } else { + // result == loadBlockIrrelevant. Enforce the upper bound here since + // don't want to bother moving to the next block if upper bound is + // already exceeded. Note that the next block starts with keys >= + // ikey.UserKey since even though this is the block separator, the + // same user key can span multiple blocks. Since upper is exclusive we + // use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { i.exhaustedBounds = +1 return nil, nil } - return ikey, val + // Else fall through to skipForward. } + return i.skipForward() } @@ -659,20 +778,36 @@ func (i *singleLevelIterator) lastInternal() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 - if ikey, _ := i.index.Last(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.index.Last(); ikey == nil { i.data.invalidate() return nil, nil } - if !i.loadBlock() { + result := i.loadBlock() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.data.Last(); ikey != nil { - if i.blockLower != nil && i.cmp(ikey.UserKey, i.blockLower) < 0 { + if result == loadBlockOK { + if ikey, val := i.data.Last(); ikey != nil { + if i.blockLower != nil && i.cmp(ikey.UserKey, i.blockLower) < 0 { + i.exhaustedBounds = -1 + return nil, nil + } + return ikey, val + } + // Else fall through to skipBackward. + } else { + // result == loadBlockIrrelevant. Enforce the lower bound here since + // don't want to bother moving to the previous block if lower bound is + // already exceeded. Note that the previous block starts with keys <= + // key.UserKey since even though this is the current block's + // separator, the same user key can span multiple blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.lower) < 0 { i.exhaustedBounds = -1 return nil, nil } - return ikey, val } + return i.skipBackward() } @@ -726,14 +861,32 @@ func (i *singleLevelIterator) Prev() (*InternalKey, []byte) { func (i *singleLevelIterator) skipForward() (*InternalKey, []byte) { for { - if key, _ := i.index.Next(); key == nil { + var key *InternalKey + if key, _ = i.index.Next(); key == nil { i.data.invalidate() break } - if !i.loadBlock() { + result := i.loadBlock() + if result != loadBlockOK { if i.err != nil { break } + if result == loadBlockFailed { + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + } + // result == loadBlockIrrelevant. Enforce the upper bound here + // since don't want to bother moving to the next block if upper + // bound is already exceeded. Note that the next block starts with + // keys >= key.UserKey since even though this is the block + // separator, the same user key can span multiple blocks. Since + // upper is exclusive we use >= below. + if i.upper != nil && i.cmp(key.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + return nil, nil + } continue } if key, val := i.data.First(); key != nil { @@ -749,14 +902,31 @@ func (i *singleLevelIterator) skipForward() (*InternalKey, []byte) { func (i *singleLevelIterator) skipBackward() (*InternalKey, []byte) { for { - if key, _ := i.index.Prev(); key == nil { + var key *InternalKey + if key, _ = i.index.Prev(); key == nil { i.data.invalidate() break } - if !i.loadBlock() { + result := i.loadBlock() + if result != loadBlockOK { if i.err != nil { break } + if result == loadBlockFailed { + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + } + // result == loadBlockIrrelevant. Enforce the lower bound here + // since don't want to bother moving to the previous block if lower + // bound is already exceeded. Note that the previous block starts with + // keys <= key.UserKey since even though this is the current block's + // separator, the same user key can span multiple blocks. + if i.lower != nil && i.cmp(key.UserKey, i.lower) < 0 { + i.exhaustedBounds = -1 + return nil, nil + } continue } key, val := i.data.Last() @@ -915,12 +1085,24 @@ func (i *compactionIterator) skipForward(key *InternalKey, val []byte) (*Interna if key, _ := i.index.Next(); key == nil { break } - if !i.loadBlock() { + result := i.loadBlock() + if result != loadBlockOK { if i.err != nil { break } - continue + switch result { + case loadBlockFailed: + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + case loadBlockIrrelevant: + panic("compactionIter should not be using block intervals for skipping") + default: + panic(fmt.Sprintf("unexpected case %d", result)) + } } + // result == loadBlockOK if key, val = i.data.First(); key != nil { break } @@ -945,30 +1127,45 @@ var _ base.InternalIterator = (*twoLevelIterator)(nil) // leaves i.index unpositioned. If unsuccessful, it gets i.err to any error // encountered, which may be nil if we have simply exhausted the entire table. // This is used for two level indexes. -func (i *twoLevelIterator) loadIndex() bool { +func (i *twoLevelIterator) loadIndex() loadBlockResult { // Ensure the data block iterator is invalidated even if loading of the // index fails. i.data.invalidate() if !i.topLevelIndex.Valid() { i.index.offset = 0 i.index.restarts = 0 - return false + return loadBlockFailed } - h, n := decodeBlockHandle(i.topLevelIndex.Value()) - if n == 0 || n != len(i.topLevelIndex.Value()) { + bhp, err := decodeBlockHandleWithProperties(i.topLevelIndex.Value()) + if err != nil { i.err = base.CorruptionErrorf("pebble/table: corrupt top level index entry") - return false + return loadBlockFailed } - indexBlock, err := i.reader.readBlock(h, nil /* transform */, nil /* readaheadState */) + if len(i.bpfs) > 0 { + intersects, err := blockPropertiesFilterer(i.bpfs).intersects(bhp.Props) + if err != nil { + i.err = errCorruptIndexEntry + return loadBlockFailed + } + if !intersects { + return loadBlockIrrelevant + } + } + indexBlock, err := i.reader.readBlock( + bhp.BlockHandle, nil /* transform */, nil /* readaheadState */) if err != nil { i.err = err - return false + return loadBlockFailed } - i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum) - return i.err == nil + if i.err = i.index.initHandle( + i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum); i.err == nil { + return loadBlockOK + } + return loadBlockFailed } -func (i *twoLevelIterator) init(r *Reader, lower, upper []byte) error { +func (i *twoLevelIterator) init( + r *Reader, lower, upper []byte, bpfs []BlockPropertyFilter) error { if r.err != nil { return r.err } @@ -979,6 +1176,7 @@ func (i *twoLevelIterator) init(r *Reader, lower, upper []byte) error { i.lower = lower i.upper = upper + i.bpfs = bpfs i.reader = r i.cmp = r.Compare err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum) @@ -1001,18 +1199,34 @@ func (i *twoLevelIterator) SeekGE(key []byte) (*InternalKey, []byte) { i.exhaustedBounds = 0 i.err = nil // clear cached iteration error + var dontSeekWithinSingleLevelIter bool if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.Valid() || i.boundsCmp <= 0 || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 { // Slow-path: need to position the topLevelIndex. - if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the upper bound here since don't want to bother moving + // to the next entry in the top level index if upper bound is + // already exceeded. Note that the next entry starts with keys >= + // ikey.UserKey since even though this is the block separator, the + // same user key can span multiple index blocks. Since upper is + // exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } + // Fall through to skipForward. + dontSeekWithinSingleLevelIter = true + } } // Else fast-path: The bounds have moved forward and this SeekGE is // respecting the lower bound (guaranteed by Iterator). We know that @@ -1023,8 +1237,10 @@ func (i *twoLevelIterator) SeekGE(key []byte) (*InternalKey, []byte) { // confirms that it is not behind. Since it is not ahead and not behind // it must be at the right position. - if ikey, val := i.singleLevelIterator.SeekGE(key); ikey != nil { - return ikey, val + if !dontSeekWithinSingleLevelIter { + if ikey, val := i.singleLevelIterator.SeekGE(key); ikey != nil { + return ikey, val + } } return i.skipForward() } @@ -1067,6 +1283,7 @@ func (i *twoLevelIterator) SeekPrefixGE( // Bloom filter matches. i.exhaustedBounds = 0 + var dontSeekWithinSingleLevelIter bool if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.Valid() || i.boundsCmp <= 0 || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 { // Slow-path: need to position the topLevelIndex. @@ -1079,15 +1296,30 @@ func (i *twoLevelIterator) SeekPrefixGE( // block, and in that case we don't need to invalidate and reload the // singleLevelIterator state. trySeekUsingNext = false - if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the upper bound here since don't want to bother moving + // to the next entry in the top level index if upper bound is + // already exceeded. Note that the next entry starts with keys >= + // ikey.UserKey since even though this is the block separator, the + // same user key can span multiple index blocks. Since upper is + // exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } + // Fall through to skipForward. + dontSeekWithinSingleLevelIter = true + } } // Else fast-path: The bounds have moved forward and this SeekGE is // respecting the lower bound (guaranteed by Iterator). We know that @@ -1098,20 +1330,16 @@ func (i *twoLevelIterator) SeekPrefixGE( // confirms that it is not behind. Since it is not ahead and not behind // it must be at the right position. - if ikey, val := i.singleLevelIterator.seekPrefixGE( - prefix, key, trySeekUsingNext, false /* checkFilter */); ikey != nil { - return ikey, val + if !dontSeekWithinSingleLevelIter { + if ikey, val := i.singleLevelIterator.seekPrefixGE( + prefix, key, trySeekUsingNext, false /* checkFilter */); ikey != nil { + return ikey, val + } } + // NB: skipForward checks whether exhaustedBounds is already +1. return i.skipForward() } -func (i *twoLevelIterator) SeekPrefixGEWithExhaustionIndicator( - prefix, key []byte, trySeekUsingNext bool, -) (k *base.InternalKey, value []byte, iterExhaustedAndNotBloomFilterFail bool) { - k, value = i.SeekPrefixGE(prefix, key, trySeekUsingNext) - return k, value, false -} - // SeekLT implements internalIterator.SeekLT, as documented in the pebble // package. Note that SeekLT only checks the lower bound. It is up to the // caller to ensure that key is less than the upper bound. @@ -1121,31 +1349,58 @@ func (i *twoLevelIterator) SeekLT(key []byte) (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 + var result loadBlockResult + var ikey *InternalKey // NB: Unlike SeekGE, we don't have a fast-path here since we don't know // whether the topLevelIndex is positioned after the position that would // be returned by doing i.topLevelIndex.SeekGE(). To know this we would // need to know the index key preceding the current one. - if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { - if ikey, _ := i.topLevelIndex.Last(); ikey == nil { + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { + if ikey, _ = i.topLevelIndex.Last(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result = i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - return i.singleLevelIterator.lastInternal() - } - - if !i.loadIndex() { - return nil, nil - } - - if ikey, val := i.singleLevelIterator.SeekLT(key); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { + return ikey, val + } + // Fall through to skipBackward since the singleLevelIterator did + // not have any blocks that satisfy the block interval + // constraints, or the lower bound was reached. + } + // Else loadBlockIrrelevant, so fall through. + } else { + result = i.loadIndex() + if result == loadBlockFailed { + return nil, nil + } + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.SeekLT(key); ikey != nil { + return ikey, val + } + // Fall through to skipBackward since the singleLevelIterator did + // not have any blocks that satisfy the block interval + // constraint, or the lower bound was reached. + } + // Else loadBlockIrrelevant, so fall through. + } + if result == loadBlockIrrelevant { + // Enforce the lower bound here since don't want to bother moving to + // the previous entry in the top level index if lower bound is already + // exceeded. Note that the previous entry starts with keys <= + // ikey.UserKey since even though this is the current block's + // separator, the same user key can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + } } + // NB: skipBackward checks whether exhaustedBounds is already -1. return i.skipBackward() } @@ -1162,17 +1417,32 @@ func (i *twoLevelIterator) First() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 - if ikey, _ := i.topLevelIndex.First(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.First(); ikey == nil { return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - if ikey, val := i.singleLevelIterator.First(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.First(); ikey != nil { + return ikey, val + } + // Else fall through to skipForward. + } else { + // result == loadBlockIrrelevant. Enforce the upper bound here since + // don't want to bother moving to the next entry in the top level + // index if upper bound is already exceeded. Note that the next entry + // starts with keys >= ikey.UserKey since even though this is the + // block separator, the same user key can span multiple index blocks. + // Since upper is exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } } + // NB: skipForward checks whether exhaustedBounds is already +1. return i.skipForward() } @@ -1189,17 +1459,32 @@ func (i *twoLevelIterator) Last() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 - if ikey, _ := i.topLevelIndex.Last(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Last(); ikey == nil { return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - if ikey, val := i.singleLevelIterator.Last(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.Last(); ikey != nil { + return ikey, val + } + // Else fall through to skipBackward. + } else { + // result == loadBlockIrrelevant. Enforce the lower bound here + // since don't want to bother moving to the previous entry in the + // top level index if lower bound is already exceeded. Note that + // the previous entry starts with keys <= ikey.UserKey since even + // though this is the current block's separator, the same user key + // can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + } } + // NB: skipBackward checks whether exhaustedBounds is already -1. return i.skipBackward() } @@ -1235,51 +1520,75 @@ func (i *twoLevelIterator) Prev() (*InternalKey, []byte) { func (i *twoLevelIterator) skipForward() (*InternalKey, []byte) { for { - if i.err != nil { - return nil, nil - } - if i.singleLevelIterator.valid() { - // The iterator is positioned at valid record in the current data block - // which implies the previous positioning call reached the upper bound. - // + if i.err != nil || i.exhaustedBounds > 0 { return nil, nil } i.exhaustedBounds = 0 - if ikey, _ := i.topLevelIndex.Next(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Next(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.singleLevelIterator.firstInternal(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.firstInternal(); ikey != nil { + return ikey, val + } + // Next iteration will return if singleLevelIterator set + // exhaustedBounds = +1. + } else { + // result == loadBlockIrrelevant. Enforce the upper bound here + // since don't want to bother moving to the next entry in the top + // level index if upper bound is already exceeded. Note that the + // next entry starts with keys >= ikey.UserKey since even though + // this is the block separator, the same user key can span + // multiple index blocks. Since upper is exclusive we use >= + // below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + // Next iteration will return. + } } } } func (i *twoLevelIterator) skipBackward() (*InternalKey, []byte) { for { - if i.err != nil { - return nil, nil - } - if i.singleLevelIterator.valid() { - // The iterator is positioned at valid record in the current data block - // which implies the previous positioning call reached the lower bound. + if i.err != nil || i.exhaustedBounds < 0 { return nil, nil } i.exhaustedBounds = 0 - if ikey, _ := i.topLevelIndex.Prev(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Prev(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { + return ikey, val + } + // Next iteration will return if singleLevelIterator set + // exhaustedBounds = -1. + } else { + // result == loadBlockIrrelevant. Enforce the lower bound here + // since don't want to bother moving to the previous entry in the + // top level index if lower bound is already exceeded. Note that + // the previous entry starts with keys <= ikey.UserKey since even + // though this is the current block's separator, the same user key + // can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + // Next iteration will return. + } } } } @@ -1370,10 +1679,26 @@ func (i *twoLevelCompactionIterator) skipForward( if key, _ := i.topLevelIndex.Next(); key == nil { break } - if i.loadIndex() { - if key, val = i.singleLevelIterator.First(); key != nil { + result := i.loadIndex() + if result != loadBlockOK { + if i.err != nil { break } + switch result { + case loadBlockFailed: + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + case loadBlockIrrelevant: + panic("compactionIter should not be using block intervals for skipping") + default: + panic(fmt.Sprintf("unexpected case %d", result)) + } + } + // result == loadBlockOK + if key, val = i.singleLevelIterator.First(); key != nil { + break } } } @@ -1777,15 +2102,27 @@ func (r *Reader) get(key []byte) (value []byte, err error) { return newValue, nil } -// NewIter returns an iterator for the contents of the table. If an error -// occurs, NewIter cleans up after itself and returns a nil iterator. -func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { +// NewIterWithBlockPropertyFilters returns an iterator for the contents of the +// table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after +// itself and returns a nil iterator. +func (r *Reader) NewIterWithBlockPropertyFilters( + lower, upper []byte, bpfs []BlockPropertyFilter) (Iterator, error) { + // Check that sorted and no duplicates. + if len(bpfs) > 1 { + for i := range bpfs { + if i > 0 && bpfs[i-1].ShortID() >= bpfs[i].ShortID() { + return nil, errors.Errorf( + "BlockPropertyFilters %d, %d are not sorted", + bpfs[i-1].ShortID(), bpfs[i].ShortID()) + } + } + } // NB: pebble.tableCache wraps the returned iterator with one which performs // reference counting on the Reader, preventing the Reader from being closed // until the final iterator closes. if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) - err := i.init(r, lower, upper) + err := i.init(r, lower, upper, bpfs) if err != nil { return nil, err } @@ -1793,20 +2130,26 @@ func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { } i := singleLevelIterPool.Get().(*singleLevelIterator) - err := i.init(r, lower, upper) + err := i.init(r, lower, upper, bpfs) if err != nil { return nil, err } return i, nil } +// NewIter returns an iterator for the contents of the table. If an error +// occurs, NewIter cleans up after itself and returns a nil iterator. +func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { + return r.NewIterWithBlockPropertyFilters(lower, upper, nil) +} + // NewCompactionIter returns an iterator similar to NewIter but it also increments // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up // after itself and returns a nil iterator. func (r *Reader) NewCompactionIter(bytesIterated *uint64) (Iterator, error) { if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) - err := i.init(r, nil /* lower */, nil /* upper */) + err := i.init(r, nil /* lower */, nil /* upper */, nil) if err != nil { return nil, err } @@ -1817,7 +2160,7 @@ func (r *Reader) NewCompactionIter(bytesIterated *uint64) (Iterator, error) { }, nil } i := singleLevelIterPool.Get().(*singleLevelIterator) - err := i.init(r, nil /* lower */, nil /* upper */) + err := i.init(r, nil /* lower */, nil /* upper */, nil) if err != nil { return nil, err } @@ -2116,33 +2459,34 @@ func (r *Reader) Layout() (*Layout, error) { l.Index = append(l.Index, r.indexBH) iter, _ := newBlockIter(r.Compare, indexH.Get()) for key, value := iter.First(); key != nil; key, value = iter.Next() { - dataBH, n := decodeBlockHandle(value) - if n == 0 || n != len(value) { + dataBH, err := decodeBlockHandleWithProperties(value) + if err != nil { return nil, errCorruptIndexEntry } - l.Data = append(l.Data, dataBH) + l.Data = append(l.Data, dataBH.BlockHandle) } } else { l.TopIndex = r.indexBH topIter, _ := newBlockIter(r.Compare, indexH.Get()) for key, value := topIter.First(); key != nil; key, value = topIter.Next() { - indexBH, n := decodeBlockHandle(value) - if n == 0 || n != len(value) { + indexBH, err := decodeBlockHandleWithProperties(value) + if err != nil { return nil, errCorruptIndexEntry } - l.Index = append(l.Index, indexBH) + l.Index = append(l.Index, indexBH.BlockHandle) - subIndex, err := r.readBlock(indexBH, nil /* transform */, nil /* readaheadState */) + subIndex, err := r.readBlock( + indexBH.BlockHandle, nil /* transform */, nil /* readaheadState */) if err != nil { return nil, err } iter, _ := newBlockIter(r.Compare, subIndex.Get()) for key, value := iter.First(); key != nil; key, value = iter.Next() { - dataBH, n := decodeBlockHandle(value) - if n == 0 || n != len(value) { + dataBH, err := decodeBlockHandleWithProperties(value) + if err != nil { return nil, errCorruptIndexEntry } - l.Data = append(l.Data, dataBH) + l.Data = append(l.Data, dataBH.BlockHandle) } subIndex.Release() } @@ -2237,11 +2581,12 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // The range falls completely after this file, or an error occurred. return 0, topIter.Error() } - startIdxBH, n := decodeBlockHandle(val) - if n == 0 || n != len(val) { + startIdxBH, err := decodeBlockHandleWithProperties(val) + if err != nil { return 0, errCorruptIndexEntry } - startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, nil /* readaheadState */) + startIdxBlock, err := r.readBlock( + startIdxBH.BlockHandle, nil /* transform */, nil /* readaheadState */) if err != nil { return 0, err } @@ -2257,11 +2602,12 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } } else { - endIdxBH, n := decodeBlockHandle(val) - if n == 0 || n != len(val) { + endIdxBH, err := decodeBlockHandleWithProperties(val) + if err != nil { return 0, errCorruptIndexEntry } - endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, nil /* readaheadState */) + endIdxBlock, err := r.readBlock( + endIdxBH.BlockHandle, nil /* transform */, nil /* readaheadState */) if err != nil { return 0, err } @@ -2280,8 +2626,8 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // The range falls completely after this file, or an error occurred. return 0, startIdxIter.Error() } - startBH, n := decodeBlockHandle(val) - if n == 0 || n != len(val) { + startBH, err := decodeBlockHandleWithProperties(val) + if err != nil { return 0, errCorruptIndexEntry } @@ -2297,8 +2643,8 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // The range spans beyond this file. Include data blocks through the last. return r.Properties.DataSize - startBH.Offset, nil } - endBH, n := decodeBlockHandle(val) - if n == 0 || n != len(val) { + endBH, err := decodeBlockHandleWithProperties(val) + if err != nil { return 0, errCorruptIndexEntry } return endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset, nil @@ -2532,8 +2878,8 @@ func (l *Layout) Describe( case "index", "top-index": iter, _ := newBlockIter(r.Compare, h.Get()) for key, value := iter.First(); key != nil; key, value = iter.Next() { - bh, n := decodeBlockHandle(value) - if n == 0 || n != len(value) { + bh, err := decodeBlockHandleWithProperties(value) + if err != nil { fmt.Fprintf(w, "%10d [err: %s]\n", b.Offset+uint64(iter.offset), err) continue } diff --git a/sstable/table.go b/sstable/table.go index ab6a89080d9..1545b4fd312 100644 --- a/sstable/table.go +++ b/sstable/table.go @@ -123,20 +123,29 @@ is a key that is >= every key in block i and is < every key i block i+1. The successor for the final block is a key that is >= every key in block N-1. The index block restart interval is 1: every entry is a restart point. -A block handle is an offset and a length; the length does not include the 5 -byte trailer. Both numbers are varint-encoded, with no padding between the two -values. The maximum size of an encoded block handle is therefore 20 bytes. + +A block handle is an offset, a length, and optional block properties (for data +blocks and first/lower level index blocks); the length does not include the 5 +byte trailer. All numbers are varint-encoded, with no padding between the two +values. The maximum size of an encoded block handle without properties is 20 +bytes. It is not advised to have properties that accumulate to be longer than +100 bytes. + */ const ( - blockTrailerLen = 5 - blockHandleMaxLen = 10 + 10 + blockTrailerLen = 5 + blockHandleMaxLenWithoutProperties = 10 + 10 + // blockHandleLikelyMaxLen can be used for pre-allocating buffers to + // reduce memory copies. It is not guaranteed that a block handle will not + // exceed this length. + blockHandleLikelyMaxLen = blockHandleMaxLenWithoutProperties + 100 levelDBFooterLen = 48 levelDBMagic = "\x57\xfb\x80\x8b\x24\x75\x47\xdb" levelDBMagicOffset = levelDBFooterLen - len(levelDBMagic) - rocksDBFooterLen = 1 + 2*blockHandleMaxLen + 4 + 8 + rocksDBFooterLen = 1 + 2*blockHandleMaxLenWithoutProperties + 4 + 8 rocksDBMagic = "\xf7\xcf\xf4\x85\xb7\x41\xe2\x88" rocksDBMagicOffset = rocksDBFooterLen - len(rocksDBMagic) rocksDBVersionOffset = rocksDBMagicOffset - 4 diff --git a/sstable/writer.go b/sstable/writer.go index d36401e3468..2a937f37f04 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "math" + "sort" "github.com/cespare/xxhash/v2" "github.com/cockroachdb/errors" @@ -128,12 +129,14 @@ type Writer struct { // Internal flag to allow creation of range-del-v1 format blocks. Only used // for testing. Note that v2 format blocks are backwards compatible with v1 // format blocks. - rangeDelV1Format bool - block blockWriter - indexBlock blockWriter - rangeDelBlock blockWriter - props Properties - propCollectors []TablePropertyCollector + rangeDelV1Format bool + block blockWriter + indexBlock blockWriter + rangeDelBlock blockWriter + props Properties + propCollectors []TablePropertyCollector + blockPropCollectors []BlockPropertyCollector + blockPropsEncoder blockPropertiesEncoder // compressedBuf is the destination buffer for compression. It is // re-used over the lifetime of the writer, avoiding the allocation of a // temporary buffer for each block. @@ -143,13 +146,19 @@ type Writer struct { // nil, or the full keys otherwise. filter filterWriter // tmp is a scratch buffer, large enough to hold either footerLen bytes, - // blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes. - tmp [rocksDBFooterLen]byte + // blockTrailerLen bytes, (5 * binary.MaxVarintLen64) bytes, and most + // likely large enough for a block handle with properties. + tmp [blockHandleLikelyMaxLen]byte xxHasher *xxhash.Digest topLevelIndexBlock blockWriter - indexPartitions []blockWriter + indexPartitions []indexBlockWriterAndBlockProperties +} + +type indexBlockWriterAndBlockProperties struct { + writer blockWriter + properties []byte } // Set sets the value for the given key. The sequence number is set to @@ -241,6 +250,11 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error { return err } } + for i := range w.blockPropCollectors { + if err := w.blockPropCollectors[i].Add(key, value); err != nil { + return err + } + } w.maybeAddToFilter(key.UserKey) w.block.add(key, value) @@ -376,16 +390,44 @@ func (w *Writer) maybeFlush(key InternalKey, value []byte) error { w.err = err return w.err } - w.addIndexEntry(key, bh) + var bhp BlockHandleWithProperties + if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil { + return err + } + if err = w.addIndexEntry(key, bhp); err != nil { + return err + } return nil } +// The BlockHandleWithProperties returned by this method must be encoded +// before any future use of the Writer.blockPropsEncoder, since the properties +// slice will get reused by the blockPropsEncoder. +func (w *Writer) maybeAddBlockPropertiesToBlockHandle( + bh BlockHandle) (BlockHandleWithProperties, error) { + if len(w.blockPropCollectors) == 0 { + return BlockHandleWithProperties{BlockHandle: bh}, nil + } + var err error + w.blockPropsEncoder.resetProps() + for i := range w.blockPropCollectors { + scratch := w.blockPropsEncoder.getScratchForProp() + if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil { + return BlockHandleWithProperties{}, nil + } + if len(scratch) > 0 { + w.blockPropsEncoder.addProp(w.blockPropCollectors[i].ShortID(), scratch) + } + } + return BlockHandleWithProperties{BlockHandle: bh, Props: w.blockPropsEncoder.unsafeProps()}, nil +} + // addIndexEntry adds an index entry for the specified key and block handle. -func (w *Writer) addIndexEntry(key InternalKey, bh BlockHandle) { - if bh.Length == 0 { +func (w *Writer) addIndexEntry(key InternalKey, bhp BlockHandleWithProperties) error { + if bhp.Length == 0 { // A valid blockHandle must be non-zero. // In particular, it must have a non-zero length. - return + return nil } prevKey := base.DecodeInternalKey(w.block.curKey) var sep InternalKey @@ -394,16 +436,22 @@ func (w *Writer) addIndexEntry(key InternalKey, bh BlockHandle) { } else { sep = prevKey.Separator(w.compare, w.separator, nil, key) } - n := encodeBlockHandle(w.tmp[:], bh) + encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp) if supportsTwoLevelIndex(w.tableFormat) && - shouldFlush(sep, w.tmp[:n], &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) { + shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) { // Enable two level indexes if there is more than one index block. w.twoLevelIndex = true - w.finishIndexBlock() + if err := w.finishIndexBlock(); err != nil { + return err + } } - w.indexBlock.add(sep, w.tmp[:n]) + for i := range w.blockPropCollectors { + w.blockPropCollectors[i].AddLastDataBlockToIndexBlock() + } + w.indexBlock.add(sep, encoded) + return nil } func shouldFlush( @@ -438,29 +486,51 @@ func shouldFlush( // finishIndexBlock finishes the current index block and adds it to the top // level index block. This is only used when two level indexes are enabled. -func (w *Writer) finishIndexBlock() { - w.indexPartitions = append(w.indexPartitions, w.indexBlock) +func (w *Writer) finishIndexBlock() error { + w.blockPropsEncoder.resetProps() + for i := range w.blockPropCollectors { + scratch := w.blockPropsEncoder.getScratchForProp() + var err error + if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil { + return err + } + if len(scratch) > 0 { + w.blockPropsEncoder.addProp(w.blockPropCollectors[i].ShortID(), scratch) + } + } + w.indexPartitions = append(w.indexPartitions, + indexBlockWriterAndBlockProperties{ + writer: w.indexBlock, + properties: w.blockPropsEncoder.props(), + }) w.indexBlock = blockWriter{ restartInterval: 1, } + return nil } func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) { // Add the final unfinished index. - w.finishIndexBlock() + if err := w.finishIndexBlock(); err != nil { + return BlockHandle{}, err + } for i := range w.indexPartitions { b := &w.indexPartitions[i] - w.props.NumDataBlocks += uint64(b.nEntries) - sep := base.DecodeInternalKey(b.curKey) - data := b.finish() + w.props.NumDataBlocks += uint64(b.writer.nEntries) + sep := base.DecodeInternalKey(b.writer.curKey) + data := b.writer.finish() w.props.IndexSize += uint64(len(data)) bh, err := w.writeBlock(data, w.compression) if err != nil { return BlockHandle{}, err } - n := encodeBlockHandle(w.tmp[:], bh) - w.topLevelIndexBlock.add(sep, w.tmp[:n]) + bhp := BlockHandleWithProperties{ + BlockHandle: bh, + Props: b.properties, + } + encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp) + w.topLevelIndexBlock.add(sep, encoded) } // NB: RocksDB includes the block trailer length in the index size @@ -506,7 +576,7 @@ func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, err return BlockHandle{}, errors.Newf("unsupported checksum type: %d", w.checksumType) } binary.LittleEndian.PutUint32(w.tmp[1:5], checksum) - bh := BlockHandle{w.meta.Size, uint64(len(b))} + bh := BlockHandle{Offset: w.meta.Size, Length: uint64(len(b))} if w.cacheID != 0 && w.fileNum != 0 { // Remove the block being written from the cache. This provides defense in @@ -557,7 +627,13 @@ func (w *Writer) Close() (err error) { w.err = err return w.err } - w.addIndexEntry(InternalKey{}, bh) + var bhp BlockHandleWithProperties + if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil { + return err + } + if err = w.addIndexEntry(InternalKey{}, bhp); err != nil { + return err + } } w.props.DataSize = w.meta.Size @@ -638,6 +714,21 @@ func (w *Writer) Close() (err error) { return err } } + for i := range w.blockPropCollectors { + buf, err := + w.blockPropCollectors[i].FinishTable(w.blockPropsEncoder.getScratchForProp()) + if err != nil { + return err + } + var prop string + if len(buf) > 0 { + prop = string(buf) + } + // NB: The property is populated in the map even if it is the + // empty string, since the presence in the map is what indicates + // that the block property collector was used when writing. + userProps[w.blockPropCollectors[i].Name()] = prop + } if len(userProps) > 0 { w.props.UserProperties = userProps } @@ -823,16 +914,31 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * w.props.PropertyCollectorNames = "[]" w.props.ExternalFormatVersion = rocksDBExternalFormatVersion - if len(o.TablePropertyCollectors) > 0 { - w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors)) + if len(o.TablePropertyCollectors) > 0 || len(o.BlockPropertyCollectors) > 0 { var buf bytes.Buffer buf.WriteString("[") - for i := range o.TablePropertyCollectors { - w.propCollectors[i] = o.TablePropertyCollectors[i]() - if i > 0 { - buf.WriteString(",") + if len(o.TablePropertyCollectors) > 0 { + w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors)) + for i := range o.TablePropertyCollectors { + w.propCollectors[i] = o.TablePropertyCollectors[i]() + if i > 0 { + buf.WriteString(",") + } + buf.WriteString(w.propCollectors[i].Name()) + } + } + if len(o.BlockPropertyCollectors) > 0 { + w.blockPropCollectors = make([]BlockPropertyCollector, len(o.BlockPropertyCollectors)) + for i := range o.BlockPropertyCollectors { + w.blockPropCollectors[i] = o.BlockPropertyCollectors[i]() + if i > 0 || len(o.TablePropertyCollectors) > 0 { + buf.WriteString(",") + } + buf.WriteString(w.blockPropCollectors[i].Name()) } - buf.WriteString(w.propCollectors[i].Name()) + sort.Slice(w.blockPropCollectors, func(i, j int) bool { + return w.blockPropCollectors[i].ShortID() < w.blockPropCollectors[j].ShortID() + }) } buf.WriteString("]") w.props.PropertyCollectorNames = buf.String() diff --git a/table_cache.go b/table_cache.go index a6427858f1a..7bc862a4927 100644 --- a/table_cache.go +++ b/table_cache.go @@ -318,13 +318,53 @@ func (c *tableCacheShard) newIters( c.unrefValue(v) return emptyIter, nil, nil } + var bpfs []BlockPropertyFilter + if opts != nil { + bpfs = opts.BlockPropertyFilters + } + if len(bpfs) > 0 { + intersects := false + someFiltersAbsent := false + for i := range bpfs { + props, ok := v.reader.Properties.UserProperties[bpfs[i].Name()] + if !ok { + intersects = true + someFiltersAbsent = true + } + filterIntersects, err := bpfs[i].Intersects([]byte(props)) + if err != nil { + return nil, nil, err + } + intersects = intersects || filterIntersects + } + if !intersects { + // Return the empty iterator. This iterator has no mutable state, so + // using a singleton is fine. + c.unrefValue(v) + return emptyIter, nil, nil + } + if someFiltersAbsent { + // The common case is exactly one filter, and if that is absent, + // we don't append to this slice. So we don't bother with trying + // to optimize memory allocations for this slice. + var customizedBpfs []BlockPropertyFilter + for i := range bpfs { + _, ok := v.reader.Properties.UserProperties[bpfs[i].Name()] + if ok { + customizedBpfs = append(customizedBpfs, bpfs[i]) + } + } + bpfs = customizedBpfs + } + } var iter sstable.Iterator var err error if bytesIterated != nil { iter, err = v.reader.NewCompactionIter(bytesIterated) } else { - iter, err = v.reader.NewIter(opts.GetLowerBound(), opts.GetUpperBound()) + iter, err = v.reader.NewIterWithBlockPropertyFilters( + opts.GetLowerBound(), opts.GetUpperBound(), bpfs) } if err != nil { c.unrefValue(v) diff --git a/testdata/checkpoint b/testdata/checkpoint index 66662f14dbc..b75fddc3660 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -28,6 +28,9 @@ sync: db create: db/marker.format-version.000003.004 close: db/marker.format-version.000003.004 sync: db +create: db/marker.format-version.000004.005 +close: db/marker.format-version.000004.005 +sync: db sync: db/MANIFEST-000001 create: db/000002.log sync: db @@ -93,9 +96,9 @@ close: open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.004 -sync: checkpoints/checkpoint1/marker.format-version.000001.004 -close: checkpoints/checkpoint1/marker.format-version.000001.004 +create: checkpoints/checkpoint1/marker.format-version.000001.005 +sync: checkpoints/checkpoint1/marker.format-version.000001.005 +close: checkpoints/checkpoint1/marker.format-version.000001.005 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 create: checkpoints/checkpoint1/MANIFEST-000001 @@ -150,7 +153,7 @@ CURRENT LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000003.004 +marker.format-version.000004.005 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -160,7 +163,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.004 +marker.format-version.000001.005 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly diff --git a/testdata/event_listener b/testdata/event_listener index 412ba1a19cb..94d0de7f5f2 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -34,6 +34,10 @@ create: db/marker.format-version.000003.004 close: db/marker.format-version.000003.004 sync: db upgraded to format version: 004 +create: db/marker.format-version.000004.005 +close: db/marker.format-version.000004.005 +sync: db +upgraded to format version: 005 create: db/MANIFEST-000003 close: db/MANIFEST-000001 sync: db/MANIFEST-000003 @@ -200,9 +204,9 @@ close: open-dir: checkpoint link: db/OPTIONS-000004 -> checkpoint/OPTIONS-000004 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.004 -sync: checkpoint/marker.format-version.000001.004 -close: checkpoint/marker.format-version.000001.004 +create: checkpoint/marker.format-version.000001.005 +sync: checkpoint/marker.format-version.000001.005 +close: checkpoint/marker.format-version.000001.005 sync: checkpoint close: checkpoint create: checkpoint/MANIFEST-000017 diff --git a/testdata/iterator_block_interval_filter b/testdata/iterator_block_interval_filter new file mode 100644 index 00000000000..763cf02d906 --- /dev/null +++ b/testdata/iterator_block_interval_filter @@ -0,0 +1,70 @@ +# Block size is 1, so each block contains one key, and two level index is used +# since even the lower index blocks have only one key. +build +set a01 a +set b02 b +set c03 c +set d04 d +set e05 e +set f06 f +---- +0.0: + 000005:[a01#1,SET-f06#6,SET] + +iter +first +next +next +next +next +next +next +---- +a01:a +b02:b +c03:c +d04:d +e05:e +f06:f +. + +iter lower=1 upper=2 +first +next +prev +prev +next +next +---- +a01:a +. +a01:a +. +a01:a +. + + +iter lower=3 upper=5 +first +next +next +last +prev +prev +seek-lt f +prev +next +prev +prev +---- +c03:c +d04:d +. +d04:d +c03:c +. +d04:d +c03:c +d04:d +c03:c +.