Skip to content

Commit

Permalink
sstable: add range key support to BlockIntervalCollector
Browse files Browse the repository at this point in the history
Currently, the `sstable.BlockIntervalCollector` exists as a helper
struct to which an implementation of
`sstable.DataBlockIntervalCollector` is passed. The latter contains the
user-defined logic for converting a key and value in an associated
interval that can be maintained.

The `BlockIntervalCollector` struct handles the logic required to
implement `sstable.BlockPropertyCollector`, encoding the interval as a
property when the data block is completed, and maintaining index-block-
and table-level intervals.

Update `sstable.Writer` to pass through range keys to each block
property collector as they are added to the writer.

As range and point keys can be thought of as existing in separate
keyspaces within the same LSM, range keys only contribute to table level
intervals (unioned with the point key interval), rather than block- and
index-level intervals.

Another way to rationalize this decision is to consider that range keys
are contained in a single, dedicated block in the SSTable, while the
point keys can span multiple blocks and have an associated index entry
into which the properties for the specific block are encoded. Block
level filtering applies to point keys only, whereas table-level
filtering takes the union of the point and range key intervals for the
table.

One downside of taking the union of the range and point keys for the
table level interval is an increased rate of false positives when
filtering tables based on an interval and the range key interval is
wider than the point key interval.

This change alters the `NewBlockIntervalCollector` function to take in
two `DataBlockIntervalCollector`s, one for point and range keys. This is
required to track the intervals separately within the
`BlockIntervalCollector`. The caller has the flexibility of passing in
`nil` for one (or both) of the collectors, in which case either point or
range keys (or both) will be ignored by the collector. This can be used,
for example, to construct a collectors that apply exclusively to either
point or range keys.
  • Loading branch information
nicktrav committed Jan 18, 2022
1 parent d4f0a8d commit 136b1e0
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 41 deletions.
16 changes: 9 additions & 7 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,9 @@ func TestIteratorBlockIntervalFilter(t *testing.T) {
bpCollectors = append(bpCollectors, func() BlockPropertyCollector {
return sstable.NewBlockIntervalCollector(
fmt.Sprintf("%d", coll.id),
&testBlockIntervalCollector{numLength: 2, offsetFromEnd: coll.offset})
&testBlockIntervalCollector{numLength: 2, offsetFromEnd: coll.offset},
nil, /* range key collector */
)
})
}
opts := &Options{
Expand Down Expand Up @@ -1344,9 +1346,9 @@ func TestIteratorRandomizedBlockIntervalFilter(t *testing.T) {
FormatMajorVersion: FormatNewest,
BlockPropertyCollectors: []func() BlockPropertyCollector{
func() BlockPropertyCollector {
return sstable.NewBlockIntervalCollector("0", &testBlockIntervalCollector{
numLength: 2,
})
return sstable.NewBlockIntervalCollector(
"0", &testBlockIntervalCollector{numLength: 2}, nil, /* range key collector */
)
},
},
}
Expand Down Expand Up @@ -1657,9 +1659,9 @@ func BenchmarkBlockPropertyFilter(b *testing.B) {
FormatMajorVersion: FormatNewest,
BlockPropertyCollectors: []func() BlockPropertyCollector{
func() BlockPropertyCollector {
return sstable.NewBlockIntervalCollector("0", &testBlockIntervalCollector{
numLength: 3,
})
return sstable.NewBlockIntervalCollector(
"0", &testBlockIntervalCollector{numLength: 3}, nil, /* range key collector */
)
},
},
}
Expand Down
40 changes: 34 additions & 6 deletions sstable/block_property.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/rangekey"
)

// Block properties are an optional user-facing feature that can be used to
Expand Down Expand Up @@ -102,8 +103,9 @@ type BlockPropertyFilter interface {
// Users must not expect this to preserve differences between empty sets --
// they will all get turned into the semantically equivalent [0,0).
type BlockIntervalCollector struct {
name string
dbic DataBlockIntervalCollector
name string
points DataBlockIntervalCollector
ranges DataBlockIntervalCollector

blockInterval interval
indexInterval interval
Expand All @@ -129,9 +131,14 @@ type DataBlockIntervalCollector interface {
// NewBlockIntervalCollector constructs a BlockIntervalCollector, with the
// given name and data block collector.
func NewBlockIntervalCollector(
name string, blockAttributeCollector DataBlockIntervalCollector) *BlockIntervalCollector {
name string,
pointCollector, rangeCollector DataBlockIntervalCollector,
) *BlockIntervalCollector {
return &BlockIntervalCollector{
name: name, dbic: blockAttributeCollector}
name: name,
points: pointCollector,
ranges: rangeCollector,
}
}

// Name implements the BlockPropertyCollector interface.
Expand All @@ -141,13 +148,23 @@ func (b *BlockIntervalCollector) Name() string {

// Add implements the BlockPropertyCollector interface.
func (b *BlockIntervalCollector) Add(key InternalKey, value []byte) error {
return b.dbic.Add(key, value)
if rangekey.IsRangeKey(key.Kind()) {
if b.ranges != nil {
return b.ranges.Add(key, value)
}
} else if b.points != nil {
return b.points.Add(key, value)
}
return nil
}

// FinishDataBlock implements the BlockPropertyCollector interface.
func (b *BlockIntervalCollector) FinishDataBlock(buf []byte) ([]byte, error) {
if b.points == nil {
return buf, nil
}
var err error
b.blockInterval.lower, b.blockInterval.upper, err = b.dbic.FinishDataBlock()
b.blockInterval.lower, b.blockInterval.upper, err = b.points.FinishDataBlock()
if err != nil {
return buf, err
}
Expand All @@ -172,6 +189,17 @@ func (b *BlockIntervalCollector) FinishIndexBlock(buf []byte) ([]byte, error) {

// FinishTable implements the BlockPropertyCollector interface.
func (b *BlockIntervalCollector) FinishTable(buf []byte) ([]byte, error) {
// If the collector is tracking range keys, the range key interval is union-ed
// with the point key interval for the table.
if b.ranges != nil {
var rangeInterval interval
var err error
rangeInterval.lower, rangeInterval.upper, err = b.ranges.FinishDataBlock()
if err != nil {
return buf, err
}
b.tableInterval.union(rangeInterval)
}
return b.tableInterval.encode(buf), nil
}

Expand Down
99 changes: 72 additions & 27 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"

"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -176,20 +177,20 @@ func (c *testDataBlockIntervalCollector) FinishDataBlock() (lower uint64, upper
}

func TestBlockIntervalCollector(t *testing.T) {
var dbic testDataBlockIntervalCollector
bic := NewBlockIntervalCollector("foo", &dbic)
var points, ranges testDataBlockIntervalCollector
bic := NewBlockIntervalCollector("foo", &points, &ranges)
require.Equal(t, "foo", bic.Name())
// Single key to call Add once. The real action is in the other methods.
key := InternalKey{UserKey: []byte("a")}
require.NoError(t, bic.Add(key, nil))
dbic.i = interval{1, 1}
// First data block has empty interval.
// Set up the point key collector with an initial (empty) interval.
points.i = interval{1, 1}
// First data block has empty point key interval.
encoded, err := bic.FinishDataBlock(nil)
require.NoError(t, err)
require.True(t, bytes.Equal(nil, encoded))
bic.AddPrevDataBlockToIndexBlock()
// Second data block.
dbic.i = interval{20, 25}
// Second data block contains a point and range key interval. The latter
// should not contribute to the block interval.
points.i = interval{20, 25}
ranges.i = interval{5, 150}
encoded, err = bic.FinishDataBlock(nil)
require.NoError(t, err)
var decoded interval
Expand All @@ -202,14 +203,14 @@ func TestBlockIntervalCollector(t *testing.T) {
require.True(t, bytes.Equal(nil, encodedIndexBlock))
bic.AddPrevDataBlockToIndexBlock()
// Third data block.
dbic.i = interval{10, 15}
points.i = interval{10, 15}
encoded, err = bic.FinishDataBlock(nil)
require.NoError(t, err)
require.NoError(t, decoded.decode(encoded))
require.Equal(t, interval{10, 15}, decoded)
bic.AddPrevDataBlockToIndexBlock()
// Fourth data block.
dbic.i = interval{100, 105}
points.i = interval{100, 105}
encoded, err = bic.FinishDataBlock(nil)
require.NoError(t, err)
require.NoError(t, decoded.decode(encoded))
Expand All @@ -226,11 +227,12 @@ func TestBlockIntervalCollector(t *testing.T) {
require.NoError(t, decoded.decode(encodedIndexBlock))
require.Equal(t, interval{100, 105}, decoded)
var encodedTable []byte
// Finish table.
// Finish table. The table interval is the union of the current point key
// table interval [10, 105) and the range key interval [5, 150).
encodedTable, err = bic.FinishTable(nil)
require.NoError(t, err)
require.NoError(t, decoded.decode(encodedTable))
require.Equal(t, interval{10, 105}, decoded)
require.Equal(t, interval{5, 150}, decoded)
}

func TestBlockIntervalFilter(t *testing.T) {
Expand Down Expand Up @@ -261,11 +263,11 @@ func TestBlockIntervalFilter(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var dbic testDataBlockIntervalCollector
var points testDataBlockIntervalCollector
name := "foo"
bic := NewBlockIntervalCollector(name, &dbic)
bic := NewBlockIntervalCollector(name, &points, nil)
bif := NewBlockIntervalFilter(name, tc.filter.lower, tc.filter.upper)
dbic.i = tc.prop
points.i = tc.prop
prop, _ := bic.FinishDataBlock(nil)
intersects, err := bif.Intersects(prop)
require.NoError(t, err)
Expand Down Expand Up @@ -339,9 +341,9 @@ func (b filterWithTrueForEmptyProp) Intersects(prop []byte) (bool, error) {
func TestBlockPropertiesFilterer_IntersectsUserPropsAndFinishInit(t *testing.T) {
// props with id=0, interval [10, 20); id=10, interval [110, 120).
var dbic testDataBlockIntervalCollector
bic0 := NewBlockIntervalCollector("p0", &dbic)
bic0 := NewBlockIntervalCollector("p0", &dbic, nil)
bic0Id := byte(0)
bic10 := NewBlockIntervalCollector("p10", &dbic)
bic10 := NewBlockIntervalCollector("p10", &dbic, nil)
bic10Id := byte(10)
dbic.i = interval{10, 20}
prop0 := append([]byte(nil), bic0Id)
Expand Down Expand Up @@ -486,9 +488,9 @@ func TestBlockPropertiesFilterer_Intersects(t *testing.T) {
// props with id=0, interval [10, 20); id=10, interval [110, 120).
var encoder blockPropertiesEncoder
var dbic testDataBlockIntervalCollector
bic0 := NewBlockIntervalCollector("", &dbic)
bic0 := NewBlockIntervalCollector("", &dbic, nil)
bic0Id := shortID(0)
bic10 := NewBlockIntervalCollector("", &dbic)
bic10 := NewBlockIntervalCollector("", &dbic, nil)
bic10Id := shortID(10)
dbic.i = interval{10, 20}
prop, err := bic0.FinishDataBlock(encoder.getScratchForProp())
Expand Down Expand Up @@ -794,6 +796,43 @@ func (c *valueCharBlockIntervalCollector) FinishDataBlock() (lower, upper uint64
return l, u, nil
}

// suffixIntervalCollector maintains an interval over the timestamps in
// MVCC-like suffixes for keys (e.g. foo@123).
type suffixIntervalCollector struct {
initialized bool
lower, upper uint64
}

// Add implements DataBlockIntervalCollector.
func (c *suffixIntervalCollector) Add(key InternalKey, _ []byte) error {
i := testkeys.Comparer.Split(key.UserKey)
ts, err := strconv.Atoi(string(key.UserKey[i+1:]))
if err != nil {
return err
}
uts := uint64(ts)
if !c.initialized {
c.lower, c.upper = uts, uts+1
c.initialized = true
return nil
}
if uts < c.lower {
c.lower = uts
}
if uts >= c.upper {
c.upper = uts + 1
}
return nil
}

// FinishDataBlock implements DataBlockIntervalCollector.
func (c *suffixIntervalCollector) FinishDataBlock() (lower, upper uint64, err error) {
l, u := c.lower, c.upper
c.lower, c.upper = 0, 0
c.initialized = false
return l, u, nil
}

func TestBlockProperties(t *testing.T) {
var r *Reader
defer func() {
Expand Down Expand Up @@ -823,20 +862,26 @@ func TestBlockProperties(t *testing.T) {
}
case "collector":
for _, c := range cmd.Vals {
var idx int
var points, ranges DataBlockIntervalCollector
switch c {
case "value-first":
idx = 0
points = &valueCharBlockIntervalCollector{charIdx: 0}
case "value-last":
idx = -1
points = &valueCharBlockIntervalCollector{charIdx: -1}
case "suffix":
points, ranges = &suffixIntervalCollector{}, &suffixIntervalCollector{}
case "suffix-point-keys-only":
points = &suffixIntervalCollector{}
case "suffix-range-keys-only":
ranges = &suffixIntervalCollector{}
default:
return fmt.Sprintf("unknown collector: %s", c)
}
opts.BlockPropertyCollectors = append(opts.BlockPropertyCollectors, func() BlockPropertyCollector {
return NewBlockIntervalCollector(c, &valueCharBlockIntervalCollector{
charIdx: idx,
opts.BlockPropertyCollectors = append(
opts.BlockPropertyCollectors,
func() BlockPropertyCollector {
return NewBlockIntervalCollector(c, points, ranges)
})
})
}
}
}
Expand Down
Loading

0 comments on commit 136b1e0

Please sign in to comment.