Skip to content

Commit

Permalink
db: don't load large filter blocks for flushable ingests
Browse files Browse the repository at this point in the history
We have seen cases where a very large file is ingested as flushable,
and reads block on loading a very large bloom filter.

This PR sets a size limit of 64KB for bloom filter blocks for ingested
flushables. We achieve this by extending `manifest.Level` to be able
to represent flushable ingests as a "pseudo-level" and changing the
`useFilterBlock` flag for new iterator creation to a
`filterBlockSizeLimit` value.

Fixes #3787
  • Loading branch information
RaduBerinde committed Jul 27, 2024
1 parent ca2b210 commit ed42fb4
Show file tree
Hide file tree
Showing 20 changed files with 251 additions and 103 deletions.
17 changes: 17 additions & 0 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,23 @@ func runBatchDefineCmd(d *datadriven.TestData, b *Batch) error {
return errors.Errorf("%s expects 2 arguments", parts[0])
}
err = b.Set([]byte(parts[1]), parseValue(parts[2]), nil)

case "set-multiple":
if len(parts) != 3 {
return errors.Errorf("%s expects 2 arguments (n and prefix)", parts[0])
}
n, err := strconv.ParseUint(parts[1], 10, 32)
if err != nil {
return err
}
for i := uint64(0); i < n; i++ {
key := fmt.Sprintf("%s-%05d", parts[2], i)
val := fmt.Sprintf("val-%05d", i)
if err := b.Set([]byte(key), []byte(val), nil); err != nil {
return err
}
}

case "del":
if len(parts) != 2 {
return errors.Errorf("%s expects 1 argument", parts[0])
Expand Down
2 changes: 1 addition & 1 deletion external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
seqNum--
pointIter, err = r.NewIterWithBlockPropertyFiltersAndContextEtc(
ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
false, /* useFilterBlock */
sstable.NeverUseFilterBlock,
&it.stats.InternalStats, it.opts.CategoryAndQoS, nil,
sstable.TrivialReaderProvider{Reader: r})
if err != nil {
Expand Down
10 changes: 3 additions & 7 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,8 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
if o != nil {
opts = *o
}
// TODO(bananabrick): The manifest.Level in newLevelIter is only used for
// logging. Update the manifest.Level encoding to account for levels which
// aren't truly levels in the lsm. Right now, the encoding only supports
// L0 sublevels, and the rest of the levels in the lsm.
return newLevelIter(
context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.Level(0),
context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.FlushableIngestLevel(),
internalIterOpts{},
)
}
Expand Down Expand Up @@ -252,7 +248,7 @@ func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIter
return keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare,
s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestLevel(),
manifest.KeyTypePoint,
)
}
Expand All @@ -266,7 +262,7 @@ func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIter
return keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
s.slice.Iter(), manifest.FlushableIngestLevel(), manifest.KeyTypeRange,
)
}

Expand Down
36 changes: 33 additions & 3 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/bloom"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
Expand Down Expand Up @@ -427,12 +428,17 @@ func TestOverlappingIngestedSSTs(t *testing.T) {
closed = false
blockFlush = false
)
cache := NewCache(0)
defer func() {
if !closed {
require.NoError(t, d.Close())
}
cache.Unref()
}()

var fsLog struct {
sync.Mutex
buf bytes.Buffer
}
reset := func(strictMem bool) {
if d != nil && !closed {
require.NoError(t, d.Close())
Expand All @@ -447,14 +453,27 @@ func TestOverlappingIngestedSSTs(t *testing.T) {

require.NoError(t, mem.MkdirAll("ext", 0755))
opts = (&Options{
FS: mem,
FS: vfs.WithLogging(mem, func(format string, args ...interface{}) {
fsLog.Lock()
defer fsLog.Unlock()
fmt.Fprintf(&fsLog.buf, format+"\n", args...)
}),
Cache: cache,
MemTableStopWritesThreshold: 4,
L0CompactionThreshold: 100,
L0StopWritesThreshold: 100,
DebugCheck: DebugCheckLevels,
FormatMajorVersion: internalFormatNewest,
Logger: testLogger{t},
}).WithFSDefaults()
if testing.Verbose() {
lel := MakeLoggingEventListener(DefaultLogger)
opts.EventListener = &lel
}
opts.EnsureDefaults()
// Some of the tests require bloom filters.
opts.Levels[0].FilterPolicy = bloom.FilterPolicy(10)

// Disable automatic compactions because otherwise we'll race with
// delete-only compactions triggered by ingesting range tombstones.
opts.DisableAutomaticCompactions = true
Expand All @@ -476,7 +495,18 @@ func TestOverlappingIngestedSSTs(t *testing.T) {
}
reset(false)

datadriven.RunTest(t, "testdata/flushable_ingest", func(t *testing.T, td *datadriven.TestData) string {
datadriven.RunTest(t, "testdata/flushable_ingest", func(t *testing.T, td *datadriven.TestData) (result string) {
if td.HasArg("with-fs-logging") {
fsLog.Lock()
fsLog.buf.Reset()
fsLog.Unlock()
defer func() {
fsLog.Lock()
defer fsLog.Unlock()
result = fsLog.buf.String() + result
}()
}

switch td.Cmd {
case "reset":
reset(td.HasArg("strictMem"))
Expand Down
37 changes: 30 additions & 7 deletions internal/manifest/level.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,33 @@ package manifest
import "fmt"

const (
// 3 bits are necessary to represent level values from 0-6.
// 3 bits are necessary to represent level values from 0-6 or 7 for flushable
// ingests.
levelBits = 3
levelMask = (1 << levelBits) - 1
// invalidSublevel denotes an invalid or non-applicable sublevel.
invalidSublevel = -1
invalidSublevel = -1
flushableIngestLevelValue = 7
)

// Level encodes a level and optional sublevel for use in log and error
// messages. The encoding has the property that Level(0) ==
// L0Sublevel(invalidSublevel).
// Level encodes a level and optional sublevel. It can also represent the
// conceptual layer of flushable ingests as "level" -1.
//
// The encoding has the property that Level(0) == L0Sublevel(invalidSublevel).
type Level uint32

func makeLevel(level, sublevel int) Level {
return Level(((sublevel + 1) << levelBits) | level)
}

// LevelToInt returns the int representation of a Level
// LevelToInt returns the int representation of a Level. Returns -1 if the Level
// refers to the flushable ingests pseudo-level.
func LevelToInt(l Level) int {
return int(l) & levelMask
l &= levelMask
if l == flushableIngestLevelValue {
return -1
}
return int(l)
}

// L0Sublevel returns a Level representing the specified L0 sublevel.
Expand All @@ -36,11 +44,26 @@ func L0Sublevel(sublevel int) Level {
return makeLevel(0, sublevel)
}

// FlushableIngestLevel returns a Level that represents the flushable ingests
// pseudo-level.
func FlushableIngestLevel() Level {
return makeLevel(flushableIngestLevelValue, invalidSublevel)
}

// FlushableIngestLevel returns true if l represents the flushable ingests
// pseudo-level.
func (l Level) FlushableIngestLevel() bool {
return LevelToInt(l) == -1
}

func (l Level) String() string {
level := int(l) & levelMask
sublevel := (int(l) >> levelBits) - 1
if sublevel != invalidSublevel {
return fmt.Sprintf("L%d.%d", level, sublevel)
}
if level == flushableIngestLevelValue {
return "flushable-ingest"
}
return fmt.Sprintf("L%d", level)
}
53 changes: 14 additions & 39 deletions internal/manifest/level_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,59 +5,34 @@
package manifest

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestLevel(t *testing.T) {
testCases := []struct {
level int
level Level
expected string
}{
{0, "L0"},
{1, "L1"},
{2, "L2"},
{3, "L3"},
{4, "L4"},
{5, "L5"},
{6, "L6"},
{7, "L7"},
}
{Level(0), "L0"},
{Level(1), "L1"},
{Level(2), "L2"},
{Level(3), "L3"},
{Level(4), "L4"},
{Level(5), "L5"},
{Level(6), "L6"},

for _, c := range testCases {
t.Run("", func(t *testing.T) {
s := Level(c.level).String()
require.EqualValues(t, c.expected, s)
})
}
}
{L0Sublevel(0), "L0.0"},
{L0Sublevel(1), "L0.1"},
{L0Sublevel(2), "L0.2"},

func TestL0Sublevel(t *testing.T) {
testCases := []struct {
level int
sublevel int
expected string
}{
{0, 0, "L0.0"},
{0, 1, "L0.1"},
{0, 2, "L0.2"},
{0, 1000, "L0.1000"},
{0, -1, "invalid L0 sublevel: -1"},
{0, -2, "invalid L0 sublevel: -2"},
{FlushableIngestLevel(), "flushable-ingest"},
}

for _, c := range testCases {
t.Run("", func(t *testing.T) {
s := func() (result string) {
defer func() {
if r := recover(); r != nil {
result = fmt.Sprint(r)
}
}()
return L0Sublevel(c.sublevel).String()
}()
t.Run(c.expected, func(t *testing.T) {
s := c.level.String()
require.EqualValues(t, c.expected, s)
})
}
Expand Down
5 changes: 4 additions & 1 deletion iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,10 @@ func (i *Iterator) sampleRead() {
return
}
if len(mi.levels) > 1 {
mi.ForEachLevelIter(func(li *levelIter) bool {
mi.ForEachLevelIter(func(li *levelIter) (done bool) {
if li.level.FlushableIngestLevel() {
return false
}
l := manifest.LevelToInt(li.level)
if f := li.iterFile; f != nil {
var containsKey bool
Expand Down
2 changes: 1 addition & 1 deletion level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (lt *levelIterTest) newIters(
if kinds.Point() {
iter, err := lt.readers[file.FileNum].NewIterWithBlockPropertyFiltersAndContextEtc(
ctx, transforms,
opts.LowerBound, opts.UpperBound, nil, true /* useFilterBlock */, iio.stats, sstable.CategoryAndQoS{},
opts.LowerBound, opts.UpperBound, nil, sstable.AlwaysUseFilterBlock, iio.stats, sstable.CategoryAndQoS{},
nil, sstable.TrivialReaderProvider{Reader: lt.readers[file.FileNum]})
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
Expand Down
2 changes: 1 addition & 1 deletion merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestMergingIterCornerCases(t *testing.T) {
if kinds.Point() {
set.point, err = r.NewIterWithBlockPropertyFilters(
sstable.NoTransforms,
opts.GetLowerBound(), opts.GetUpperBound(), nil, true /* useFilterBlock */, iio.stats,
opts.GetLowerBound(), opts.GetUpperBound(), nil, sstable.AlwaysUseFilterBlock, iio.stats,
sstable.CategoryAndQoS{}, nil, sstable.TrivialReaderProvider{Reader: r})
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
Expand Down
4 changes: 2 additions & 2 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ func TestBlockProperties(t *testing.T) {
return "filter excludes entire table"
}
iter, err := r.NewIterWithBlockPropertyFilters(
NoTransforms, lower, upper, filterer, false /* useFilterBlock */, &stats,
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats,
CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r})
if err != nil {
return err.Error()
Expand Down Expand Up @@ -1054,7 +1054,7 @@ func TestBlockProperties_BoundLimited(t *testing.T) {
return "filter excludes entire table"
}
iter, err := r.NewIterWithBlockPropertyFilters(
NoTransforms, lower, upper, filterer, false /* useFilterBlock */, &stats,
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats,
CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r})
if err != nil {
return err.Error()
Expand Down
6 changes: 5 additions & 1 deletion sstable/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,15 @@ func runErrorInjectionTest(t *testing.T, seed int64) {
// point keys only. Should we add variants of this test that run random
// operations on the range deletion and range key iterators?
var stats base.InternalIteratorStats
filterBlockSizeLimit := AlwaysUseFilterBlock
if rng.Intn(2) == 1 {
filterBlockSizeLimit = NeverUseFilterBlock
}
it, err := r.NewIterWithBlockPropertyFilters(
NoTransforms,
nil /* lower TODO */, nil, /* upper TODO */
filterer,
rng.Intn(2) == 1, /* use filter block */
filterBlockSizeLimit,
&stats,
CategoryAndQoS{},
nil, /* CategoryStatsCollector */
Expand Down
Loading

0 comments on commit ed42fb4

Please sign in to comment.