diff --git a/compaction.go b/compaction.go index c83def49b9..e27bd394a2 100644 --- a/compaction.go +++ b/compaction.go @@ -2052,7 +2052,7 @@ func (d *DB) runCompaction( }() snapshots := d.mu.snapshots.toSlice() - + formatVers := d.mu.formatVers.vers // Release the d.mu lock while doing I/O. // Note the unusual order: Unlock and then Lock. d.mu.Unlock() @@ -2105,6 +2105,10 @@ func (d *DB) runCompaction( } writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level) + if formatVers < FormatBlockPropertyCollector { + // Cannot yet write block properties. + writerOpts.BlockPropertyCollectors = nil + } newOutput := func() error { fileMeta := &fileMetadata{} diff --git a/format_major_version.go b/format_major_version.go index 2d87cf24c5..01ae650cbe 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -66,8 +66,11 @@ const ( // kind, base.InternalKeyKindSetWithDelete. Previous Pebble versions will be // unable to open this database. FormatSetWithDelete + // FormatBlockPropertyCollector is a format major version that introduces + // BlockPropertyCollectors. + FormatBlockPropertyCollector // FormatNewest always contains the most recent format major version. - FormatNewest FormatMajorVersion = FormatSetWithDelete + FormatNewest FormatMajorVersion = FormatBlockPropertyCollector ) // formatMajorVersionMigrations defines the migrations from one format @@ -143,6 +146,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatSetWithDelete: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatSetWithDelete) }, + FormatBlockPropertyCollector: func(d *DB) error { + return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector) + }, } const formatVersionMarkerName = `format-version` diff --git a/format_major_version_test.go b/format_major_version_test.go index 275bd60047..9293bbfd2f 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -33,6 +33,8 @@ func TestRatchetFormat(t *testing.T) { require.Equal(t, FormatVersioned, d.FormatMajorVersion()) require.NoError(t, d.RatchetFormatMajorVersion(FormatSetWithDelete)) require.Equal(t, FormatSetWithDelete, d.FormatMajorVersion()) + require.NoError(t, d.RatchetFormatMajorVersion(FormatBlockPropertyCollector)) + require.Equal(t, FormatBlockPropertyCollector, d.FormatMajorVersion()) require.NoError(t, d.Close()) // If we Open the database again, leaving the default format, the diff --git a/iterator_test.go b/iterator_test.go index 39be0224d8..fb7c1efc41 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -998,6 +998,171 @@ func TestIteratorSeekOptErrors(t *testing.T) { }) } +type testBlockIntervalCollector struct { + offsetFromEnd int + initialized bool + lower, upper uint64 +} + +func (bi *testBlockIntervalCollector) Add(key InternalKey, value []byte) error { + k := key.UserKey + if len(k) < 2 + bi.offsetFromEnd { + return nil + } + n := len(k)-bi.offsetFromEnd-2 + val, err := strconv.Atoi(string(k[n:n+2])) + if err != nil { + return err + } + if val < 0 { + panic("testBlockIntervalCollector expects values >= 0") + } + uval := uint64(val) + if !bi.initialized { + bi.lower, bi.upper = uval, uval+1 + bi.initialized = true + return nil + } + if bi.lower > uval { + bi.lower = uval + } + if uval >= bi.upper { + bi.upper = uval+1 + } + return nil +} + +func (bi *testBlockIntervalCollector) FinishDataBlock() (lower uint64, upper uint64, err error) { + bi.initialized = false + l, u := bi.lower, bi.upper + bi.lower, bi.upper = 0, 0 + return l, u, nil +} + +func 
TestIteratorBlockIntervalFilter(t *testing.T) { + var mem vfs.FS + var d *DB + defer func() { + require.NoError(t, d.Close()) + }() + + type collector struct { + id uint16 + offset int + } + createDB := func(collectors []collector) { + if d != nil { + require.NoError(t, d.Close()) + } + + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + + var bpCollectors []func() BlockPropertyCollector + for _, c := range collectors { + coll := c + bpCollectors = append(bpCollectors, func() BlockPropertyCollector { + return sstable.NewBlockIntervalCollector( + fmt.Sprintf("%d", coll.id), + &testBlockIntervalCollector{offsetFromEnd: coll.offset}) + }) + } + opts := &Options{ + FS: mem, + FormatMajorVersion: FormatNewest, + BlockPropertyCollectors: bpCollectors, + } + lo := LevelOptions{BlockSize: 1, IndexBlockSize: 1} + opts.Levels = append(opts.Levels, lo) + + // Automatic compactions may compact away tombstones from L6, making + // some testcases non-deterministic. + opts.private.disableAutomaticCompactions = true + var err error + d, err = Open("", opts) + require.NoError(t, err) + } + + datadriven.RunTest( + t, "testdata/iterator_block_interval_filter", func(td *datadriven.TestData) string { + switch td.Cmd { + case "build": + var collectors []collector + for _, arg := range td.CmdArgs { + switch arg.Key { + case "id_offset": + if len(arg.Vals) != 2 { + return "id and offset not provided" + } + var id, offset int + var err error + if id, err = strconv.Atoi(arg.Vals[0]); err != nil { + return err.Error() + } + if offset, err = strconv.Atoi(arg.Vals[1]); err != nil { + return err.Error() + } + collectors = append(collectors, collector{id: uint16(id), offset: offset}) + default: + return fmt.Sprintf("unknown key: %s", arg.Key) + } + } + createDB(collectors) + b := d.NewBatch() + if err := runBatchDefineCmd(td, b); err != nil { + return err.Error() + } + if err := b.Commit(nil); err != nil { + return err.Error() + } + if err := d.Flush(); err != nil { + return err.Error() + } + return runLSMCmd(td, d) + + case "iter": + var opts IterOptions + for _, arg := range td.CmdArgs { + switch arg.Key { + case "id_lower_upper": + if len(arg.Vals) != 3 { + return "id, lower, upper not provided" + } + var id, lower, upper int + var err error + if id, err = strconv.Atoi(arg.Vals[0]); err != nil { + return err.Error() + } + if lower, err = strconv.Atoi(arg.Vals[1]); err != nil { + return err.Error() + } + if upper, err = strconv.Atoi(arg.Vals[2]); err != nil { + return err.Error() + } + opts.BlockPropertyFilters = append(opts.BlockPropertyFilters, + sstable.NewBlockIntervalFilter(fmt.Sprintf("%d", id), + uint64(lower), uint64(upper))) + default: + return fmt.Sprintf("unknown key: %s", arg.Key) + } + } + rand.Shuffle(len(opts.BlockPropertyFilters), func(i, j int) { + opts.BlockPropertyFilters[i], opts.BlockPropertyFilters[j] = + opts.BlockPropertyFilters[j], opts.BlockPropertyFilters[i] + }) + iter := d.NewIter(&opts) + return runIterCmd(td, iter, true) + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} + +// TODO(sumeer): randomized block interval filter test, with a single +// collector, that varies block sizes and checks subset relationship with +// source of truth computed using block size of 1. 
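// For context, a minimal end-to-end sketch of the API exercised by the test
// above (illustrative only, not part of this change; the collector name
// "suffix" and the key scheme are hypothetical, and error handling is
// elided). It wires a BlockIntervalCollector into Options on the write path
// and a matching BlockIntervalFilter into IterOptions on the read path:
//
//	opts := &Options{
//		FormatMajorVersion: FormatNewest,
//		BlockPropertyCollectors: []func() BlockPropertyCollector{
//			func() BlockPropertyCollector {
//				// Parses the last two key bytes, like testBlockIntervalCollector.
//				return sstable.NewBlockIntervalCollector(
//					"suffix", &testBlockIntervalCollector{})
//			},
//		},
//	}
//	d, _ := Open("demo", opts)
//	// Only blocks whose [lower, upper) interval intersects [10, 20) are
//	// loaded. Filtering is block-granular, not key-granular, so keys with
//	// suffixes outside [10, 20) can still surface from intersecting blocks.
//	iter := d.NewIter(&IterOptions{
//		BlockPropertyFilters: []BlockPropertyFilter{
//			sstable.NewBlockIntervalFilter("suffix", 10, 20),
//		},
//	})
//	for valid := iter.First(); valid; valid = iter.Next() {
//		// Process iter.Key()/iter.Value().
//	}
//	_ = iter.Close()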
+
 func BenchmarkIteratorSeekGE(b *testing.B) {
 	m, keys := buildMemTable(b)
 	iter := &Iterator{
@@ -1233,3 +1398,5 @@ func BenchmarkIteratorSeekGENoop(b *testing.B) {
 		}
 	}
 }
+
+// TODO(sumeer): add block interval filtering benchmark
diff --git a/level_iter.go b/level_iter.go
index 35e8814ab9..04253b2305 100644
--- a/level_iter.go
+++ b/level_iter.go
@@ -185,6 +185,7 @@ func (l *levelIter) init(
 	l.lower = opts.LowerBound
 	l.upper = opts.UpperBound
 	l.tableOpts.TableFilter = opts.TableFilter
+	l.tableOpts.BlockPropertyFilters = opts.BlockPropertyFilters
 	l.cmp = cmp
 	l.split = split
 	l.iterFile = nil
diff --git a/open_test.go b/open_test.go
index 5837cad0f8..3ed53d89c6 100644
--- a/open_test.go
+++ b/open_test.go
@@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) {
 			"LOCK",
 			"MANIFEST-000001",
 			"OPTIONS-000003",
-			"marker.format-version.000003.004",
+			"marker.format-version.000004.005",
 			"marker.manifest.000001.MANIFEST-000001",
 		},
 	}
diff --git a/options.go b/options.go
index 383d4d510f..d7d7662bc3 100644
--- a/options.go
+++ b/options.go
@@ -52,6 +52,12 @@ type FilterPolicy = base.FilterPolicy
 // TablePropertyCollector exports the sstable.TablePropertyCollector type.
 type TablePropertyCollector = sstable.TablePropertyCollector
 
+// BlockPropertyCollector exports the sstable.BlockPropertyCollector type.
+type BlockPropertyCollector = sstable.BlockPropertyCollector
+
+// BlockPropertyFilter exports the sstable.BlockPropertyFilter type.
+type BlockPropertyFilter = sstable.BlockPropertyFilter
+
 // IterOptions hold the optional per-query parameters for NewIter.
 //
 // Like Options, a nil *IterOptions is valid and means to use the default
@@ -72,7 +78,12 @@ type IterOptions struct {
 	// false to skip scanning. This function must be thread-safe since the same
 	// function can be used by multiple iterators, if the iterator is cloned.
 	TableFilter func(userProps map[string]string) bool
-
+	// BlockPropertyFilters can be used to avoid scanning tables and blocks in
+	// tables. It is required that this slice is sorted in increasing order of
+	// the BlockPropertyFilter.ShortID. This slice represents an intersection
+	// across all filters, i.e., all filters must indicate that the block is
+	// relevant.
+	BlockPropertyFilters []BlockPropertyFilter
 	// Internal options.
 	logger Logger
 }
@@ -497,6 +508,11 @@ type Options struct {
 	// and lives for the lifetime of the table.
 	TablePropertyCollectors []func() TablePropertyCollector
 
+	// BlockPropertyCollectors is a list of BlockPropertyCollector creation
+	// functions. A new BlockPropertyCollector is created for each sstable
+	// built and lives for the lifetime of writing that table.
+	BlockPropertyCollectors []func() BlockPropertyCollector
+
 	// WALBytesPerSync sets the number of bytes to write to a WAL before calling
 	// Sync on it in the background. Just like with BytesPerSync above, this
 	// helps smooth out disk write latencies, and avoids cases where the OS
@@ -1166,6 +1182,7 @@ func (o *Options) MakeWriterOptions(level int) sstable.WriterOptions {
 	}
 	writerOpts.TableFormat = sstable.TableFormatRocksDBv2
 	writerOpts.TablePropertyCollectors = o.TablePropertyCollectors
+	writerOpts.BlockPropertyCollectors = o.BlockPropertyCollectors
 	}
 	levelOpts := o.Level(level)
 	writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval
diff --git a/sstable/block_property.go b/sstable/block_property.go
new file mode 100644
index 0000000000..3ebb8f7f70
--- /dev/null
+++ b/sstable/block_property.go
@@ -0,0 +1,488 @@
+// Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+package sstable
+
+import (
+	"encoding/binary"
+	"fmt"
+	"math"
+	"sync"
+
+	"github.com/cockroachdb/pebble/internal/base"
+)
+
+// Block properties are an optional user-facing feature that can be used to
+// filter data blocks (and whole sstables) from an Iterator before they are
+// loaded. They do not apply to range delete blocks. These are expected to
+// very concisely represent a set of some attribute value contained within the
+// key or value, such that the set includes all the attribute values in the
+// block. This has some similarities with OLAP pruning approaches that
+// maintain min-max attribute values for some column (which concisely
+// represent a set), which is then used to prune at query time. In Pebble's
+// case, data blocks are small, typically 25-50KB, so these properties should
+// reduce their precision in order to be concise -- a good rule of thumb is to
+// not consume more than 50-100 bytes across all properties maintained for a
+// block, i.e., a 500x reduction compared to loading the data block.
+//
+// A block property must be assigned a unique name, which is encoded and
+// stored in the sstable. This name must be unique among all user-properties
+// encoded in an sstable.
+//
+// A property is represented as a []byte. A nil value and an empty byte slice
+// are considered semantically identical. The caller is free to choose the
+// semantics of an empty byte slice, e.g. they could use it to represent the
+// empty set or the universal set, whichever they think is more common and
+// therefore better to encode concisely. The serialization of the
+// property for the various Finish*() calls in a BlockPropertyCollector
+// implementation should be identical, since the corresponding
+// BlockPropertyFilter implementation is not told the context in which it is
+// deserializing the property.
+//
+// Block properties are more general than table properties and should be
+// preferred over using table properties. A BlockPropertyCollector can achieve
+// identical behavior to table properties by returning the nil slice from
+// FinishDataBlock and FinishIndexBlock, interpreting them as the universal
+// set in BlockPropertyFilter, and returning a non-universal set in
+// FinishTable.
+
+// BlockPropertyCollector is used when writing a sstable.
+// - All calls to Add are included in the next FinishDataBlock, after which
+//   the next data block is expected to start.
+//
+// - The index entry generated for the data block, which contains the return
+//   value from FinishDataBlock, is not immediately included in the current
+//   index block. It is included when AddPrevDataBlockToIndexBlock is called.
+//   An alternative would be to return an opaque handle from FinishDataBlock
+//   and pass it to a new AddToIndexBlock method, which requires more
+//   plumbing, and passing of an interface{} results in an undesirable heap
+//   allocation. AddPrevDataBlockToIndexBlock must be called before keys are
+//   added to the new data block.
+type BlockPropertyCollector interface {
+	// Name returns the name of the block property collector.
+	Name() string
+	// Add is called with each new entry added to a data block in the sstable.
+	// The callee can assume that these are in sorted order.
+	Add(key InternalKey, value []byte) error
+	// FinishDataBlock is called when all the entries have been added to a
+	// data block. 
Subsequent Add calls will be for the next data block. It + // returns the property value for the finished block. + FinishDataBlock(buf []byte) ([]byte, error) + // AddPrevDataBlockToIndexBlock adds the entry corresponding to the + // previous FinishDataBlock to the current index block. + AddPrevDataBlockToIndexBlock() + // FinishIndexBlock is called when an index block, containing all the + // key-value pairs since the last FinishIndexBlock, will no longer see new + // entries. It returns the property value for the index block. + FinishIndexBlock(buf []byte) ([]byte, error) + // FinishTable is called when the sstable is finished, and returns the + // property value for the sstable. + FinishTable(buf []byte) ([]byte, error) +} + +// BlockPropertyFilter is used in an Iterator to filter sstables and blocks +// within the sstable. It should not maintain any per-sstable state, and must +// be thread-safe. +type BlockPropertyFilter interface { + // Name returns the name of the block property collector. + Name() string + // Intersects returns true if the set represented by prop intersects with + // the set in the filter. + Intersects(prop []byte) (bool, error) +} + +// BlockIntervalCollector is a helper implementation of BlockPropertyCollector +// for users who want to represent a set of the form [lower,upper) where both +// lower and upper are uint64, and lower <= upper. +// +// The set is encoded as: +// - Two varint integers, (lower,upper-lower), when upper-lower > 0 +// - Nil, when upper-lower=0 +// +// Users must not expect this to preserve differences between empty sets -- +// they will all get turned into the semantically equivalent [0,0). +type BlockIntervalCollector struct { + name string + dbic DataBlockIntervalCollector + + blockInterval interval + indexInterval interval + tableInterval interval +} + +var _ BlockPropertyCollector = &BlockIntervalCollector{} + +// DataBlockIntervalCollector is the interface used by BlockIntervalCollector +// that contains the actual logic pertaining to the property. It only +// maintains state for the current data block, and resets that state in +// FinishDataBlock. This interface can be used to reduce parsing costs. +type DataBlockIntervalCollector interface { + // Add is called with each new entry added to a data block in the sstable. + // The callee can assume that these are in sorted order. + Add(key InternalKey, value []byte) error + // FinishDataBlock is called when all the entries have been added to a + // data block. Subsequent Add calls will be for the next data block. It + // returns the [lower, upper) for the finished block. + FinishDataBlock() (lower uint64, upper uint64, err error) +} + +// NewBlockIntervalCollector constructs a BlockIntervalCollector, with the +// given name and data block collector. +func NewBlockIntervalCollector( + name string, blockAttributeCollector DataBlockIntervalCollector) *BlockIntervalCollector { + return &BlockIntervalCollector{ + name: name, dbic: blockAttributeCollector} +} + +// Name implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) Name() string { + return b.name +} + +// Add implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) Add(key InternalKey, value []byte) error { + return b.dbic.Add(key, value) +} + +// FinishDataBlock implements the BlockPropertyCollector interface. 
+func (b *BlockIntervalCollector) FinishDataBlock(buf []byte) ([]byte, error) { + var err error + b.blockInterval.lower, b.blockInterval.upper, err = b.dbic.FinishDataBlock() + if err != nil { + return buf, err + } + buf = b.blockInterval.encode(buf) + b.tableInterval.union(b.blockInterval) + return buf, nil +} + +// AddPrevDataBlockToIndexBlock implements the BlockPropertyCollector +// interface. +func (b *BlockIntervalCollector) AddPrevDataBlockToIndexBlock() { + b.indexInterval.union(b.blockInterval) + b.blockInterval = interval{} +} + +// FinishIndexBlock implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) FinishIndexBlock(buf []byte) ([]byte, error) { + buf = b.indexInterval.encode(buf) + b.indexInterval = interval{} + return buf, nil +} + +// FinishTable implements the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) FinishTable(buf []byte) ([]byte, error) { + return b.tableInterval.encode(buf), nil +} + +type interval struct { + lower uint64 + upper uint64 +} + +func (i interval) encode(buf []byte) []byte { + if i.lower < i.upper { + var encoded [binary.MaxVarintLen64*2]byte + n := binary.PutUvarint(encoded[:], i.lower) + n += binary.PutUvarint(encoded[n:], i.upper-i.lower) + buf = append(buf, encoded[:n]...) + } + return buf +} + +func (i *interval) decode(buf []byte) error { + if len(buf) == 0 { + *i = interval{} + return nil + } + var n int + i.lower, n = binary.Uvarint(buf) + if n <= 0 || n >= len(buf) { + return base.CorruptionErrorf("cannot decode interval from buf %x", buf) + } + pos := n + i.upper, n = binary.Uvarint(buf[pos:]) + pos += n + if pos != len(buf) || n <= 0 { + return base.CorruptionErrorf("cannot decode interval from buf %x", buf) + } + // Delta decode. + i.upper += i.lower + if i.upper < i.lower { + return base.CorruptionErrorf("unexpected overflow, upper %d < lower %d", i.upper, i.lower) + } + return nil +} + +func (i *interval) union(x interval) { + if x.lower >= x.upper { + // x is the empty set. + return + } + if i.lower >= i.upper { + // i is the empty set. + *i = x + return + } + // Both sets are non-empty. + if x.lower < i.lower { + i.lower = x.lower + } + if x.upper > i.upper { + i.upper = x.upper + } +} + +func (i interval) intersects(x interval) bool { + if i.lower >= i.upper || x.lower >= x.upper { + // At least one of the sets is empty. + return false + } + // Neither set is empty. + return i.upper > x.lower && i.lower < x.upper +} + +// BlockIntervalFilter is an implementation of BlockPropertyFilter when the +// corresponding collector is a BlockIntervalCollector. That is, the set is of +// the form [lower, upper). +type BlockIntervalFilter struct { + name string + filterInterval interval +} + +// NewBlockIntervalFilter constructs a BlockIntervalFilter with the given name +// and [lower, upper) bounds. +func NewBlockIntervalFilter( + name string, lower uint64, upper uint64) *BlockIntervalFilter { + return &BlockIntervalFilter{ + name: name, + filterInterval: interval{lower: lower, upper: upper}, + } +} + +// Name implements the BlockPropertyFilter interface. +func (b *BlockIntervalFilter) Name() string { + return b.name +} + +// Intersects implements the BlockPropertyFilter interface. +func (b *BlockIntervalFilter) Intersects(prop []byte) (bool, error) { + var i interval + if err := i.decode(prop); err != nil { + return false, err + } + return i.intersects(b.filterInterval), nil +} + +// When encoding block properties for each block, we cannot afford to encode +// the name. 
Instead, the name is mapped to a shortID, in the scope of that +// sstable, and the shortID is encoded. Since we use a uint8, there is a limit +// of 256 block property collectors per sstable. +type shortID uint8 + +type blockPropertiesEncoder struct { + propsBuf []byte + scratch []byte +} + +func (e *blockPropertiesEncoder) getScratchForProp() []byte { + return e.scratch[:0] +} + +func (e *blockPropertiesEncoder) resetProps() { + e.propsBuf = e.propsBuf[:0] +} + +func (e *blockPropertiesEncoder) addProp(id shortID, scratch []byte) { + const lenID = 1 + lenProp := uvarintLen(uint32(len(scratch))) + n := lenID + lenProp + len(scratch) + if cap(e.propsBuf) - len(e.propsBuf) < n { + size := len(e.propsBuf) + 2*n + if size < 2*cap(e.propsBuf) { + size = 2*cap(e.propsBuf) + } + buf := make([]byte, len(e.propsBuf), size) + copy(buf, e.propsBuf) + e.propsBuf = buf + } + pos := len(e.propsBuf) + b := e.propsBuf[pos:pos+lenID] + b[0] = byte(id) + pos += lenID + b = e.propsBuf[pos:pos+lenProp] + n = binary.PutUvarint(b, uint64(len(scratch))) + pos += n + b = e.propsBuf[pos:pos+len(scratch)] + pos += len(scratch) + copy(b, scratch) + e.propsBuf = e.propsBuf[0:pos] + e.scratch = scratch +} + +func (e *blockPropertiesEncoder) unsafeProps() []byte { + return e.propsBuf +} + +func (e *blockPropertiesEncoder) props() []byte { + buf := make([]byte, len(e.propsBuf)) + copy(buf, e.propsBuf) + return buf +} + +type blockPropertiesDecoder struct { + props []byte +} + +func (d *blockPropertiesDecoder) done() bool { + return len(d.props) == 0 +} + +// REQUIRES: !done() +func (d *blockPropertiesDecoder) next() (id shortID, prop []byte, err error) { + const lenID = 1 + id = shortID(d.props[0]) + propLen, m := binary.Uvarint(d.props[lenID:]) + n := lenID + m + if m <= 0 || propLen == 0 || (n + int(propLen)) > len(d.props) { + return 0, nil, base.CorruptionErrorf("corrupt block property length") + } + prop = d.props[n:n+int(propLen)] + d.props = d.props[n+int(propLen):] + return id, prop, nil +} + +// BlockPropertiesFilterer provides filtering support when reading an sstable +// in the context of an iterator that has a slice of BlockPropertyFilters. +// After the call to NewBlockPropertiesFilterer, the caller must call +// IntersectsUserPropsAndFinishInit to check if the sstable intersects with +// the filters. If it does intersect, this function also finishes initializing +// the BlockPropertiesFilterer using the shortIDs for the relevant filters. +// Subsequent checks for relevance of a block should use the intersects +// method. +type BlockPropertiesFilterer struct { + filters []BlockPropertyFilter + // Maps shortID => index in filters. This can be sparse, and shortIDs for + // which there is no filter are represented with an index of -1. The + // length of this can be shorter than the shortIDs allocated in the + // sstable. e.g. if the sstable used shortIDs 0, 1, 2, 3, and the iterator + // has two filters, corresponding to shortIDs 2, 0, this would be: + // len(shortIDToFiltersIndex)==3, 0=>1, 1=>-1, 2=>0. + shortIDToFiltersIndex []int +} + +var blockPropertiesFiltererPool = sync.Pool{ + New: func() interface{} { + return &BlockPropertiesFilterer{} + }, +} + +// NewBlockPropertiesFilterer returns a partially initialized filterer. To complete +// initialization, call IntersectsUserPropsAndFinishInit. 
+func NewBlockPropertiesFilterer(filters []BlockPropertyFilter) *BlockPropertiesFilterer { + filterer := blockPropertiesFiltererPool.Get().(*BlockPropertiesFilterer) + *filterer = BlockPropertiesFilterer{filters: filters} + return filterer +} + +func releaseBlockPropertiesFilterer(filterer *BlockPropertiesFilterer) { + *filterer = BlockPropertiesFilterer{ + shortIDToFiltersIndex: filterer.shortIDToFiltersIndex[:0], + } + blockPropertiesFiltererPool.Put(filterer) +} + +// IntersectsUserPropsAndFinishInit is called with the user properties map for +// the sstable and returns whether the sstable intersects the filters. It +// additionally initializes the shortIDToFiltersIndex for the filters that are +// relevant to this sstable. +func (f *BlockPropertiesFilterer) IntersectsUserPropsAndFinishInit( + userProperties map[string]string) (bool, error) { + for i := range f.filters { + props, ok := userProperties[f.filters[i].Name()] + if !ok { + // Collector was not used when writing this file, so it is + // considered intersecting. + continue + } + byteProps := []byte(props) + if len(byteProps) < 1 { + return false, base.CorruptionErrorf( + "block properties for %s is corrupted", f.filters[i].Name()) + } + shortID := shortID(byteProps[0]) + intersects, err := f.filters[i].Intersects(byteProps[1:]) + if err != nil || !intersects { + return false, err + } + // Intersects the sstable, so need to use this filter when + // deciding whether to read blocks. + n := len(f.shortIDToFiltersIndex) + if n <= int(shortID) { + if cap(f.shortIDToFiltersIndex) <= int(shortID) { + index := make([]int, shortID+1, 2*(shortID+1)) + copy(index, f.shortIDToFiltersIndex) + f.shortIDToFiltersIndex = index + } else { + f.shortIDToFiltersIndex = f.shortIDToFiltersIndex[:shortID+1] + } + for j := n; j < int(shortID); j++ { + f.shortIDToFiltersIndex[j] = -1 + } + } + f.shortIDToFiltersIndex[shortID] = i + } + return true, nil +} + +func (f *BlockPropertiesFilterer) intersects(props []byte) (bool, error) { + i := 0 + decoder := blockPropertiesDecoder{props: props} + for i < len(f.shortIDToFiltersIndex) { + var id int + var prop []byte + if !decoder.done() { + var shortID shortID + var err error + shortID, prop, err = decoder.next() + if err != nil { + return false, err + } + id = int(shortID) + } else { + id = math.MaxUint8+1 + } + for i < len(f.shortIDToFiltersIndex) && id > i { + if f.shortIDToFiltersIndex[i] >= 0 { + // There is a filter for this id, but the property for this id + // is not encoded for this block. + intersects, err := f.filters[f.shortIDToFiltersIndex[i]].Intersects(nil) + if err != nil { + return false, err + } + if !intersects { + return false, nil + } + } + i++ + } + if i >= len(f.shortIDToFiltersIndex) { + return true, nil + } + // INVARIANT: id <= i. And since i is always incremented by 1, id==i. + if id != i { + panic(fmt.Sprintf("%d != %d", id, i)) + } + if f.shortIDToFiltersIndex[i] >= 0 { + intersects, err := f.filters[f.shortIDToFiltersIndex[i]].Intersects(prop) + if err != nil { + return false, err + } + if !intersects { + return false, nil + } + } + i++ + } + return true, nil +} diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go new file mode 100644 index 0000000000..bc5112459e --- /dev/null +++ b/sstable/block_property_test.go @@ -0,0 +1,742 @@ +// Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. 
+ +package sstable + +import ( + "bytes" + "math" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIntervalEncodeDecode(t *testing.T) { + testCases := []struct { + name string + lower uint64 + upper uint64 + len int + }{ + { + name: "empty zero", + lower: 0, + upper: 0, + len: 0, + }, + { + name: "empty non-zero", + lower: 5, + upper: 5, + len: 0, + }, + { + name: "empty lower > upper", + lower: math.MaxUint64, + upper: math.MaxUint64-1, + len: 0, + }, + { + name: "small", + lower: 50, + upper: 61, + len: 2, + }, + { + name: "big", + lower: 0, + upper: math.MaxUint64, + len: 11, + }, + } + for _, tc := range testCases { + buf := make([]byte, 100) + t.Run(tc.name, func(t *testing.T) { + i1 := interval{lower: tc.lower, upper: tc.upper} + b1 := i1.encode(nil) + b2 := i1.encode(buf[:0]) + require.True(t, bytes.Equal(b1, b2), "%x != %x", b1, b2) + expectedInterval := i1 + if expectedInterval.lower >= expectedInterval.upper { + expectedInterval = interval{} + } + // Arbitrary initial value. + arbitraryInterval := interval{lower: 1000, upper:1000} + i2 := arbitraryInterval + i2.decode(b1) + require.Equal(t, expectedInterval, i2) + i2 = arbitraryInterval + i2.decode(b2) + require.Equal(t, expectedInterval, i2) + require.Equal(t, tc.len, len(b1)) + }) + } +} + +func TestIntervalUnionIntersects(t *testing.T) { + testCases := []struct { + name string + i1 interval + i2 interval + union interval + intersects bool + }{ + { + name: "empty and empty", + i1: interval{}, + i2: interval{}, + union: interval{}, + intersects: false, + }, + { + name: "empty and empty non-zero", + i1: interval{}, + i2: interval{100, 99}, + union: interval{}, + intersects: false, + }, + { + name: "empty and non-empty", + i1: interval{}, + i2: interval{80, 100}, + union: interval{80, 100}, + intersects: false, + }, + { + name: "disjoint sets", + i1: interval{50, 60}, + i2: interval{math.MaxUint64-5, math.MaxUint64}, + union: interval{50, math.MaxUint64}, + intersects: false, + }, + { + name: "adjacent sets", + i1: interval{50, 60}, + i2: interval{60, 100}, + union: interval{50, 100}, + intersects: false, + }, + { + name: "overlapping sets", + i1: interval{50, 60}, + i2: interval{59, 120}, + union: interval{50, 120}, + intersects: true, + }, + } + isEmpty := func(i interval) bool { + return i.lower >= i.upper + } + // adjustUnionExpectation exists because union does not try to + // canonicalize empty sets by turning them into [0, 0), since it is + // unnecessary -- the higher level context of the BlockIntervalCollector + // will do so when calling interval.encode. 
+ adjustUnionExpectation := func(expected interval, i1 interval, i2 interval) interval { + if isEmpty(i2) { + return i1 + } + if isEmpty(i1) { + return i2 + } + return expected + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.intersects, tc.i1.intersects(tc.i2)) + require.Equal(t, tc.intersects, tc.i2.intersects(tc.i1)) + require.Equal(t, !isEmpty(tc.i1), tc.i1.intersects(tc.i1)) + require.Equal(t, !isEmpty(tc.i2), tc.i2.intersects(tc.i2)) + union := tc.i1 + union.union(tc.i2) + require.Equal(t, adjustUnionExpectation(tc.union, tc.i1, tc.i2), union) + union = tc.i2 + union.union(tc.i1) + require.Equal(t, adjustUnionExpectation(tc.union, tc.i2, tc.i1), union) + }) + } +} + +type testDataBlockIntervalCollector struct { + i interval +} + +func (c *testDataBlockIntervalCollector) Add(key InternalKey, value []byte) error { + return nil +} + +func (c *testDataBlockIntervalCollector) FinishDataBlock() (lower uint64, upper uint64, err error) { + return c.i.lower, c.i.upper, nil +} + +func TestBlockIntervalCollector(t *testing.T) { + var dbic testDataBlockIntervalCollector + bic := NewBlockIntervalCollector("foo", &dbic) + require.Equal(t, "foo", bic.Name()) + // Single key to call Add once. The real action is in the other methods. + key := InternalKey{UserKey: []byte("a")} + require.NoError(t, bic.Add(key, nil)) + dbic.i = interval{1, 1} + // First data block has empty interval. + encoded, err := bic.FinishDataBlock(nil) + require.NoError(t, err) + require.True(t, bytes.Equal(nil, encoded)) + bic.AddPrevDataBlockToIndexBlock() + // Second data block. + dbic.i = interval{20, 25} + encoded, err = bic.FinishDataBlock(nil) + require.NoError(t, err) + var decoded interval + require.NoError(t, decoded.decode(encoded)) + require.Equal(t, interval{20, 25}, decoded) + var encodedIndexBlock []byte + // Finish index block before including second data block. + encodedIndexBlock, err = bic.FinishIndexBlock(nil) + require.NoError(t, err) + require.True(t, bytes.Equal(nil, encodedIndexBlock)) + bic.AddPrevDataBlockToIndexBlock() + // Third data block. + dbic.i = interval{10, 15} + encoded, err = bic.FinishDataBlock(nil) + require.NoError(t, err) + require.NoError(t, decoded.decode(encoded)) + require.Equal(t, interval{10, 15}, decoded) + bic.AddPrevDataBlockToIndexBlock() + // Fourth data block. + dbic.i = interval{100, 105} + encoded, err = bic.FinishDataBlock(nil) + require.NoError(t, err) + require.NoError(t, decoded.decode(encoded)) + require.Equal(t, interval{100, 105}, decoded) + // Finish index block before including fourth data block. + encodedIndexBlock, err = bic.FinishIndexBlock(nil) + require.NoError(t, err) + require.NoError(t, decoded.decode(encodedIndexBlock)) + require.Equal(t, interval{10, 25}, decoded) + bic.AddPrevDataBlockToIndexBlock() + // Finish index block that contains only fourth data block. + encodedIndexBlock, err = bic.FinishIndexBlock(nil) + require.NoError(t, err) + require.NoError(t, decoded.decode(encodedIndexBlock)) + require.Equal(t, interval{100, 105}, decoded) + var encodedTable []byte + // Finish table. 
+ encodedTable, err = bic.FinishTable(nil) + require.NoError(t, err) + require.NoError(t, decoded.decode(encodedTable)) + require.Equal(t, interval{10, 105}, decoded) +} + +func TestBlockIntervalFilter(t *testing.T) { + testCases := []struct { + name string + filter interval + prop interval + intersects bool + }{ + { + name: "non-empty and empty", + filter: interval{10, 15}, + prop: interval{}, + intersects: false, + }, + { + name: "does not intersect", + filter: interval{10, 15}, + prop: interval{15, 20}, + intersects: false, + }, + { + name: "intersects", + filter: interval{10, 15}, + prop: interval{14, 20}, + intersects: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var dbic testDataBlockIntervalCollector + name := "foo" + bic := NewBlockIntervalCollector(name, &dbic) + bif := NewBlockIntervalFilter(name, tc.filter.lower, tc.filter.upper) + dbic.i = tc.prop + prop, _ := bic.FinishDataBlock(nil) + intersects, err := bif.Intersects(prop) + require.NoError(t, err) + require.Equal(t, tc.intersects, intersects) + }) + } +} + +func TestBlockPropertiesEncoderDecoder(t *testing.T) { + var encoder blockPropertiesEncoder + scratch := encoder.getScratchForProp() + scratch = append(scratch, []byte("foo")...) + encoder.addProp(1, scratch) + scratch = encoder.getScratchForProp() + require.LessOrEqual(t, 3, cap(scratch)) + scratch = append(scratch, []byte("cockroach")...) + encoder.addProp(10, scratch) + props1 := encoder.props() + unsafeProps := encoder.unsafeProps() + require.True(t, bytes.Equal(props1, unsafeProps), "%x != %x", props1, unsafeProps) + decodeProps1 := func() { + decoder := blockPropertiesDecoder{props: props1} + require.False(t, decoder.done()) + id, prop, err := decoder.next() + require.NoError(t, err) + require.Equal(t, shortID(1), id) + require.Equal(t, string(prop), "foo") + require.False(t, decoder.done()) + id, prop, err = decoder.next() + require.NoError(t, err) + require.Equal(t, shortID(10), id) + require.Equal(t, string(prop), "cockroach") + require.True(t, decoder.done()) + } + decodeProps1() + + encoder.resetProps() + scratch = encoder.getScratchForProp() + require.LessOrEqual(t, 9, cap(scratch)) + scratch = append(scratch, []byte("bar")...) + encoder.addProp(10, scratch) + props2 := encoder.props() + unsafeProps = encoder.unsafeProps() + require.True(t, bytes.Equal(props2, unsafeProps), "%x != %x", props2, unsafeProps) + // Safe props should still decode. + decodeProps1() + // Decode props2 + decoder := blockPropertiesDecoder{props: props2} + require.False(t, decoder.done()) + id, prop, err := decoder.next() + require.NoError(t, err) + require.Equal(t, shortID(10), id) + require.Equal(t, string(prop), "bar") + require.True(t, decoder.done()) +} + +// filterWithTrueForEmptyProp is a wrapper for BlockIntervalFilter that +// delegates to it except when the property is empty, in which case it returns +// true. +type filterWithTrueForEmptyProp struct { + *BlockIntervalFilter +} +func (b filterWithTrueForEmptyProp) Intersects(prop []byte) (bool, error) { + if len(prop) == 0 { + return true, nil + } + return b.BlockIntervalFilter.Intersects(prop) +} + +func TestBlockPropertiesFilterer_IntersectsUserPropsAndFinishInit(t *testing.T) { + // props with id=0, interval [10, 20); id=10, interval [110, 120). 
+ var dbic testDataBlockIntervalCollector + bic0 := NewBlockIntervalCollector("p0", &dbic) + bic0Id := byte(0) + bic10 := NewBlockIntervalCollector("p10", &dbic) + bic10Id := byte(10) + dbic.i = interval{10, 20} + prop0 := append([]byte(nil), bic0Id) + _, err := bic0.FinishDataBlock(nil) + require.NoError(t, err) + prop0, err = bic0.FinishTable(prop0) + require.NoError(t, err) + dbic.i = interval{110, 120} + prop10 := append([]byte(nil), bic10Id) + _, err = bic10.FinishDataBlock(nil) + require.NoError(t, err) + prop10, err = bic10.FinishTable(prop10) + require.NoError(t, err) + prop0Str := string(prop0) + prop10Str := string(prop10) + type filter struct { + name string + i interval + } + testCases := []struct{ + name string + userProps map[string]string + filters []filter + + // Expected results + intersects bool + shortIDToFiltersIndex []int + }{ + { + name: "no filter, no props", + userProps: map[string]string{}, + filters: nil, + intersects: true, + }, + { + name: "no props", + userProps: map[string]string{}, + filters: []filter{ + {name: "p0", i: interval{20, 30}}, + {name: "p10", i: interval{20, 30}}, + }, + intersects: true, + }, + { + name: "prop0, does not intersect", + userProps: map[string]string{"p0": prop0Str}, + filters: []filter{ + {name: "p0", i: interval{20, 30}}, + {name: "p10", i: interval{20, 30}}, + }, + intersects: false, + }, + { + name: "prop0, intersects", + userProps: map[string]string{"p0": prop0Str}, + filters: []filter{ + {name: "p0", i: interval{11, 21}}, + {name: "p10", i: interval{20, 30}}, + }, + intersects: true, + shortIDToFiltersIndex: []int{0}, + }, + { + name: "prop10, does not intersect", + userProps: map[string]string{"p10": prop10Str}, + filters: []filter{ + {name: "p0", i: interval{11, 21}}, + {name: "p10", i: interval{20, 30}}, + }, + intersects: false, + }, + { + name: "prop10, intersects", + userProps: map[string]string{"p10": prop10Str}, + filters: []filter{ + {name: "p0", i: interval{11, 21}}, + {name: "p10", i: interval{115, 125}}, + }, + intersects: true, + shortIDToFiltersIndex: []int{-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1}, + }, + { + name: "prop10, intersects", + userProps: map[string]string{"p10": prop10Str}, + filters: []filter{ + {name: "p10", i: interval{115, 125}}, + {name: "p0", i: interval{11, 21}}, + }, + intersects: true, + shortIDToFiltersIndex: []int{-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0}, + }, + { + name: "prop0 and prop10, does not intersect", + userProps: map[string]string{"p0": prop0Str, "p10": prop10Str}, + filters: []filter{ + {name: "p10", i: interval{115, 125}}, + {name: "p0", i: interval{20, 30}}, + }, + intersects: false, + shortIDToFiltersIndex: []int{-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0}, + }, + { + name: "prop0 and prop10, does not intersect", + userProps: map[string]string{"p0": prop0Str, "p10": prop10Str}, + filters: []filter{ + {name: "p0", i: interval{10, 20}}, + {name: "p10", i: interval{125, 135}}, + }, + intersects: false, + shortIDToFiltersIndex: []int{0}, + }, + { + name: "prop0 and prop10, intersects", + userProps: map[string]string{"p0": prop0Str, "p10": prop10Str}, + filters: []filter{ + {name: "p10", i: interval{115, 125}}, + {name: "p0", i: interval{10, 20}}, + }, + intersects: true, + shortIDToFiltersIndex: []int{1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var filters []BlockPropertyFilter + for _, f := range tc.filters { + filter := NewBlockIntervalFilter(f.name, f.i.lower, f.i.upper) + filters = 
append(filters, filter) + } + filterer := NewBlockPropertiesFilterer(filters) + intersects, err := filterer.IntersectsUserPropsAndFinishInit(tc.userProps) + require.NoError(t, err) + require.Equal(t, tc.intersects, intersects) + require.Equal(t, tc.shortIDToFiltersIndex, filterer.shortIDToFiltersIndex) + }) + } +} + +func TestBlockPropertiesFilterer_Intersects(t *testing.T) { + // Setup two different properties values to filter against. + var emptyProps []byte + // props with id=0, interval [10, 20); id=10, interval [110, 120). + var encoder blockPropertiesEncoder + var dbic testDataBlockIntervalCollector + bic0 := NewBlockIntervalCollector("", &dbic) + bic0Id := shortID(0) + bic10 := NewBlockIntervalCollector("", &dbic) + bic10Id := shortID(10) + dbic.i = interval{10, 20} + prop, err := bic0.FinishDataBlock(encoder.getScratchForProp()) + require.NoError(t, err) + encoder.addProp(bic0Id, prop) + dbic.i = interval{110, 120} + prop, err = bic10.FinishDataBlock(encoder.getScratchForProp()) + require.NoError(t, err) + encoder.addProp(bic10Id, prop) + props0And10 := encoder.props() + type filter struct { + shortID shortID + i interval + intersectsForEmptyProp bool + } + testCases := []struct{ + name string + props []byte + // filters must be in ascending order of shortID. + filters []filter + intersects bool + }{ + { + name: "no filter, empty props", + props: emptyProps, + intersects: true, + }, + { + name: "no filter", + props: props0And10, + intersects: true, + }, + { + name: "filter 0, empty props, does not intersect", + props: emptyProps, + filters: []filter{ + { + shortID: 0, + i: interval{5, 15}, + }, + }, + intersects: false, + }, + { + name: "filter 10, empty props, does not intersect", + props: emptyProps, + filters: []filter{ + { + shortID: 0, + i: interval{105, 111}, + }, + }, + intersects: false, + }, + { + name: "filter 0, intersects", + props: props0And10, + filters: []filter{ + { + shortID: 0, + i: interval{5, 15}, + }, + }, + intersects: true, + }, + { + name: "filter 0, does not intersect", + props: props0And10, + filters: []filter{ + { + shortID: 0, + i: interval{20, 25}, + }, + }, + intersects: false, + }, + { + name: "filter 10, intersects", + props: props0And10, + filters: []filter{ + { + shortID: 10, + i: interval{105, 111}, + }, + }, + intersects: true, + }, + { + name: "filter 10, does not intersect", + props: props0And10, + filters: []filter{ + { + shortID: 10, + i: interval{105, 110}, + }, + }, + intersects: false, + }, + { + name: "filter 5, does not intersect since no property", + props: props0And10, + filters: []filter{ + { + shortID: 5, + i: interval{105, 110}, + }, + }, + intersects: false, + }, + { + name: "filter 0 and 5, intersects and not intersects means overall not intersects", + props: props0And10, + filters: []filter{ + { + shortID: 0, + i: interval{5, 15}, + }, + { + shortID: 5, + i: interval{105, 110}, + }, + }, + intersects: false, + }, + { + name: "filter 0, 5, 7, 11, all intersect", + props: props0And10, + filters: []filter{ + { + shortID: 0, + i: interval{5, 15}, + }, + { + shortID: 5, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, + { + shortID: 7, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, + { + shortID: 11, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, + }, + intersects: true, + }, + { + name: "filter 0, 5, 7, 10, 11, all intersect", + props: props0And10, + filters: []filter{ + { + shortID: 0, + i: interval{5, 15}, + }, + { + shortID: 5, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, 
+ { + shortID: 7, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, + { + shortID: 10, + i: interval{105, 111}, + }, + { + shortID: 11, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, + }, + intersects: true, + }, + { + name: "filter 0, 5, 7, 10, 11, all intersect except for 10", + props: props0And10, + filters: []filter{ + { + shortID: 0, + i: interval{5, 15}, + }, + { + shortID: 5, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, + { + shortID: 7, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, + { + shortID: 10, + i: interval{105, 110}, + }, + { + shortID: 11, + i: interval{105, 110}, + intersectsForEmptyProp: true, + }, + }, + intersects: false, + }, + + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var filters []BlockPropertyFilter + var shortIDToFiltersIndex []int + if len(tc.filters) > 0 { + shortIDToFiltersIndex = make([]int, tc.filters[len(tc.filters)-1].shortID+1) + for i := range shortIDToFiltersIndex { + shortIDToFiltersIndex[i] = -1 + } + } + for _, f := range tc.filters { + filter := NewBlockIntervalFilter("", f.i.lower, f.i.upper) + bpf := BlockPropertyFilter(filter) + if f.intersectsForEmptyProp { + bpf = filterWithTrueForEmptyProp{filter} + } + shortIDToFiltersIndex[f.shortID] = len(filters) + filters = append(filters, bpf) + } + doFiltering := func() { + bpFilterer := BlockPropertiesFilterer{ + filters: filters, + shortIDToFiltersIndex: shortIDToFiltersIndex, + } + intersects, err := bpFilterer.intersects(tc.props) + require.NoError(t, err) + require.Equal(t, tc.intersects, intersects) + } + doFiltering() + if len(filters) > 1 { + // Permute the filters so that the use of + // shortIDToFiltersIndex is better tested. + permutation := rand.Perm(len(filters)) + filterPerm := make([]BlockPropertyFilter, len(filters)) + for i := range permutation { + filterPerm[i] = filters[permutation[i]] + shortIDToFiltersIndex[tc.filters[permutation[i]].shortID] = i + } + filters = filterPerm + doFiltering() + } + }) + } +} diff --git a/sstable/filter.go b/sstable/filter.go index f21303af95..f8c6d0dff6 100644 --- a/sstable/filter.go +++ b/sstable/filter.go @@ -4,7 +4,9 @@ package sstable -import "sync/atomic" +import ( + "sync/atomic" +) // FilterMetrics holds metrics for the filter policy. type FilterMetrics struct { @@ -31,6 +33,13 @@ type BlockHandle struct { Offset, Length uint64 } +// BlockHandleWithProperties is used for data blocks and first/lower level +// index blocks, since they can be annotated using BlockPropertyCollectors. +type BlockHandleWithProperties struct { + BlockHandle + Props []byte +} + type filterWriter interface { addKey(key []byte) finish() ([]byte, error) diff --git a/sstable/options.go b/sstable/options.go index 05805c678f..1d3bbf088c 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -199,6 +199,11 @@ type WriterOptions struct { // and lives for the lifetime of the table. TablePropertyCollectors []func() TablePropertyCollector + // BlockPropertyCollectors is a list of BlockPropertyCollector creation + // functions. A new BlockPropertyCollector is created for each sstable + // built and lives for the lifetime of writing that table. + BlockPropertyCollectors []func() BlockPropertyCollector + // Checksum specifies which checksum to use. 
Checksum ChecksumType } diff --git a/sstable/reader.go b/sstable/reader.go index fc060ea6f3..cd26ff7dc0 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -41,7 +41,11 @@ const ( // decodeBlockHandle returns the block handle encoded at the start of src, as // well as the number of bytes it occupies. It returns zero if given invalid -// input. +// input. A block handle for a data block or a first/lower level index block +// should not be decoded using decodeBlockHandle since the caller may validate +// that the number of bytes decoded is equal to the length of src, which will +// be false if the properties are not decoded. In those cases the caller +// should use decodeBlockHandleWithProperties. func decodeBlockHandle(src []byte) (BlockHandle, int) { offset, n := binary.Uvarint(src) length, m := binary.Uvarint(src[n:]) @@ -51,12 +55,33 @@ func decodeBlockHandle(src []byte) (BlockHandle, int) { return BlockHandle{offset, length}, n + m } +// decodeBlockHandleWithProperties returns the block handle and properties +// encoded in src. src needs to be exactly the length that was encoded. This +// method must be used for data block and first/lower level index blocks. The +// properties in the block handle point to the bytes in src. +func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) { + bh, n := decodeBlockHandle(src) + if n == 0 { + return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle") + } + return BlockHandleWithProperties{ + BlockHandle: bh, + Props: src[n:], + }, nil +} + func encodeBlockHandle(dst []byte, b BlockHandle) int { n := binary.PutUvarint(dst, b.Offset) m := binary.PutUvarint(dst[n:], b.Length) return n + m } +func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte { + n := encodeBlockHandle(dst, b.BlockHandle) + dst = append(dst[:n], b.Props...) + return dst +} + // block is a []byte that holds a sequence of key/value pairs plus an index // over those pairs. type block []byte @@ -76,6 +101,7 @@ type singleLevelIterator struct { // Global lower/upper bound for the iterator. lower []byte upper []byte + bpfs *BlockPropertiesFilterer // Per-block lower/upper bound. Nil if the bound does not apply to the block // because we determined the block lies completely within the bound. blockLower []byte @@ -84,6 +110,9 @@ type singleLevelIterator struct { index blockIter data blockIter dataRS readaheadState + // dataBH refers to the last data block that the iterator considered + // loading. It may not actually have loaded the block, due to an error or + // because it was considered irrelevant. dataBH BlockHandle err error closeHook func(i Iterator) error @@ -145,7 +174,23 @@ type singleLevelIterator struct { // not need to do anything. // // Similar examples can be constructed for backward iteration. - + // + // This notion of exactly one key before or after the bounds is not quite + // true when block properties are used to ignore blocks. In that case we + // can't stop precisely at the first block that is past the bounds since + // we are using the index entries to enforce the bounds. + // + // e.g. 3 blocks with keys [b, c] [f, g], [i, j, k] with index entries d, + // h, l. And let the lower bound be k, and we are reverse iterating. If + // the block [i, j, k] is ignored due to the block interval annotations we + // do need to move the index to block [f, g] since the index entry for the + // [i, j, k] block is l which is not less than the lower bound of k. So we + // have passed the entries i, j. 
+	//
+	// This behavior is harmless since the block property filters are fixed
+	// for the lifetime of the iterator so i, j are irrelevant. In addition,
+	// the current code will not load the [f, g] block, so the seek
+	// optimization that attempts to use Next/Prev does not apply anyway.
 	boundsCmp                   int
 	positionedUsingLatestBounds bool
@@ -206,7 +251,8 @@ func checkTwoLevelIterator(obj interface{}) {
 // init initializes a singleLevelIterator for reading from the table. It is
 // synonymous with Reader.NewIter, but allows for reuse of the iterator
 // between different Readers.
-func (i *singleLevelIterator) init(r *Reader, lower, upper []byte) error {
+func (i *singleLevelIterator) init(
+	r *Reader, lower, upper []byte, filterer *BlockPropertiesFilterer) error {
 	if r.err != nil {
 		return r.err
 	}
@@ -217,6 +263,7 @@
 	i.lower = lower
 	i.upper = upper
+	i.bpfs = filterer
 	i.reader = r
 	i.cmp = r.Compare
 	err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum)
@@ -271,37 +318,55 @@ func (i *singleLevelIterator) initBounds() {
 	}
 }
 
+type loadBlockResult int8
+const (
+	loadBlockOK loadBlockResult = iota
+	// Could be due to error or because no block left to load.
+	loadBlockFailed
+	loadBlockIrrelevant
+)
+
 // loadBlock loads the block at the current index position and leaves i.data
 // unpositioned. If unsuccessful, it sets i.err to any error encountered, which
 // may be nil if we have simply exhausted the entire table.
-func (i *singleLevelIterator) loadBlock() bool {
+func (i *singleLevelIterator) loadBlock() loadBlockResult {
 	// Ensure the data block iterator is invalidated even if loading of the block
 	// fails.
 	i.data.invalidate()
 	if !i.index.Valid() {
-		return false
+		return loadBlockFailed
 	}
 	// Load the next block.
 	v := i.index.Value()
-	var n int
-	i.dataBH, n = decodeBlockHandle(v)
-	if n == 0 || n != len(v) {
+	bhp, err := decodeBlockHandleWithProperties(v)
+	i.dataBH = bhp.BlockHandle
+	if err != nil {
 		i.err = errCorruptIndexEntry
-		return false
+		return loadBlockFailed
+	}
+	if i.bpfs != nil {
+		intersects, err := i.bpfs.intersects(bhp.Props)
+		if err != nil {
+			i.err = errCorruptIndexEntry
+			return loadBlockFailed
+		}
+		if !intersects {
+			return loadBlockIrrelevant
+		}
 	}
 	block, err := i.reader.readBlock(i.dataBH, nil /* transform */, &i.dataRS)
 	if err != nil {
 		i.err = err
-		return false
+		return loadBlockFailed
 	}
 	i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum)
 	if i.err != nil {
 		// The block is partially loaded, and we don't want it to appear valid.
 		i.data.invalidate()
-		return false
+		return loadBlockFailed
 	}
 	i.initBounds()
-	return true
+	return loadBlockOK
 }
 
 func (i *singleLevelIterator) initBoundsForAlreadyLoadedBlock() {
@@ -456,16 +521,31 @@ func (i *singleLevelIterator) seekGEHelper(
 		}
 	}
 	// Slow-path.
-	if ikey, _ := i.index.SeekGE(key); ikey == nil {
+	var ikey *InternalKey
+	if ikey, _ = i.index.SeekGE(key); ikey == nil {
 		// The target key is greater than any key in the sstable. Invalidate the
 		// block iterator so that a subsequent call to Prev() will return the last
 		// key in the table.
 		i.data.invalidate()
 		return nil, nil
 	}
-	if !i.loadBlock() {
+	result := i.loadBlock()
+	if result == loadBlockFailed {
 		return nil, nil
 	}
+	if result == loadBlockIrrelevant {
+		// Enforce the upper bound here since we don't want to bother moving
+		// to the next block if upper bound is already exceeded. Note that
+		// the next block starts with keys >= ikey.UserKey since even
+		// though this is the block separator, the same user key can span
+		// multiple blocks. Since upper is exclusive we use >= below.
+		if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 {
+			i.exhaustedBounds = +1
+			return nil, nil
+		}
+		// Want to skip to the next block.
+		dontSeekWithinBlock = true
+	}
 	}
 	if !dontSeekWithinBlock {
 		if ikey, val := i.data.SeekGE(key); ikey != nil {
@@ -520,6 +600,9 @@ func (i *singleLevelIterator) seekPrefixGE(
 		}
 		i.lastBloomFilterMatched = true
 	}
+	// The i.exhaustedBounds comparison indicates that the upper bound was
+	// reached. The i.data.isDataInvalidated() check indicates that the
+	// sstable was exhausted.
 	if trySeekUsingNext && (i.exhaustedBounds == +1 || i.data.isDataInvalidated()) {
 		// Already exhausted, so return nil.
 		return nil, nil
@@ -568,12 +651,31 @@ func (i *singleLevelIterator) SeekLT(key []byte) (*InternalKey, []byte) {
 			dontSeekWithinBlock = true
 		}
 	} else {
-		if ikey, _ := i.index.SeekGE(key); ikey == nil {
-			i.index.Last()
+		var ikey *InternalKey
+		if ikey, _ = i.index.SeekGE(key); ikey == nil {
+			ikey, _ = i.index.Last()
+			if ikey == nil {
+				return nil, nil
+			}
 		}
-		if !i.loadBlock() {
+		// INVARIANT: ikey != nil.
+		result := i.loadBlock()
+		if result == loadBlockFailed {
 			return nil, nil
 		}
+		if result == loadBlockIrrelevant {
+			// Enforce the lower bound here since we don't want to bother
+			// moving to the previous block if lower bound is already
+			// exceeded. Note that the previous block starts with keys <=
+			// ikey.UserKey since even though this is the current block's
+			// separator, the same user key can span multiple blocks.
+			if i.lower != nil && i.cmp(ikey.UserKey, i.lower) < 0 {
+				i.exhaustedBounds = -1
+				return nil, nil
+			}
+			// Want to skip to the previous block.
+			dontSeekWithinBlock = true
+		}
 	}
 	if !dontSeekWithinBlock {
 		if ikey, val := i.data.SeekLT(key); ikey != nil {
@@ -620,20 +722,38 @@ func (i *singleLevelIterator) firstInternal() (*InternalKey, []byte) {
 	// Seek optimization only applies until iterator is first positioned after SetBounds.
 	i.boundsCmp = 0
-	if ikey, _ := i.index.First(); ikey == nil {
+	var ikey *InternalKey
+	if ikey, _ = i.index.First(); ikey == nil {
 		i.data.invalidate()
 		return nil, nil
 	}
-	if !i.loadBlock() {
+	result := i.loadBlock()
+	if result == loadBlockFailed {
 		return nil, nil
 	}
-	if ikey, val := i.data.First(); ikey != nil {
-		if i.blockUpper != nil && i.cmp(ikey.UserKey, i.blockUpper) >= 0 {
+	if result == loadBlockOK {
+		if ikey, val := i.data.First(); ikey != nil {
+			if i.blockUpper != nil && i.cmp(ikey.UserKey, i.blockUpper) >= 0 {
+				i.exhaustedBounds = +1
+				return nil, nil
+			}
+			return ikey, val
+		}
+		// Else fall through to skipForward.
+	} else {
+		// result == loadBlockIrrelevant. Enforce the upper bound here since
+		// we don't want to bother moving to the next block if upper bound is
+		// already exceeded. Note that the next block starts with keys >=
+		// ikey.UserKey since even though this is the block separator, the
+		// same user key can span multiple blocks. Since upper is exclusive we
+		// use >= below.
+		if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 {
 			i.exhaustedBounds = +1
 			return nil, nil
 		}
-		return ikey, val
+		// Else fall through to skipForward.
 	}
+
 	return i.skipForward()
 }
@@ -659,20 +779,36 @@ func (i *singleLevelIterator) lastInternal() (*InternalKey, []byte) {
 	// Seek optimization only applies until iterator is first positioned after SetBounds.
 	i.boundsCmp = 0
-	if ikey, _ := i.index.Last(); ikey == nil {
+	var ikey *InternalKey
+	if ikey, _ = i.index.Last(); ikey == nil {
 		i.data.invalidate()
 		return nil, nil
 	}
-	if !i.loadBlock() {
+	result := i.loadBlock()
+	if result == loadBlockFailed {
 		return nil, nil
 	}
-	if ikey, val := i.data.Last(); ikey != nil {
-		if i.blockLower != nil && i.cmp(ikey.UserKey, i.blockLower) < 0 {
+	if result == loadBlockOK {
+		if ikey, val := i.data.Last(); ikey != nil {
+			if i.blockLower != nil && i.cmp(ikey.UserKey, i.blockLower) < 0 {
+				i.exhaustedBounds = -1
+				return nil, nil
+			}
+			return ikey, val
+		}
+		// Else fall through to skipBackward.
+	} else {
+		// result == loadBlockIrrelevant. Enforce the lower bound here since
+		// we don't want to bother moving to the previous block if lower bound
+		// is already exceeded. Note that the previous block starts with keys
+		// <= ikey.UserKey since even though this is the current block's
+		// separator, the same user key can span multiple blocks.
+		if i.lower != nil && i.cmp(ikey.UserKey, i.lower) < 0 {
			i.exhaustedBounds = -1
			return nil, nil
		}
-		return ikey, val
 	}
+
 	return i.skipBackward()
 }
@@ -726,14 +862,32 @@ func (i *singleLevelIterator) Prev() (*InternalKey, []byte) {
 
 func (i *singleLevelIterator) skipForward() (*InternalKey, []byte) {
 	for {
-		if key, _ := i.index.Next(); key == nil {
+		var key *InternalKey
+		if key, _ = i.index.Next(); key == nil {
 			i.data.invalidate()
 			break
 		}
-		if !i.loadBlock() {
+		result := i.loadBlock()
+		if result != loadBlockOK {
 			if i.err != nil {
 				break
 			}
+			if result == loadBlockFailed {
+				// We checked that i.index was at a valid entry, so
+				// loadBlockFailed could not have happened due to i.index
+				// being exhausted, and must be due to an error.
+				panic("loadBlock should not have failed with no error")
+			}
+			// result == loadBlockIrrelevant. Enforce the upper bound here
+			// since we don't want to bother moving to the next block if upper
+			// bound is already exceeded. Note that the next block starts with
+			// keys >= key.UserKey since even though this is the block
+			// separator, the same user key can span multiple blocks. Since
+			// upper is exclusive we use >= below.
+			if i.upper != nil && i.cmp(key.UserKey, i.upper) >= 0 {
+				i.exhaustedBounds = +1
+				return nil, nil
+			}
 			continue
 		}
 		if key, val := i.data.First(); key != nil {
@@ -749,14 +903,31 @@
 func (i *singleLevelIterator) skipBackward() (*InternalKey, []byte) {
 	for {
-		if key, _ := i.index.Prev(); key == nil {
+		var key *InternalKey
+		if key, _ = i.index.Prev(); key == nil {
 			i.data.invalidate()
 			break
 		}
-		if !i.loadBlock() {
+		result := i.loadBlock()
+		if result != loadBlockOK {
 			if i.err != nil {
 				break
 			}
+			if result == loadBlockFailed {
+				// We checked that i.index was at a valid entry, so
+				// loadBlockFailed could not have happened due to i.index
+				// being exhausted, and must be due to an error.
+				panic("loadBlock should not have failed with no error")
+			}
+			// result == loadBlockIrrelevant. Enforce the lower bound here
+			// since we don't want to bother moving to the previous block if
+			// lower bound is already exceeded. Note that the previous block
+			// starts with keys <= key.UserKey since even though this is the
+			// current block's separator, the same user key can span multiple
+			// blocks.
+			if i.lower != nil && i.cmp(key.UserKey, i.lower) < 0 {
+				i.exhaustedBounds = -1
+				return nil, nil
+			}
 			continue
 		}
 		key, val := i.data.Last()
@@ -772,14 +943,6 @@ func (i *singleLevelIterator) skipBackward() (*InternalKey, []byte) {
 	return nil, nil
 }
 
-// Returns true if the data block iterator points to a valid entry. If a
-// positioning operation (e.g. SeekGE, SeekLT, Next, Prev, etc) returns (nil,
-// nil) and valid() is true, the iterator has reached either the upper or lower
-// bound.
-func (i *singleLevelIterator) valid() bool {
-	return i.data.Valid()
-}
-
 // Error implements internalIterator.Error, as documented in the pebble
 // package.
 func (i *singleLevelIterator) Error() error {
@@ -816,6 +979,9 @@ func (i *singleLevelIterator) Close() error {
 		i.dataRS.sequentialFile = nil
 	}
 	err = firstError(err, i.err)
+	if i.bpfs != nil {
+		releaseBlockPropertiesFilterer(i.bpfs)
+	}
 	*i = i.resetForReuse()
 	singleLevelIterPool.Put(i)
 	return err
@@ -915,12 +1081,24 @@ func (i *compactionIterator) skipForward(key *InternalKey, val []byte) (*Interna
 		if key, _ := i.index.Next(); key == nil {
 			break
 		}
-		if !i.loadBlock() {
+		result := i.loadBlock()
+		if result != loadBlockOK {
 			if i.err != nil {
 				break
 			}
-			continue
+			switch result {
+			case loadBlockFailed:
+				// We checked that i.index was at a valid entry, so
+				// loadBlockFailed could not have happened due to i.index
+				// being exhausted, and must be due to an error.
+				panic("loadBlock should not have failed with no error")
+			case loadBlockIrrelevant:
+				panic("compactionIter should not be using block intervals for skipping")
+			default:
+				panic(fmt.Sprintf("unexpected case %d", result))
+			}
 		}
+		// result == loadBlockOK
 		if key, val = i.data.First(); key != nil {
 			break
 		}
@@ -945,30 +1123,45 @@ var _ base.InternalIterator = (*twoLevelIterator)(nil)
 // leaves i.index unpositioned. If unsuccessful, it sets i.err to any error
 // encountered, which may be nil if we have simply exhausted the entire table.
 // This is used for two level indexes.
-func (i *twoLevelIterator) loadIndex() bool {
+func (i *twoLevelIterator) loadIndex() loadBlockResult {
 	// Ensure the data block iterator is invalidated even if loading of the
 	// index fails.
i.data.invalidate() if !i.topLevelIndex.Valid() { i.index.offset = 0 i.index.restarts = 0 - return false + return loadBlockFailed } - h, n := decodeBlockHandle(i.topLevelIndex.Value()) - if n == 0 || n != len(i.topLevelIndex.Value()) { + bhp, err := decodeBlockHandleWithProperties(i.topLevelIndex.Value()) + if err != nil { i.err = base.CorruptionErrorf("pebble/table: corrupt top level index entry") - return false + return loadBlockFailed + } + if i.bpfs != nil { + intersects, err := i.bpfs.intersects(bhp.Props) + if err != nil { + i.err = errCorruptIndexEntry + return loadBlockFailed + } + if !intersects { + return loadBlockIrrelevant + } } - indexBlock, err := i.reader.readBlock(h, nil /* transform */, nil /* readaheadState */) + indexBlock, err := i.reader.readBlock( + bhp.BlockHandle, nil /* transform */, nil /* readaheadState */) if err != nil { i.err = err - return false + return loadBlockFailed + } + if i.err = i.index.initHandle( + i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum); i.err == nil { + return loadBlockOK } - i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum) - return i.err == nil + return loadBlockFailed } -func (i *twoLevelIterator) init(r *Reader, lower, upper []byte) error { +func (i *twoLevelIterator) init( + r *Reader, lower, upper []byte, filterer *BlockPropertiesFilterer) error { if r.err != nil { return r.err } @@ -979,6 +1172,7 @@ func (i *twoLevelIterator) init(r *Reader, lower, upper []byte) error { i.lower = lower i.upper = upper + i.bpfs = filterer i.reader = r i.cmp = r.Compare err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum) @@ -1001,18 +1195,34 @@ func (i *twoLevelIterator) SeekGE(key []byte) (*InternalKey, []byte) { i.exhaustedBounds = 0 i.err = nil // clear cached iteration error + var dontSeekWithinSingleLevelIter bool if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.Valid() || i.boundsCmp <= 0 || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 { // Slow-path: need to position the topLevelIndex. - if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the upper bound here since don't want to bother moving + // to the next entry in the top level index if upper bound is + // already exceeded. Note that the next entry starts with keys >= + // ikey.UserKey since even though this is the block separator, the + // same user key can span multiple index blocks. Since upper is + // exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } + // Fall through to skipForward. + dontSeekWithinSingleLevelIter = true + } } // Else fast-path: The bounds have moved forward and this SeekGE is // respecting the lower bound (guaranteed by Iterator). We know that @@ -1023,8 +1233,10 @@ func (i *twoLevelIterator) SeekGE(key []byte) (*InternalKey, []byte) { // confirms that it is not behind. Since it is not ahead and not behind // it must be at the right position. 
- if ikey, val := i.singleLevelIterator.SeekGE(key); ikey != nil { - return ikey, val + if !dontSeekWithinSingleLevelIter { + if ikey, val := i.singleLevelIterator.SeekGE(key); ikey != nil { + return ikey, val + } } return i.skipForward() } @@ -1067,6 +1279,7 @@ func (i *twoLevelIterator) SeekPrefixGE( // Bloom filter matches. i.exhaustedBounds = 0 + var dontSeekWithinSingleLevelIter bool if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.Valid() || i.boundsCmp <= 0 || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 { // Slow-path: need to position the topLevelIndex. @@ -1079,15 +1292,30 @@ func (i *twoLevelIterator) SeekPrefixGE( // block, and in that case we don't need to invalidate and reload the // singleLevelIterator state. trySeekUsingNext = false - if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } + if result == loadBlockIrrelevant { + // Enforce the upper bound here since don't want to bother moving + // to the next entry in the top level index if upper bound is + // already exceeded. Note that the next entry starts with keys >= + // ikey.UserKey since even though this is the block separator, the + // same user key can span multiple index blocks. Since upper is + // exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } + // Fall through to skipForward. + dontSeekWithinSingleLevelIter = true + } } // Else fast-path: The bounds have moved forward and this SeekGE is // respecting the lower bound (guaranteed by Iterator). We know that @@ -1098,20 +1326,16 @@ func (i *twoLevelIterator) SeekPrefixGE( // confirms that it is not behind. Since it is not ahead and not behind // it must be at the right position. - if ikey, val := i.singleLevelIterator.seekPrefixGE( - prefix, key, trySeekUsingNext, false /* checkFilter */); ikey != nil { - return ikey, val + if !dontSeekWithinSingleLevelIter { + if ikey, val := i.singleLevelIterator.seekPrefixGE( + prefix, key, trySeekUsingNext, false /* checkFilter */); ikey != nil { + return ikey, val + } } + // NB: skipForward checks whether exhaustedBounds is already +1. return i.skipForward() } -func (i *twoLevelIterator) SeekPrefixGEWithExhaustionIndicator( - prefix, key []byte, trySeekUsingNext bool, -) (k *base.InternalKey, value []byte, iterExhaustedAndNotBloomFilterFail bool) { - k, value = i.SeekPrefixGE(prefix, key, trySeekUsingNext) - return k, value, false -} - // SeekLT implements internalIterator.SeekLT, as documented in the pebble // package. Note that SeekLT only checks the lower bound. It is up to the // caller to ensure that key is less than the upper bound. @@ -1121,31 +1345,58 @@ func (i *twoLevelIterator) SeekLT(key []byte) (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 + var result loadBlockResult + var ikey *InternalKey // NB: Unlike SeekGE, we don't have a fast-path here since we don't know // whether the topLevelIndex is positioned after the position that would // be returned by doing i.topLevelIndex.SeekGE(). To know this we would // need to know the index key preceding the current one. 
- if ikey, _ := i.topLevelIndex.SeekGE(key); ikey == nil { - if ikey, _ := i.topLevelIndex.Last(); ikey == nil { + if ikey, _ = i.topLevelIndex.SeekGE(key); ikey == nil { + if ikey, _ = i.topLevelIndex.Last(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result = i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - return i.singleLevelIterator.lastInternal() - } - - if !i.loadIndex() { - return nil, nil - } - - if ikey, val := i.singleLevelIterator.SeekLT(key); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { + return ikey, val + } + // Fall through to skipBackward since the singleLevelIterator did + // not have any blocks that satisfy the block interval + // constraints, or the lower bound was reached. + } + // Else loadBlockIrrelevant, so fall through. + } else { + result = i.loadIndex() + if result == loadBlockFailed { + return nil, nil + } + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.SeekLT(key); ikey != nil { + return ikey, val + } + // Fall through to skipBackward since the singleLevelIterator did + // not have any blocks that satisfy the block interval + // constraint, or the lower bound was reached. + } + // Else loadBlockIrrelevant, so fall through. + } + if result == loadBlockIrrelevant { + // Enforce the lower bound here since don't want to bother moving to + // the previous entry in the top level index if lower bound is already + // exceeded. Note that the previous entry starts with keys <= + // ikey.UserKey since even though this is the current block's + // separator, the same user key can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + } } + // NB: skipBackward checks whether exhaustedBounds is already -1. return i.skipBackward() } @@ -1162,17 +1413,32 @@ func (i *twoLevelIterator) First() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. i.boundsCmp = 0 - if ikey, _ := i.topLevelIndex.First(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.First(); ikey == nil { return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - if ikey, val := i.singleLevelIterator.First(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.First(); ikey != nil { + return ikey, val + } + // Else fall through to skipForward. + } else { + // result == loadBlockIrrelevant. Enforce the upper bound here since + // don't want to bother moving to the next entry in the top level + // index if upper bound is already exceeded. Note that the next entry + // starts with keys >= ikey.UserKey since even though this is the + // block separator, the same user key can span multiple index blocks. + // Since upper is exclusive we use >= below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + } } + // NB: skipForward checks whether exhaustedBounds is already +1. return i.skipForward() } @@ -1189,17 +1455,32 @@ func (i *twoLevelIterator) Last() (*InternalKey, []byte) { // Seek optimization only applies until iterator is first positioned after SetBounds. 
i.boundsCmp = 0 - if ikey, _ := i.topLevelIndex.Last(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Last(); ikey == nil { return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - - if ikey, val := i.singleLevelIterator.Last(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.Last(); ikey != nil { + return ikey, val + } + // Else fall through to skipBackward. + } else { + // result == loadBlockIrrelevant. Enforce the lower bound here + // since don't want to bother moving to the previous entry in the + // top level index if lower bound is already exceeded. Note that + // the previous entry starts with keys <= ikey.UserKey since even + // though this is the current block's separator, the same user key + // can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + } } + // NB: skipBackward checks whether exhaustedBounds is already -1. return i.skipBackward() } @@ -1235,51 +1516,75 @@ func (i *twoLevelIterator) Prev() (*InternalKey, []byte) { func (i *twoLevelIterator) skipForward() (*InternalKey, []byte) { for { - if i.err != nil { - return nil, nil - } - if i.singleLevelIterator.valid() { - // The iterator is positioned at valid record in the current data block - // which implies the previous positioning call reached the upper bound. - // + if i.err != nil || i.exhaustedBounds > 0 { return nil, nil } i.exhaustedBounds = 0 - if ikey, _ := i.topLevelIndex.Next(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Next(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.singleLevelIterator.firstInternal(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.firstInternal(); ikey != nil { + return ikey, val + } + // Next iteration will return if singleLevelIterator set + // exhaustedBounds = +1. + } else { + // result == loadBlockIrrelevant. Enforce the upper bound here + // since don't want to bother moving to the next entry in the top + // level index if upper bound is already exceeded. Note that the + // next entry starts with keys >= ikey.UserKey since even though + // this is the block separator, the same user key can span + // multiple index blocks. Since upper is exclusive we use >= + // below. + if i.upper != nil && i.cmp(ikey.UserKey, i.upper) >= 0 { + i.exhaustedBounds = +1 + // Next iteration will return. + } } } } func (i *twoLevelIterator) skipBackward() (*InternalKey, []byte) { for { - if i.err != nil { - return nil, nil - } - if i.singleLevelIterator.valid() { - // The iterator is positioned at valid record in the current data block - // which implies the previous positioning call reached the lower bound. 
+ if i.err != nil || i.exhaustedBounds < 0 { return nil, nil } i.exhaustedBounds = 0 - if ikey, _ := i.topLevelIndex.Prev(); ikey == nil { + var ikey *InternalKey + if ikey, _ = i.topLevelIndex.Prev(); ikey == nil { i.data.invalidate() i.index.invalidate() return nil, nil } - if !i.loadIndex() { + result := i.loadIndex() + if result == loadBlockFailed { return nil, nil } - if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { - return ikey, val + if result == loadBlockOK { + if ikey, val := i.singleLevelIterator.lastInternal(); ikey != nil { + return ikey, val + } + // Next iteration will return if singleLevelIterator set + // exhaustedBounds = -1. + } else { + // result == loadBlockIrrelevant. Enforce the lower bound here + // since don't want to bother moving to the previous entry in the + // top level index if lower bound is already exceeded. Note that + // the previous entry starts with keys <= ikey.UserKey since even + // though this is the current block's separator, the same user key + // can span multiple index blocks. + if i.lower != nil && i.cmp(ikey.UserKey, i.upper) < 0 { + i.exhaustedBounds = -1 + // Next iteration will return. + } } } } @@ -1299,6 +1604,9 @@ func (i *twoLevelIterator) Close() error { i.dataRS.sequentialFile = nil } err = firstError(err, i.err) + if i.bpfs != nil { + releaseBlockPropertiesFilterer(i.bpfs) + } *i = twoLevelIterator{ singleLevelIterator: i.singleLevelIterator.resetForReuse(), topLevelIndex: i.topLevelIndex.resetForReuse(), @@ -1370,10 +1678,26 @@ func (i *twoLevelCompactionIterator) skipForward( if key, _ := i.topLevelIndex.Next(); key == nil { break } - if i.loadIndex() { - if key, val = i.singleLevelIterator.First(); key != nil { + result := i.loadIndex() + if result != loadBlockOK { + if i.err != nil { break } + switch result { + case loadBlockFailed: + // We checked that i.index was at a valid entry, so + // loadBlockFailed could not have happened due to to i.index + // being exhausted, and must be due to an error. + panic("loadBlock should not have failed with no error") + case loadBlockIrrelevant: + panic("compactionIter should not be using block intervals for skipping") + default: + panic(fmt.Sprintf("unexpected case %d", result)) + } + } + // result == loadBlockOK + if key, val = i.singleLevelIterator.First(); key != nil { + break } } } @@ -1777,15 +2101,17 @@ func (r *Reader) get(key []byte) (value []byte, err error) { return newValue, nil } -// NewIter returns an iterator for the contents of the table. If an error -// occurs, NewIter cleans up after itself and returns a nil iterator. -func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { +// NewIterWithBlockPropertyFilters returns an iterator for the contents of the +// table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after +// itself and returns a nil iterator. +func (r *Reader) NewIterWithBlockPropertyFilters( + lower, upper []byte, filterer *BlockPropertiesFilterer) (Iterator, error) { // NB: pebble.tableCache wraps the returned iterator with one which performs // reference counting on the Reader, preventing the Reader from being closed // until the final iterator closes. 
 	if r.Properties.IndexType == twoLevelIndex {
 		i := twoLevelIterPool.Get().(*twoLevelIterator)
-		err := i.init(r, lower, upper)
+		err := i.init(r, lower, upper, filterer)
 		if err != nil {
 			return nil, err
 		}
@@ -1793,20 +2119,26 @@ func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) {
 	}
 	i := singleLevelIterPool.Get().(*singleLevelIterator)
-	err := i.init(r, lower, upper)
+	err := i.init(r, lower, upper, filterer)
 	if err != nil {
 		return nil, err
 	}
 	return i, nil
 }
 
+// NewIter returns an iterator for the contents of the table. If an error
+// occurs, NewIter cleans up after itself and returns a nil iterator.
+func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) {
+	return r.NewIterWithBlockPropertyFilters(lower, upper, nil)
+}
+
 // NewCompactionIter returns an iterator similar to NewIter but it also increments
 // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
 // after itself and returns a nil iterator.
 func (r *Reader) NewCompactionIter(bytesIterated *uint64) (Iterator, error) {
 	if r.Properties.IndexType == twoLevelIndex {
 		i := twoLevelIterPool.Get().(*twoLevelIterator)
-		err := i.init(r, nil /* lower */, nil /* upper */)
+		err := i.init(r, nil /* lower */, nil /* upper */, nil)
 		if err != nil {
 			return nil, err
 		}
@@ -1817,7 +2149,7 @@ func (r *Reader) NewCompactionIter(bytesIterated *uint64) (Iterator, error) {
 		}, nil
 	}
 	i := singleLevelIterPool.Get().(*singleLevelIterator)
-	err := i.init(r, nil /* lower */, nil /* upper */)
+	err := i.init(r, nil /* lower */, nil /* upper */, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -2116,33 +2448,34 @@ func (r *Reader) Layout() (*Layout, error) {
 		l.Index = append(l.Index, r.indexBH)
 		iter, _ := newBlockIter(r.Compare, indexH.Get())
 		for key, value := iter.First(); key != nil; key, value = iter.Next() {
-			dataBH, n := decodeBlockHandle(value)
-			if n == 0 || n != len(value) {
+			dataBH, err := decodeBlockHandleWithProperties(value)
+			if err != nil {
 				return nil, errCorruptIndexEntry
 			}
-			l.Data = append(l.Data, dataBH)
+			l.Data = append(l.Data, dataBH.BlockHandle)
 		}
 	} else {
 		l.TopIndex = r.indexBH
 		topIter, _ := newBlockIter(r.Compare, indexH.Get())
 		for key, value := topIter.First(); key != nil; key, value = topIter.Next() {
-			indexBH, n := decodeBlockHandle(value)
-			if n == 0 || n != len(value) {
+			indexBH, err := decodeBlockHandleWithProperties(value)
+			if err != nil {
 				return nil, errCorruptIndexEntry
 			}
-			l.Index = append(l.Index, indexBH)
+			l.Index = append(l.Index, indexBH.BlockHandle)
 
-			subIndex, err := r.readBlock(indexBH, nil /* transform */, nil /* readaheadState */)
+			subIndex, err := r.readBlock(
+				indexBH.BlockHandle, nil /* transform */, nil /* readaheadState */)
 			if err != nil {
 				return nil, err
 			}
 			iter, _ := newBlockIter(r.Compare, subIndex.Get())
 			for key, value := iter.First(); key != nil; key, value = iter.Next() {
-				dataBH, n := decodeBlockHandle(value)
-				if n == 0 || n != len(value) {
+				dataBH, err := decodeBlockHandleWithProperties(value)
+				if err != nil {
 					return nil, errCorruptIndexEntry
 				}
-				l.Data = append(l.Data, dataBH)
+				l.Data = append(l.Data, dataBH.BlockHandle)
 			}
 			subIndex.Release()
 		}
 	}
@@ -2237,11 +2570,12 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
 		// The range falls completely after this file, or an error occurred.
 		return 0, topIter.Error()
 	}
-	startIdxBH, n := decodeBlockHandle(val)
-	if n == 0 || n != len(val) {
+	startIdxBH, err := decodeBlockHandleWithProperties(val)
+	if err != nil {
 		return 0, errCorruptIndexEntry
 	}
-	startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, nil /* readaheadState */)
+	startIdxBlock, err := r.readBlock(
+		startIdxBH.BlockHandle, nil /* transform */, nil /* readaheadState */)
 	if err != nil {
 		return 0, err
 	}
@@ -2257,11 +2591,12 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
 			return 0, err
 		}
 	} else {
-		endIdxBH, n := decodeBlockHandle(val)
-		if n == 0 || n != len(val) {
+		endIdxBH, err := decodeBlockHandleWithProperties(val)
+		if err != nil {
 			return 0, errCorruptIndexEntry
 		}
-		endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, nil /* readaheadState */)
+		endIdxBlock, err := r.readBlock(
+			endIdxBH.BlockHandle, nil /* transform */, nil /* readaheadState */)
 		if err != nil {
 			return 0, err
 		}
@@ -2280,8 +2615,8 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
 		// The range falls completely after this file, or an error occurred.
 		return 0, startIdxIter.Error()
 	}
-	startBH, n := decodeBlockHandle(val)
-	if n == 0 || n != len(val) {
+	startBH, err := decodeBlockHandleWithProperties(val)
+	if err != nil {
 		return 0, errCorruptIndexEntry
 	}
@@ -2297,8 +2632,8 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
 		// The range spans beyond this file. Include data blocks through the last.
 		return r.Properties.DataSize - startBH.Offset, nil
 	}
-	endBH, n := decodeBlockHandle(val)
-	if n == 0 || n != len(val) {
+	endBH, err := decodeBlockHandleWithProperties(val)
+	if err != nil {
 		return 0, errCorruptIndexEntry
 	}
 	return endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset, nil
@@ -2584,8 +2919,8 @@ func (l *Layout) Describe(
 	case "index", "top-index":
 		iter, _ := newBlockIter(r.Compare, h.Get())
 		for key, value := iter.First(); key != nil; key, value = iter.Next() {
-			bh, n := decodeBlockHandle(value)
-			if n == 0 || n != len(value) {
+			bh, err := decodeBlockHandleWithProperties(value)
+			if err != nil {
 				fmt.Fprintf(w, "%10d [err: %s]\n", b.Offset+uint64(iter.offset), err)
 				continue
 			}
diff --git a/sstable/table.go b/sstable/table.go
index 47416fbeb3..94da13d59f 100644
--- a/sstable/table.go
+++ b/sstable/table.go
@@ -124,20 +124,29 @@ is a key that is >= every key in block i and is < every key in block i+1. The
 successor for the final block is a key that is >= every key in block N-1. The
 index block restart interval is 1: every entry is a restart point.
 
-A block handle is an offset and a length; the length does not include the 5
-byte trailer. Both numbers are varint-encoded, with no padding between the two
-values. The maximum size of an encoded block handle is therefore 20 bytes.
+
+A block handle is an offset, a length, and optional block properties (for data
+blocks and first/lower level index blocks); the length does not include the 5
+byte trailer. All numbers are varint-encoded, with no padding between the
+values. The maximum size of an encoded block handle without properties is 20
+bytes. It is not advised to have properties that accumulate to more than 100
+bytes.
+
 */
 
 const (
-	blockTrailerLen   = 5
-	blockHandleMaxLen = 10 + 10
+	blockTrailerLen                    = 5
+	blockHandleMaxLenWithoutProperties = 10 + 10
+	// blockHandleLikelyMaxLen can be used for pre-allocating buffers to
+	// reduce memory copies. It is not guaranteed that a block handle will not
+	// exceed this length.
+	blockHandleLikelyMaxLen = blockHandleMaxLenWithoutProperties + 100
 
 	levelDBFooterLen   = 48
 	levelDBMagic       = "\x57\xfb\x80\x8b\x24\x75\x47\xdb"
 	levelDBMagicOffset = levelDBFooterLen - len(levelDBMagic)
 
-	rocksDBFooterLen     = 1 + 2*blockHandleMaxLen + 4 + 8
+	rocksDBFooterLen     = 1 + 2*blockHandleMaxLenWithoutProperties + 4 + 8
 	rocksDBMagic         = "\xf7\xcf\xf4\x85\xb7\x41\xe2\x88"
 	rocksDBMagicOffset   = rocksDBFooterLen - len(rocksDBMagic)
 	rocksDBVersionOffset = rocksDBMagicOffset - 4
diff --git a/sstable/writer.go b/sstable/writer.go
index 8deb34e4ab..359fe47397 100644
--- a/sstable/writer.go
+++ b/sstable/writer.go
@@ -128,12 +128,14 @@ type Writer struct {
 	// Internal flag to allow creation of range-del-v1 format blocks. Only used
 	// for testing. Note that v2 format blocks are backwards compatible with v1
 	// format blocks.
-	rangeDelV1Format bool
-	block            blockWriter
-	indexBlock       blockWriter
-	rangeDelBlock    blockWriter
-	props            Properties
-	propCollectors   []TablePropertyCollector
+	rangeDelV1Format    bool
+	block               blockWriter
+	indexBlock          blockWriter
+	rangeDelBlock       blockWriter
+	props               Properties
+	propCollectors      []TablePropertyCollector
+	blockPropCollectors []BlockPropertyCollector
+	blockPropsEncoder   blockPropertiesEncoder
 	// compressedBuf is the destination buffer for compression. It is
 	// re-used over the lifetime of the writer, avoiding the allocation of a
 	// temporary buffer for each block.
@@ -143,13 +145,19 @@ type Writer struct {
 	// nil, or the full keys otherwise.
 	filter filterWriter
 	// tmp is a scratch buffer, large enough to hold either footerLen bytes,
-	// blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes.
-	tmp [rocksDBFooterLen]byte
+	// blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes, and is
+	// most likely large enough for a block handle with properties.
+	tmp [blockHandleLikelyMaxLen]byte
 	xxHasher *xxhash.Digest
 
 	topLevelIndexBlock blockWriter
-	indexPartitions    []blockWriter
+	indexPartitions    []indexBlockWriterAndBlockProperties
+}
+
+type indexBlockWriterAndBlockProperties struct {
+	writer     blockWriter
+	properties []byte
 }
 
 // Set sets the value for the given key. The sequence number is set to
@@ -241,6 +249,11 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error {
 			return err
 		}
 	}
+	for i := range w.blockPropCollectors {
+		if err := w.blockPropCollectors[i].Add(key, value); err != nil {
+			return err
+		}
+	}
 
 	w.maybeAddToFilter(key.UserKey)
 	w.block.add(key, value)
@@ -376,16 +389,44 @@ func (w *Writer) maybeFlush(key InternalKey, value []byte) error {
 		w.err = err
 		return w.err
 	}
-	w.addIndexEntry(key, bh)
+	var bhp BlockHandleWithProperties
+	if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil {
+		return err
+	}
+	if err = w.addIndexEntry(key, bhp); err != nil {
+		return err
+	}
 	return nil
 }
 
+// The BlockHandleWithProperties returned by this method must be encoded
+// before any future use of the Writer.blockPropsEncoder, since the properties
+// slice will get reused by the blockPropsEncoder.
+func (w *Writer) maybeAddBlockPropertiesToBlockHandle(
+	bh BlockHandle) (BlockHandleWithProperties, error) {
+	if len(w.blockPropCollectors) == 0 {
+		return BlockHandleWithProperties{BlockHandle: bh}, nil
+	}
+	var err error
+	w.blockPropsEncoder.resetProps()
+	for i := range w.blockPropCollectors {
+		scratch := w.blockPropsEncoder.getScratchForProp()
+		if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
+			return BlockHandleWithProperties{}, err
+		}
+		if len(scratch) > 0 {
+			w.blockPropsEncoder.addProp(shortID(i), scratch)
+		}
+	}
+	return BlockHandleWithProperties{BlockHandle: bh, Props: w.blockPropsEncoder.unsafeProps()}, nil
+}
+
 // addIndexEntry adds an index entry for the specified key and block handle.
-func (w *Writer) addIndexEntry(key InternalKey, bh BlockHandle) {
-	if bh.Length == 0 {
+func (w *Writer) addIndexEntry(key InternalKey, bhp BlockHandleWithProperties) error {
+	if bhp.Length == 0 {
 		// A valid blockHandle must be non-zero.
 		// In particular, it must have a non-zero length.
-		return
+		return nil
 	}
 	prevKey := base.DecodeInternalKey(w.block.curKey)
 	var sep InternalKey
@@ -394,16 +435,22 @@ func (w *Writer) addIndexEntry(key InternalKey, bh BlockHandle) {
 	} else {
 		sep = prevKey.Separator(w.compare, w.separator, nil, key)
 	}
-	n := encodeBlockHandle(w.tmp[:], bh)
+	encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)
 
 	if supportsTwoLevelIndex(w.tableFormat) &&
-		shouldFlush(sep, w.tmp[:n], &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) {
+		shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) {
 		// Enable two level indexes if there is more than one index block.
 		w.twoLevelIndex = true
-		w.finishIndexBlock()
+		if err := w.finishIndexBlock(); err != nil {
+			return err
+		}
 	}
 
-	w.indexBlock.add(sep, w.tmp[:n])
+	for i := range w.blockPropCollectors {
+		w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
+	}
+	w.indexBlock.add(sep, encoded)
+	return nil
 }
 
 func shouldFlush(
@@ -438,29 +485,51 @@ func shouldFlush(
 
 // finishIndexBlock finishes the current index block and adds it to the top
 // level index block. This is only used when two level indexes are enabled.
-func (w *Writer) finishIndexBlock() {
-	w.indexPartitions = append(w.indexPartitions, w.indexBlock)
+func (w *Writer) finishIndexBlock() error {
+	w.blockPropsEncoder.resetProps()
+	for i := range w.blockPropCollectors {
+		scratch := w.blockPropsEncoder.getScratchForProp()
+		var err error
+		if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
+			return err
+		}
+		if len(scratch) > 0 {
+			w.blockPropsEncoder.addProp(shortID(i), scratch)
+		}
+	}
+	w.indexPartitions = append(w.indexPartitions,
+		indexBlockWriterAndBlockProperties{
+			writer:     w.indexBlock,
+			properties: w.blockPropsEncoder.props(),
+		})
 	w.indexBlock = blockWriter{
 		restartInterval: 1,
 	}
+	return nil
 }
 
 func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
 	// Add the final unfinished index.
-	w.finishIndexBlock()
+	if err := w.finishIndexBlock(); err != nil {
+		return BlockHandle{}, err
+	}
 
 	for i := range w.indexPartitions {
 		b := &w.indexPartitions[i]
-		w.props.NumDataBlocks += uint64(b.nEntries)
-		sep := base.DecodeInternalKey(b.curKey)
-		data := b.finish()
+		w.props.NumDataBlocks += uint64(b.writer.nEntries)
+		sep := base.DecodeInternalKey(b.writer.curKey)
+		data := b.writer.finish()
 		w.props.IndexSize += uint64(len(data))
 		bh, err := w.writeBlock(data, w.compression)
 		if err != nil {
 			return BlockHandle{}, err
 		}
-		n := encodeBlockHandle(w.tmp[:], bh)
-		w.topLevelIndexBlock.add(sep, w.tmp[:n])
+		bhp := BlockHandleWithProperties{
+			BlockHandle: bh,
+			Props:       b.properties,
+		}
+		encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)
+		w.topLevelIndexBlock.add(sep, encoded)
 	}
 
 	// NB: RocksDB includes the block trailer length in the index size
@@ -506,7 +575,7 @@ func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, error) {
 		return BlockHandle{}, errors.Newf("unsupported checksum type: %d", w.checksumType)
 	}
 	binary.LittleEndian.PutUint32(w.tmp[1:5], checksum)
-	bh := BlockHandle{w.meta.Size, uint64(len(b))}
+	bh := BlockHandle{Offset: w.meta.Size, Length: uint64(len(b))}
 
 	if w.cacheID != 0 && w.fileNum != 0 {
 		// Remove the block being written from the cache. This provides defense in
@@ -557,7 +626,13 @@ func (w *Writer) Close() (err error) {
 			w.err = err
 			return w.err
 		}
-		w.addIndexEntry(InternalKey{}, bh)
+		var bhp BlockHandleWithProperties
+		if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil {
+			return err
+		}
+		if err = w.addIndexEntry(InternalKey{}, bhp); err != nil {
+			return err
+		}
 	}
 	w.props.DataSize = w.meta.Size
@@ -638,6 +713,24 @@ func (w *Writer) Close() (err error) {
 				return err
 			}
 		}
+		for i := range w.blockPropCollectors {
+			scratch := w.blockPropsEncoder.getScratchForProp()
+			// Place the shortID in the first byte.
+			scratch = append(scratch, byte(i))
+			buf, err := w.blockPropCollectors[i].FinishTable(scratch)
+			if err != nil {
+				return err
+			}
+			var prop string
+			if len(buf) > 0 {
+				prop = string(buf)
+			}
+			// NB: The property is populated in the map even if it is the
+			// empty string, since the presence in the map is what indicates
+			// that the block property collector was used when writing.
+			userProps[w.blockPropCollectors[i].Name()] = prop
+		}
 		if len(userProps) > 0 {
 			w.props.UserProperties = userProps
 		}
@@ -823,16 +916,36 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) *Writer {
 	w.props.PropertyCollectorNames = "[]"
 	w.props.ExternalFormatVersion = rocksDBExternalFormatVersion
 
-	if len(o.TablePropertyCollectors) > 0 {
-		w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors))
+	if len(o.TablePropertyCollectors) > 0 || len(o.BlockPropertyCollectors) > 0 {
 		var buf bytes.Buffer
 		buf.WriteString("[")
-		for i := range o.TablePropertyCollectors {
-			w.propCollectors[i] = o.TablePropertyCollectors[i]()
-			if i > 0 {
-				buf.WriteString(",")
+		if len(o.TablePropertyCollectors) > 0 {
+			w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors))
+			for i := range o.TablePropertyCollectors {
+				w.propCollectors[i] = o.TablePropertyCollectors[i]()
+				if i > 0 {
+					buf.WriteString(",")
+				}
+				buf.WriteString(w.propCollectors[i].Name())
+			}
+		}
+		if len(o.BlockPropertyCollectors) > 0 {
+			// shortID is a uint8, so we cannot exceed that number of block
+			// property collectors.
+ if len(o.BlockPropertyCollectors) > math.MaxUint8 { + w.err = errors.New("pebble: too many block property collectors") + return w + } + // The shortID assigned to a collector is the same as its index in + // this slice. + w.blockPropCollectors = make([]BlockPropertyCollector, len(o.BlockPropertyCollectors)) + for i := range o.BlockPropertyCollectors { + w.blockPropCollectors[i] = o.BlockPropertyCollectors[i]() + if i > 0 || len(o.TablePropertyCollectors) > 0 { + buf.WriteString(",") + } + buf.WriteString(w.blockPropCollectors[i].Name()) } - buf.WriteString(w.propCollectors[i].Name()) } buf.WriteString("]") w.props.PropertyCollectorNames = buf.String() diff --git a/table_cache.go b/table_cache.go index bbc6fb3464..bd35e68de2 100644 --- a/table_cache.go +++ b/table_cache.go @@ -318,13 +318,33 @@ func (c *tableCacheShard) newIters( c.unrefValue(v) return emptyIter, nil, nil } + var bpfs []BlockPropertyFilter + if opts != nil { + bpfs = opts.BlockPropertyFilters + } + var filterer *sstable.BlockPropertiesFilterer + if len(bpfs) > 0 { + filterer = sstable.NewBlockPropertiesFilterer(bpfs) + intersects, err := + filterer.IntersectsUserPropsAndFinishInit(v.reader.Properties.UserProperties) + if err != nil { + return nil, nil, err + } + if !intersects { + // Return the empty iterator. This iterator has no mutable state, so + // using a singleton is fine. + c.unrefValue(v) + return emptyIter, nil, nil + } + } var iter sstable.Iterator var err error if bytesIterated != nil { iter, err = v.reader.NewCompactionIter(bytesIterated) } else { - iter, err = v.reader.NewIter(opts.GetLowerBound(), opts.GetUpperBound()) + iter, err = v.reader.NewIterWithBlockPropertyFilters( + opts.GetLowerBound(), opts.GetUpperBound(), filterer) } if err != nil { c.unrefValue(v) diff --git a/testdata/checkpoint b/testdata/checkpoint index 66662f14db..b75fddc366 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -28,6 +28,9 @@ sync: db create: db/marker.format-version.000003.004 close: db/marker.format-version.000003.004 sync: db +create: db/marker.format-version.000004.005 +close: db/marker.format-version.000004.005 +sync: db sync: db/MANIFEST-000001 create: db/000002.log sync: db @@ -93,9 +96,9 @@ close: open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.004 -sync: checkpoints/checkpoint1/marker.format-version.000001.004 -close: checkpoints/checkpoint1/marker.format-version.000001.004 +create: checkpoints/checkpoint1/marker.format-version.000001.005 +sync: checkpoints/checkpoint1/marker.format-version.000001.005 +close: checkpoints/checkpoint1/marker.format-version.000001.005 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 create: checkpoints/checkpoint1/MANIFEST-000001 @@ -150,7 +153,7 @@ CURRENT LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000003.004 +marker.format-version.000004.005 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -160,7 +163,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.004 +marker.format-version.000001.005 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly diff --git a/testdata/event_listener b/testdata/event_listener index 412ba1a19c..94d0de7f5f 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -34,6 +34,10 @@ create: db/marker.format-version.000003.004 close: db/marker.format-version.000003.004 
sync: db upgraded to format version: 004 +create: db/marker.format-version.000004.005 +close: db/marker.format-version.000004.005 +sync: db +upgraded to format version: 005 create: db/MANIFEST-000003 close: db/MANIFEST-000001 sync: db/MANIFEST-000003 @@ -200,9 +204,9 @@ close: open-dir: checkpoint link: db/OPTIONS-000004 -> checkpoint/OPTIONS-000004 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.004 -sync: checkpoint/marker.format-version.000001.004 -close: checkpoint/marker.format-version.000001.004 +create: checkpoint/marker.format-version.000001.005 +sync: checkpoint/marker.format-version.000001.005 +close: checkpoint/marker.format-version.000001.005 sync: checkpoint close: checkpoint create: checkpoint/MANIFEST-000017 diff --git a/testdata/iterator_block_interval_filter b/testdata/iterator_block_interval_filter new file mode 100644 index 0000000000..46fcd4bb12 --- /dev/null +++ b/testdata/iterator_block_interval_filter @@ -0,0 +1,275 @@ +# Block size is 1, so each block contains one key, and two level index is used +# since the lower index blocks have only one key. + +# Build a table with a single interval collector with id=2 and 2 character +# suffix. The keys in the table are in the interval [1,7). +build id_offset=(2,0) +set a01 a +set b02 b +set c03 c +set d04 d +set e05 e +set f06 f +---- +0.0: + 000005:[a01#1,SET-f06#6,SET] + +# Iterate without a filter. +iter +first +next +next +next +next +next +next +---- +a01:a +b02:b +c03:c +d04:d +e05:e +f06:f +. + +# Iterate with a filter interval [1,2) that selects a key at the beginning of +# the file. +iter id_lower_upper=(2,1,2) +first +next +prev +prev +next +next +last +seek-lt f +seek-ge a +seek-ge b +prev +---- +a01:a +. +a01:a +. +a01:a +. +a01:a +a01:a +a01:a +. +a01:a + +# Iterate with a filter interval [3,5) that selects keys in the middle of the +# file. +iter id_lower_upper=(2,3,5) +first +next +next +last +prev +prev +seek-lt f +prev +next +prev +prev +last +seek-ge c +seek-ge d +next +prev +prev +prev +---- +c03:c +d04:d +. +d04:d +c03:c +. +d04:d +c03:c +d04:d +c03:c +. +d04:d +c03:c +d04:d +. +d04:d +c03:c +. + +# Iterate with a filter interval [6,8) that selects a key at the end of the +# file. +iter id_lower_upper=(2,6,8) +first +next +prev +prev +next +last +prev +seek-lt g +seek-ge b +---- +f06:f +. +f06:f +. +f06:f +f06:f +. +f06:f +f06:f + +iter id_lower_upper=(2,2,2) +first +last +seek-ge a +seek-lt g +---- +. +. +. +. + +# Iterate with a filter interval [7,9) that is after the interval in the file. +iter id_lower_upper=(2, 7, 9) +first +last +seek-ge a +seek-lt g +---- +. +. +. +. + +# Iterate with a filter interval [0,1) that is before the interval in the +# file. +iter id_lower_upper=(2, 0, 1) +first +last +seek-ge a +seek-lt g +---- +. +. +. +. + +# Iterate with a filter id=3, which is unknown to the file, so all blocks are +# visible. +iter id_lower_upper=(3, 1, 2) +first +next +next +next +next +next +next +---- +a01:a +b02:b +c03:c +d04:d +e05:e +f06:f +. + +# Build a table with two interval collectors: +# - id=3 and 2 character suffix. The keys in the table are in the interval +# [1,6). +# - id=5 and 2 characters offset by 2 from the suffix. The keys in the table +# are in the interval [6,11). +build id_offset=(3,0) id_offset=(5,2) +set a1001 a +set b0902 b +set c0803 c +set d0704 d +set e0605 e +---- +0.0: + 000005:[a1001#1,SET-e0605#5,SET] + +# Iterate without a filter. +iter +first +next +next +next +next +next +---- +a1001:a +b0902:b +c0803:c +d0704:d +e0605:e +. 
+ +# Iterate with filter id=5, interval [7,9). +iter id_lower_upper=(5,7,9) +first +next +next +prev +prev +---- +c0803:c +d0704:d +. +d0704:d +c0803:c + +# Iterate with filter id=5, interval [7,9), and an unknown filter id. The +# result should only be affected by the filter id=5. +iter id_lower_upper=(5,7,9) id_lower_upper=(10,0,1) +first +next +next +prev +prev +---- +c0803:c +d0704:d +. +d0704:d +c0803:c + +# Iterate with filter id=3, interval [4,5) and filter id=5, interval [7,9). +# The set of blocks admitted by these two filters are intersecting, but not +# identical. Key c0803, which is allowed by the latter is not allowed by the +# former, and hence omitted. +iter id_lower_upper=(3,4,5) id_lower_upper=(5,7,9) +first +next +prev +prev +---- +d0704:d +. +d0704:d +. + +# Iterate with filter id=3 and id=5, where the two admitted sets are +# non-empty, but the intersection is empty. +iter id_lower_upper=(3,4,5) id_lower_upper=(5,8,9) +first +---- +. + +# Iterate with filter id=3 and id=5, where filter id=5 set is empty, so the +# intersection is empty. +iter id_lower_upper=(3,4,5) id_lower_upper=(5,11,12) +first +---- +. + +# Iterate with filter id=3 and id=5, where filter id=3 set is empty, so the +# intersection is empty. +iter id_lower_upper=(3,6,7) id_lower_upper=(5,7,9) +first +---- +.
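
The reader-side changes above replace decodeBlockHandle's `n == 0 || n != len(v)` length check with decodeBlockHandleWithProperties, because an encoded handle is no longer just two varints: any bytes that follow them are the block properties. A minimal standalone sketch of that layout, using hypothetical local helpers rather than the sstable package's actual encodeBlockHandleWithProperties/decodeBlockHandleWithProperties:

package main

import (
	"encoding/binary"
	"fmt"
)

// handle mirrors the offset/length pair of a BlockHandle; the names here are
// local to this sketch.
type handle struct {
	offset, length uint64
}

// encode writes the two varints and then appends the raw property bytes,
// matching the layout described in the sstable/table.go comment above.
func encode(dst []byte, h handle, props []byte) []byte {
	n := binary.PutUvarint(dst, h.offset)
	n += binary.PutUvarint(dst[n:], h.length)
	return append(dst[:n], props...)
}

// decode shows why the old `n != len(v)` validation had to be dropped: any
// bytes remaining after the two varints are the encoded block properties.
func decode(src []byte) (handle, []byte, error) {
	offset, n := binary.Uvarint(src)
	if n <= 0 {
		return handle{}, nil, fmt.Errorf("corrupt block handle")
	}
	length, m := binary.Uvarint(src[n:])
	if m <= 0 {
		return handle{}, nil, fmt.Errorf("corrupt block handle")
	}
	return handle{offset, length}, src[n+m:], nil
}

func main() {
	// blockHandleLikelyMaxLen-sized scratch: 20 bytes of varints plus the
	// ~100 bytes of properties the table.go comment advises staying under.
	buf := make([]byte, 120)
	enc := encode(buf, handle{offset: 4096, length: 512}, []byte("\x00\x02xy"))
	h, props, err := decode(enc)
	fmt.Println(h, props, err)
}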
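writer.go drives a block property collector through a fixed lifecycle: Add for every key/value, FinishDataBlock when a data block is cut, AddPrevDataBlockToIndexBlock when that block's index entry is written, FinishIndexBlock when an index partition is finished, and FinishTable during Close. The sketch below mimics that lifecycle for an interval-style collector; the method signatures are simplified relative to the real BlockPropertyCollector interface, and the uvarint encoding is only one plausible choice, not necessarily what NewBlockIntervalCollector emits.

package main

import (
	"encoding/binary"
	"fmt"
)

// interval is a [lower, upper) aggregate over uint64 values.
type interval struct {
	lower, upper uint64
	nonEmpty     bool
}

func (iv *interval) extend(v uint64) {
	if !iv.nonEmpty {
		*iv = interval{lower: v, upper: v + 1, nonEmpty: true}
		return
	}
	if v < iv.lower {
		iv.lower = v
	}
	if v+1 > iv.upper {
		iv.upper = v + 1
	}
}

func (iv *interval) union(o interval) {
	if !o.nonEmpty {
		return
	}
	if !iv.nonEmpty {
		*iv = o
		return
	}
	if o.lower < iv.lower {
		iv.lower = o.lower
	}
	if o.upper > iv.upper {
		iv.upper = o.upper
	}
}

// sketchCollector follows the Writer's call order. Note that the data block
// interval is folded into the index block interval only when
// AddPrevDataBlockToIndexBlock runs, mirroring addIndexEntry above.
type sketchCollector struct {
	block, index, table interval
}

func (c *sketchCollector) Add(v uint64) { c.block.extend(v) }

// FinishDataBlock emits the data block's property; the block interval stays
// pending until AddPrevDataBlockToIndexBlock.
func (c *sketchCollector) FinishDataBlock(scratch []byte) []byte {
	scratch = binary.AppendUvarint(scratch, c.block.lower)
	scratch = binary.AppendUvarint(scratch, c.block.upper)
	return scratch
}

func (c *sketchCollector) AddPrevDataBlockToIndexBlock() {
	c.index.union(c.block)
	c.block = interval{}
}

func (c *sketchCollector) FinishIndexBlock(scratch []byte) []byte {
	c.table.union(c.index)
	scratch = binary.AppendUvarint(scratch, c.index.lower)
	scratch = binary.AppendUvarint(scratch, c.index.upper)
	c.index = interval{}
	return scratch
}

func (c *sketchCollector) FinishTable(scratch []byte) []byte {
	scratch = binary.AppendUvarint(scratch, c.table.lower)
	scratch = binary.AppendUvarint(scratch, c.table.upper)
	return scratch
}

func main() {
	var c sketchCollector
	c.Add(5)
	c.Add(3)
	fmt.Println(c.FinishDataBlock(nil)) // property bytes for the data block: [3, 6)
	c.AddPrevDataBlockToIndexBlock()
	fmt.Println(c.FinishIndexBlock(nil)) // property bytes for the index partition
}

Aggregating intervals upward through index partitions and the table is what lets loadIndex return loadBlockIrrelevant for a whole partition, rather than filtering only one data block at a time.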
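End to end, the new testdata file above is driven by the same wiring an application would use: collectors are registered in Options when the DB is opened (the format must have been ratcheted to the new version), and each iterator supplies filters naming the collectors whose blocks it wants. A sketch under the assumption that the pebble package re-exports the sstable collector and filter types, as the in-package test code suggests; myIntervalCollector is a hypothetical stand-in for a user-written data-block interval collector:

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/sstable"
)

// myIntervalCollector is a hypothetical data-block interval collector. The
// method set (Add, FinishDataBlock) and signatures are inferred from how
// collectors are passed to sstable.NewBlockIntervalCollector in this change.
type myIntervalCollector struct {
	lower, upper uint64 // accumulates the [lower, upper) interval of a block
}

func (c *myIntervalCollector) Add(key pebble.InternalKey, value []byte) error {
	// Derive a uint64 from the key or value and extend [c.lower, c.upper).
	// (Application-specific parsing elided in this sketch.)
	return nil
}

func (c *myIntervalCollector) FinishDataBlock() (lower, upper uint64, err error) {
	l, u := c.lower, c.upper
	c.lower, c.upper = 0, 0 // reset for the next block
	return l, u, nil
}

func main() {
	opts := &pebble.Options{
		// Block properties are only written once the DB has been ratcheted
		// to FormatBlockPropertyCollector (or later).
		FormatMajorVersion: pebble.FormatNewest,
		BlockPropertyCollectors: []func() pebble.BlockPropertyCollector{
			func() pebble.BlockPropertyCollector {
				// "interval" names the property; a filter must use the same
				// name for this collector's properties to be consulted.
				return sstable.NewBlockIntervalCollector("interval", &myIntervalCollector{})
			},
		},
	}
	db, err := pebble.Open("demo", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Only blocks whose "interval" property intersects [10, 20) are loaded.
	iter := db.NewIter(&pebble.IterOptions{
		BlockPropertyFilters: []pebble.BlockPropertyFilter{
			sstable.NewBlockIntervalFilter("interval", 10, 20),
		},
	})
	defer iter.Close()
	for valid := iter.First(); valid; valid = iter.Next() {
		log.Printf("%s", iter.Key())
	}
}

Note that the filters prune at three levels: whole sstables via the table properties consulted by IntersectsUserPropsAndFinishInit in table_cache.go, index partitions via the top level index entries, and individual data blocks via loadBlock returning loadBlockIrrelevant.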