From b34923feec497919b76e214c4cc2a7f9cef8c1a9 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Tue, 22 Oct 2019 21:24:02 -0400 Subject: [PATCH] storage/server: implement Pebble.PreIngestDelay Various bits of code movement so that `engine.RocksDB` and `engine.Pebble` can share the `PreIngestDelay` implementation. Added `engine.PebbleConfig` so that we have a struct-based interface for passing in configuration parameters to an `engine.Pebble`. Use this new interface for passing in both `roachpb.Attributes` and `cluster.Settings`, the latter of which is needed by `PreIngestDelay`. Fixes #41598 Release note: None --- pkg/server/config.go | 32 ++++++----- pkg/storage/engine/bench_pebble_test.go | 9 ++- pkg/storage/engine/engine.go | 73 +++++++++++++++++++++++++ pkg/storage/engine/engine_test.go | 41 ++++++++++++-- pkg/storage/engine/mvcc_test.go | 6 +- pkg/storage/engine/pebble.go | 55 +++++++++++-------- pkg/storage/engine/rocksdb.go | 68 +---------------------- pkg/storage/engine/rocksdb_test.go | 25 --------- 8 files changed, 172 insertions(+), 137 deletions(-) diff --git a/pkg/server/config.go b/pkg/server/config.go index 7ee059ecd877..0af04f1c1fe2 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -497,21 +497,25 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) { if cfg.StorageEngine == base.EngineTypePebble { // TODO(itsbilal): Tune these options, and allow them to be overridden // in the spec (similar to the existing spec.RocksDBOptions and others). - pebbleOpts := &pebble.Options{ - Cache: pebbleCache, - MaxOpenFiles: int(openFileLimitPerStore), - MemTableSize: 64 << 20, - MemTableStopWritesThreshold: 4, - MinFlushRate: 4 << 20, - L0CompactionThreshold: 2, - L0StopWritesThreshold: 400, - LBaseMaxBytes: 64 << 20, // 64 MB - Levels: []pebble.LevelOptions{{ - BlockSize: 32 << 10, - }}, + pebbleConfig := engine.PebbleConfig{ + Dir: spec.Path, + Opts: &pebble.Options{ + Cache: pebbleCache, + MaxOpenFiles: int(openFileLimitPerStore), + MemTableSize: 64 << 20, + MemTableStopWritesThreshold: 4, + MinFlushRate: 4 << 20, + L0CompactionThreshold: 2, + L0StopWritesThreshold: 400, + LBaseMaxBytes: 64 << 20, // 64 MB + Levels: []pebble.LevelOptions{{ + BlockSize: 32 << 10, + }}, + }, + Attrs: spec.Attributes, + Settings: cfg.Settings, } - eng, err = engine.NewPebble(spec.Path, pebbleOpts) - eng.(*engine.Pebble).SetAttrs(spec.Attributes) + eng, err = engine.NewPebble(pebbleConfig) } else { rocksDBConfig := engine.RocksDBConfig{ Attrs: spec.Attributes, diff --git a/pkg/storage/engine/bench_pebble_test.go b/pkg/storage/engine/bench_pebble_test.go index 6fecc69dc2c4..5b97348103f9 100644 --- a/pkg/storage/engine/bench_pebble_test.go +++ b/pkg/storage/engine/bench_pebble_test.go @@ -36,7 +36,10 @@ func newPebbleOptions(fs vfs.FS) *pebble.Options { } func setupMVCCPebble(b testing.TB, dir string) Engine { - peb, err := NewPebble(dir, newPebbleOptions(vfs.Default)) + peb, err := NewPebble(PebbleConfig{ + Dir: dir, + Opts: newPebbleOptions(vfs.Default), + }) if err != nil { b.Fatalf("could not create new pebble instance at %s: %+v", dir, err) } @@ -44,7 +47,9 @@ func setupMVCCPebble(b testing.TB, dir string) Engine { } func setupMVCCInMemPebble(b testing.TB, loc string) Engine { - peb, err := NewPebble("", newPebbleOptions(vfs.NewMem())) + peb, err := NewPebble(PebbleConfig{ + Opts: newPebbleOptions(vfs.NewMem()), + }) if err != nil { b.Fatalf("could not create new in-mem pebble instance: %+v", err) } diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index b81ad2005b83..2057711415a3 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -12,10 +12,14 @@ package engine import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" ) @@ -579,3 +583,72 @@ func ClearRangeWithHeuristic(eng Reader, writer Writer, start, end MVCCKey) erro } return nil } + +var ingestDelayL0Threshold = settings.RegisterIntSetting( + "rocksdb.ingest_backpressure.l0_file_count_threshold", + "number of L0 files after which to backpressure SST ingestions", + 20, +) + +var ingestDelayPendingLimit = settings.RegisterByteSizeSetting( + "rocksdb.ingest_backpressure.pending_compaction_threshold", + "pending compaction estimate above which to backpressure SST ingestions", + 2<<30, /* 2 GiB */ +) + +var ingestDelayTime = settings.RegisterDurationSetting( + "rocksdb.ingest_backpressure.max_delay", + "maximum amount of time to backpressure a single SST ingestion", + time.Second*5, +) + +// PreIngestDelay may choose to block for some duration if L0 has an excessive +// number of files in it or if PendingCompactionBytesEstimate is elevated. This +// it is intended to be called before ingesting a new SST, since we'd rather +// backpressure the bulk operation adding SSTs than slow down the whole RocksDB +// instance and impact all forground traffic by adding too many files to it. +// After the number of L0 files exceeds the configured limit, it gradually +// begins delaying more for each additional file in L0 over the limit until +// hitting its configured (via settings) maximum delay. If the pending +// compaction limit is exceeded, it waits for the maximum delay. +func preIngestDelay(ctx context.Context, eng Engine, settings *cluster.Settings) { + if settings == nil { + return + } + stats, err := eng.GetStats() + if err != nil { + log.Warningf(ctx, "failed to read stats: %+v", err) + return + } + targetDelay := calculatePreIngestDelay(settings, stats) + + if targetDelay == 0 { + return + } + log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %db pending compaction", targetDelay, stats.L0FileCount, stats.PendingCompactionBytesEstimate) + + select { + case <-time.After(targetDelay): + case <-ctx.Done(): + } +} + +func calculatePreIngestDelay(settings *cluster.Settings, stats *Stats) time.Duration { + maxDelay := ingestDelayTime.Get(&settings.SV) + l0Filelimit := ingestDelayL0Threshold.Get(&settings.SV) + compactionLimit := ingestDelayPendingLimit.Get(&settings.SV) + + if stats.PendingCompactionBytesEstimate >= compactionLimit { + return maxDelay + } + const ramp = 10 + if stats.L0FileCount > l0Filelimit { + delayPerFile := maxDelay / time.Duration(ramp) + targetDelay := time.Duration(stats.L0FileCount-l0Filelimit) * delayPerFile + if targetDelay > maxDelay { + return maxDelay + } + return targetDelay + } + return 0 +} diff --git a/pkg/storage/engine/engine_test.go b/pkg/storage/engine/engine_test.go index 1d91d45522fb..5257b034f341 100644 --- a/pkg/storage/engine/engine_test.go +++ b/pkg/storage/engine/engine_test.go @@ -21,6 +21,7 @@ import ( "sort" "strconv" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -34,6 +35,7 @@ import ( "github.com/cockroachdb/pebble/vfs" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func ensureRangeEqual( @@ -62,12 +64,15 @@ var ( func runWithAllEngines(test func(e Engine, t *testing.T), t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - inMem := NewInMem(inMemAttrs, testCacheSize) - stopper.AddCloser(inMem) - test(inMem, t) - inMem.Close() - pebbleInMem, err := NewPebble("", &pebble.Options{ - FS: vfs.NewMem(), + rocksDBInMem := NewInMem(inMemAttrs, testCacheSize) + stopper.AddCloser(rocksDBInMem) + test(rocksDBInMem, t) + rocksDBInMem.Close() + + pebbleInMem, err := NewPebble(PebbleConfig{ + Opts: &pebble.Options{ + FS: vfs.NewMem(), + }, }) if err != nil { t.Fatal(err) @@ -1028,3 +1033,27 @@ func TestCreateCheckpoint(t *testing.T) { t.Fatal(err) } } + +func TestIngestDelayLimit(t *testing.T) { + defer leaktest.AfterTest(t)() + s := cluster.MakeTestingClusterSettings() + + max, ramp := time.Second*5, time.Second*5/10 + + for _, tc := range []struct { + exp time.Duration + stats Stats + }{ + {0, Stats{}}, + {0, Stats{L0FileCount: 19}}, + {0, Stats{L0FileCount: 20}}, + {ramp, Stats{L0FileCount: 21}}, + {ramp * 2, Stats{L0FileCount: 22}}, + {max, Stats{L0FileCount: 55}}, + {0, Stats{PendingCompactionBytesEstimate: (2 << 30) - 1}}, + {max, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}}, + {max, Stats{L0FileCount: 35, PendingCompactionBytesEstimate: 20 << 30}}, + } { + require.Equal(t, tc.exp, calculatePreIngestDelay(s, &tc.stats)) + } +} diff --git a/pkg/storage/engine/mvcc_test.go b/pkg/storage/engine/mvcc_test.go index f39ad71a6713..36270d8be5e1 100644 --- a/pkg/storage/engine/mvcc_test.go +++ b/pkg/storage/engine/mvcc_test.go @@ -88,8 +88,10 @@ func createTestRocksDBEngine() Engine { // createTestPebbleEngine returns a new in-memory Pebble storage engine. func createTestPebbleEngine() Engine { - peb, err := NewPebble("", &pebble.Options{ - FS: vfs.NewMem(), + peb, err := NewPebble(PebbleConfig{ + Opts: &pebble.Options{ + FS: vfs.NewMem(), + }, }) if err != nil { return nil diff --git a/pkg/storage/engine/pebble.go b/pkg/storage/engine/pebble.go index 60fceb185724..8ca21ea0ecab 100644 --- a/pkg/storage/engine/pebble.go +++ b/pkg/storage/engine/pebble.go @@ -17,6 +17,7 @@ import ( "io/ioutil" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -147,13 +148,28 @@ var PebbleTablePropertyCollectors = []func() pebble.TablePropertyCollector{ func() pebble.TablePropertyCollector { return &pebbleDeleteRangeCollector{} }, } +// PebbleConfig holds all configuration parameters and knobs used in setting up +// a new Pebble instance. +type PebbleConfig struct { + // Store attributes. These are only placed here for convenience, and not used + // by Pebble or the Pebble-based Engine implementation. + Attrs roachpb.Attributes + // Dir is the data directory for the Pebble instance. + Dir string + // Pebble specific options. + Opts *pebble.Options + // Settings instance for cluster-wide knobs. + Settings *cluster.Settings +} + // Pebble is a wrapper around a Pebble database instance. type Pebble struct { db *pebble.DB - closed bool - path string - attrs roachpb.Attributes + closed bool + path string + attrs roachpb.Attributes + settings *cluster.Settings // Relevant options copied over from pebble.Options. fs vfs.FS @@ -162,25 +178,27 @@ type Pebble struct { var _ WithSSTables = &Pebble{} // NewPebble creates a new Pebble instance, at the specified path. -func NewPebble(path string, cfg *pebble.Options) (*Pebble, error) { - cfg.Comparer = MVCCComparer - cfg.Merger = MVCCMerger - cfg.TablePropertyCollectors = PebbleTablePropertyCollectors +func NewPebble(cfg PebbleConfig) (*Pebble, error) { + cfg.Opts.Comparer = MVCCComparer + cfg.Opts.Merger = MVCCMerger + cfg.Opts.TablePropertyCollectors = PebbleTablePropertyCollectors // pebble.Open also calls EnsureDefaults, but only after doing a clone. Call // EnsureDefaults beforehand so we have a matching cfg here for when we save // cfg.FS and cfg.ReadOnly later on. - cfg.EnsureDefaults() + cfg.Opts.EnsureDefaults() - db, err := pebble.Open(path, cfg) + db, err := pebble.Open(cfg.Dir, cfg.Opts) if err != nil { return nil, err } return &Pebble{ - db: db, - path: path, - fs: cfg.FS, + db: db, + path: cfg.Dir, + attrs: cfg.Attrs, + settings: cfg.Settings, + fs: cfg.Opts.FS, }, nil } @@ -349,12 +367,6 @@ func (p *Pebble) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails // No-op. Logical logging disabled. } -// SetAttrs sets the attributes returned by Atts(). This method is not safe for -// concurrent use. -func (p *Pebble) SetAttrs(attrs roachpb.Attributes) { - p.attrs = attrs -} - // Attrs implements the Engine interface. func (p *Pebble) Attrs() roachpb.Attributes { return p.attrs @@ -393,7 +405,7 @@ func (p *Pebble) GetStats() (*Stats, error) { // GetEnvStats implements the Engine interface. func (p *Pebble) GetEnvStats() (*EnvStats, error) { - // TODO(itsbilal): Implement this. + // TODO(sumeer): Implement this. These are encryption-at-rest specific stats. return &EnvStats{}, nil } @@ -435,9 +447,8 @@ func (p *Pebble) IngestExternalFiles( } // PreIngestDelay implements the Engine interface. -func (p *Pebble) PreIngestDelay(_ context.Context) { - // TODO(itsbilal): See if we need to add pre-ingestion delays, similar to - // how we do with rocksdb. +func (p *Pebble) PreIngestDelay(ctx context.Context) { + preIngestDelay(ctx, p, p.settings) } // ApproximateDiskBytes implements the Engine interface. diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index cc25261cccdc..9229c273359b 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -73,24 +73,6 @@ var rocksdbConcurrency = envutil.EnvOrDefaultInt( return max }()) -var ingestDelayL0Threshold = settings.RegisterIntSetting( - "rocksdb.ingest_backpressure.l0_file_count_threshold", - "number of L0 files after which to backpressure SST ingestions", - 20, -) - -var ingestDelayPendingLimit = settings.RegisterByteSizeSetting( - "rocksdb.ingest_backpressure.pending_compaction_threshold", - "pending compaction estimate above which to backpressure SST ingestions", - 2<<30, /* 2 GiB */ -) - -var ingestDelayTime = settings.RegisterDurationSetting( - "rocksdb.ingest_backpressure.max_delay", - "maximum amount of time to backpressure a single SST ingestion", - time.Second*5, -) - // Set to true to perform expensive iterator debug leak checking. In normal // operation, we perform inexpensive iterator leak checking but those checks do // not indicate where the leak arose. The expensive checking tracks the stack @@ -3084,55 +3066,9 @@ func (r *RocksDB) setAuxiliaryDir(d string) error { return nil } -// PreIngestDelay may choose to block for some duration if L0 has an excessive -// number of files in it or if PendingCompactionBytesEstimate is elevated. This -// it is intended to be called before ingesting a new SST, since we'd rather -// backpressure the bulk operation adding SSTs than slow down the whole RocksDB -// instance and impact all forground traffic by adding too many files to it. -// After the number of L0 files exceeds the configured limit, it gradually -// begins delaying more for each additional file in L0 over the limit until -// hitting its configured (via settings) maximum delay. If the pending -// compaction limit is exceeded, it waits for the maximum delay. +// PreIngestDelay implements the Engine interface. func (r *RocksDB) PreIngestDelay(ctx context.Context) { - if r.cfg.Settings == nil { - return - } - stats, err := r.GetStats() - if err != nil { - log.Warningf(ctx, "failed to read stats: %+v", err) - return - } - targetDelay := calculatePreIngestDelay(r.cfg, stats) - - if targetDelay == 0 { - return - } - log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %db pending compaction", targetDelay, stats.L0FileCount, stats.PendingCompactionBytesEstimate) - - select { - case <-time.After(targetDelay): - case <-ctx.Done(): - } -} - -func calculatePreIngestDelay(cfg RocksDBConfig, stats *Stats) time.Duration { - maxDelay := ingestDelayTime.Get(&cfg.Settings.SV) - l0Filelimit := ingestDelayL0Threshold.Get(&cfg.Settings.SV) - compactionLimit := ingestDelayPendingLimit.Get(&cfg.Settings.SV) - - if stats.PendingCompactionBytesEstimate >= compactionLimit { - return maxDelay - } - const ramp = 10 - if stats.L0FileCount > l0Filelimit { - delayPerFile := maxDelay / time.Duration(ramp) - targetDelay := time.Duration(stats.L0FileCount-l0Filelimit) * delayPerFile - if targetDelay > maxDelay { - return maxDelay - } - return targetDelay - } - return 0 + preIngestDelay(ctx, r, r.cfg.Settings) } // IngestExternalFiles atomically links a slice of files into the RocksDB diff --git a/pkg/storage/engine/rocksdb_test.go b/pkg/storage/engine/rocksdb_test.go index a1e2edd6bf7c..955443021ad3 100644 --- a/pkg/storage/engine/rocksdb_test.go +++ b/pkg/storage/engine/rocksdb_test.go @@ -37,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/stretchr/testify/require" ) const testCacheSize = 1 << 30 // 1 GB @@ -1626,27 +1625,3 @@ func TestRocksDBWALFileEmptyBatch(t *testing.T) { } }) } - -func TestIngestDelayLimit(t *testing.T) { - defer leaktest.AfterTest(t)() - cfg := RocksDBConfig{Settings: cluster.MakeTestingClusterSettings()} - - max, ramp := time.Second*5, time.Second*5/10 - - for _, tc := range []struct { - exp time.Duration - stats Stats - }{ - {0, Stats{}}, - {0, Stats{L0FileCount: 19}}, - {0, Stats{L0FileCount: 20}}, - {ramp, Stats{L0FileCount: 21}}, - {ramp * 2, Stats{L0FileCount: 22}}, - {max, Stats{L0FileCount: 55}}, - {0, Stats{PendingCompactionBytesEstimate: (2 << 30) - 1}}, - {max, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}}, - {max, Stats{L0FileCount: 35, PendingCompactionBytesEstimate: 20 << 30}}, - } { - require.Equal(t, tc.exp, calculatePreIngestDelay(cfg, &tc.stats)) - } -}