Skip to content

Commit

Permalink
storage/server: implement Pebble.PreIngestDelay
Browse files Browse the repository at this point in the history
Various bits of code movement so that `engine.RocksDB` and
`engine.Pebble` can share the `PreIngestDelay` implementation. Added
`engine.PebbleConfig` so that we have a struct-based interface for
passing in configuration parameters to an `engine.Pebble`. Use this new
interface for passing in both `roachpb.Attributes` and
`cluster.Settings`, the latter of which is needed by `PreIngestDelay`.

Fixes #41598

Release note: None
  • Loading branch information
petermattis committed Oct 24, 2019
1 parent f581b8d commit b34923f
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 137 deletions.
32 changes: 18 additions & 14 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,21 +497,25 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
if cfg.StorageEngine == base.EngineTypePebble {
// TODO(itsbilal): Tune these options, and allow them to be overridden
// in the spec (similar to the existing spec.RocksDBOptions and others).
pebbleOpts := &pebble.Options{
Cache: pebbleCache,
MaxOpenFiles: int(openFileLimitPerStore),
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
MinFlushRate: 4 << 20,
L0CompactionThreshold: 2,
L0StopWritesThreshold: 400,
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: []pebble.LevelOptions{{
BlockSize: 32 << 10,
}},
pebbleConfig := engine.PebbleConfig{
Dir: spec.Path,
Opts: &pebble.Options{
Cache: pebbleCache,
MaxOpenFiles: int(openFileLimitPerStore),
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
MinFlushRate: 4 << 20,
L0CompactionThreshold: 2,
L0StopWritesThreshold: 400,
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: []pebble.LevelOptions{{
BlockSize: 32 << 10,
}},
},
Attrs: spec.Attributes,
Settings: cfg.Settings,
}
eng, err = engine.NewPebble(spec.Path, pebbleOpts)
eng.(*engine.Pebble).SetAttrs(spec.Attributes)
eng, err = engine.NewPebble(pebbleConfig)
} else {
rocksDBConfig := engine.RocksDBConfig{
Attrs: spec.Attributes,
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/engine/bench_pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,20 @@ func newPebbleOptions(fs vfs.FS) *pebble.Options {
}

func setupMVCCPebble(b testing.TB, dir string) Engine {
peb, err := NewPebble(dir, newPebbleOptions(vfs.Default))
peb, err := NewPebble(PebbleConfig{
Dir: dir,
Opts: newPebbleOptions(vfs.Default),
})
if err != nil {
b.Fatalf("could not create new pebble instance at %s: %+v", dir, err)
}
return peb
}

func setupMVCCInMemPebble(b testing.TB, loc string) Engine {
peb, err := NewPebble("", newPebbleOptions(vfs.NewMem()))
peb, err := NewPebble(PebbleConfig{
Opts: newPebbleOptions(vfs.NewMem()),
})
if err != nil {
b.Fatalf("could not create new in-mem pebble instance: %+v", err)
}
Expand Down
73 changes: 73 additions & 0 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ package engine

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

Expand Down Expand Up @@ -579,3 +583,72 @@ func ClearRangeWithHeuristic(eng Reader, writer Writer, start, end MVCCKey) erro
}
return nil
}

var ingestDelayL0Threshold = settings.RegisterIntSetting(
"rocksdb.ingest_backpressure.l0_file_count_threshold",
"number of L0 files after which to backpressure SST ingestions",
20,
)

var ingestDelayPendingLimit = settings.RegisterByteSizeSetting(
"rocksdb.ingest_backpressure.pending_compaction_threshold",
"pending compaction estimate above which to backpressure SST ingestions",
2<<30, /* 2 GiB */
)

var ingestDelayTime = settings.RegisterDurationSetting(
"rocksdb.ingest_backpressure.max_delay",
"maximum amount of time to backpressure a single SST ingestion",
time.Second*5,
)

// PreIngestDelay may choose to block for some duration if L0 has an excessive
// number of files in it or if PendingCompactionBytesEstimate is elevated. This
// it is intended to be called before ingesting a new SST, since we'd rather
// backpressure the bulk operation adding SSTs than slow down the whole RocksDB
// instance and impact all forground traffic by adding too many files to it.
// After the number of L0 files exceeds the configured limit, it gradually
// begins delaying more for each additional file in L0 over the limit until
// hitting its configured (via settings) maximum delay. If the pending
// compaction limit is exceeded, it waits for the maximum delay.
func preIngestDelay(ctx context.Context, eng Engine, settings *cluster.Settings) {
if settings == nil {
return
}
stats, err := eng.GetStats()
if err != nil {
log.Warningf(ctx, "failed to read stats: %+v", err)
return
}
targetDelay := calculatePreIngestDelay(settings, stats)

if targetDelay == 0 {
return
}
log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %db pending compaction", targetDelay, stats.L0FileCount, stats.PendingCompactionBytesEstimate)

select {
case <-time.After(targetDelay):
case <-ctx.Done():
}
}

func calculatePreIngestDelay(settings *cluster.Settings, stats *Stats) time.Duration {
maxDelay := ingestDelayTime.Get(&settings.SV)
l0Filelimit := ingestDelayL0Threshold.Get(&settings.SV)
compactionLimit := ingestDelayPendingLimit.Get(&settings.SV)

if stats.PendingCompactionBytesEstimate >= compactionLimit {
return maxDelay
}
const ramp = 10
if stats.L0FileCount > l0Filelimit {
delayPerFile := maxDelay / time.Duration(ramp)
targetDelay := time.Duration(stats.L0FileCount-l0Filelimit) * delayPerFile
if targetDelay > maxDelay {
return maxDelay
}
return targetDelay
}
return 0
}
41 changes: 35 additions & 6 deletions pkg/storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sort"
"strconv"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/cockroachdb/pebble/vfs"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func ensureRangeEqual(
Expand Down Expand Up @@ -62,12 +64,15 @@ var (
func runWithAllEngines(test func(e Engine, t *testing.T), t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
inMem := NewInMem(inMemAttrs, testCacheSize)
stopper.AddCloser(inMem)
test(inMem, t)
inMem.Close()
pebbleInMem, err := NewPebble("", &pebble.Options{
FS: vfs.NewMem(),
rocksDBInMem := NewInMem(inMemAttrs, testCacheSize)
stopper.AddCloser(rocksDBInMem)
test(rocksDBInMem, t)
rocksDBInMem.Close()

pebbleInMem, err := NewPebble(PebbleConfig{
Opts: &pebble.Options{
FS: vfs.NewMem(),
},
})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1028,3 +1033,27 @@ func TestCreateCheckpoint(t *testing.T) {
t.Fatal(err)
}
}

func TestIngestDelayLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
s := cluster.MakeTestingClusterSettings()

max, ramp := time.Second*5, time.Second*5/10

for _, tc := range []struct {
exp time.Duration
stats Stats
}{
{0, Stats{}},
{0, Stats{L0FileCount: 19}},
{0, Stats{L0FileCount: 20}},
{ramp, Stats{L0FileCount: 21}},
{ramp * 2, Stats{L0FileCount: 22}},
{max, Stats{L0FileCount: 55}},
{0, Stats{PendingCompactionBytesEstimate: (2 << 30) - 1}},
{max, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}},
{max, Stats{L0FileCount: 35, PendingCompactionBytesEstimate: 20 << 30}},
} {
require.Equal(t, tc.exp, calculatePreIngestDelay(s, &tc.stats))
}
}
6 changes: 4 additions & 2 deletions pkg/storage/engine/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ func createTestRocksDBEngine() Engine {

// createTestPebbleEngine returns a new in-memory Pebble storage engine.
func createTestPebbleEngine() Engine {
peb, err := NewPebble("", &pebble.Options{
FS: vfs.NewMem(),
peb, err := NewPebble(PebbleConfig{
Opts: &pebble.Options{
FS: vfs.NewMem(),
},
})
if err != nil {
return nil
Expand Down
55 changes: 33 additions & 22 deletions pkg/storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io/ioutil"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -147,13 +148,28 @@ var PebbleTablePropertyCollectors = []func() pebble.TablePropertyCollector{
func() pebble.TablePropertyCollector { return &pebbleDeleteRangeCollector{} },
}

// PebbleConfig holds all configuration parameters and knobs used in setting up
// a new Pebble instance.
type PebbleConfig struct {
// Store attributes. These are only placed here for convenience, and not used
// by Pebble or the Pebble-based Engine implementation.
Attrs roachpb.Attributes
// Dir is the data directory for the Pebble instance.
Dir string
// Pebble specific options.
Opts *pebble.Options
// Settings instance for cluster-wide knobs.
Settings *cluster.Settings
}

// Pebble is a wrapper around a Pebble database instance.
type Pebble struct {
db *pebble.DB

closed bool
path string
attrs roachpb.Attributes
closed bool
path string
attrs roachpb.Attributes
settings *cluster.Settings

// Relevant options copied over from pebble.Options.
fs vfs.FS
Expand All @@ -162,25 +178,27 @@ type Pebble struct {
var _ WithSSTables = &Pebble{}

// NewPebble creates a new Pebble instance, at the specified path.
func NewPebble(path string, cfg *pebble.Options) (*Pebble, error) {
cfg.Comparer = MVCCComparer
cfg.Merger = MVCCMerger
cfg.TablePropertyCollectors = PebbleTablePropertyCollectors
func NewPebble(cfg PebbleConfig) (*Pebble, error) {
cfg.Opts.Comparer = MVCCComparer
cfg.Opts.Merger = MVCCMerger
cfg.Opts.TablePropertyCollectors = PebbleTablePropertyCollectors

// pebble.Open also calls EnsureDefaults, but only after doing a clone. Call
// EnsureDefaults beforehand so we have a matching cfg here for when we save
// cfg.FS and cfg.ReadOnly later on.
cfg.EnsureDefaults()
cfg.Opts.EnsureDefaults()

db, err := pebble.Open(path, cfg)
db, err := pebble.Open(cfg.Dir, cfg.Opts)
if err != nil {
return nil, err
}

return &Pebble{
db: db,
path: path,
fs: cfg.FS,
db: db,
path: cfg.Dir,
attrs: cfg.Attrs,
settings: cfg.Settings,
fs: cfg.Opts.FS,
}, nil
}

Expand Down Expand Up @@ -349,12 +367,6 @@ func (p *Pebble) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails
// No-op. Logical logging disabled.
}

// SetAttrs sets the attributes returned by Atts(). This method is not safe for
// concurrent use.
func (p *Pebble) SetAttrs(attrs roachpb.Attributes) {
p.attrs = attrs
}

// Attrs implements the Engine interface.
func (p *Pebble) Attrs() roachpb.Attributes {
return p.attrs
Expand Down Expand Up @@ -393,7 +405,7 @@ func (p *Pebble) GetStats() (*Stats, error) {

// GetEnvStats implements the Engine interface.
func (p *Pebble) GetEnvStats() (*EnvStats, error) {
// TODO(itsbilal): Implement this.
// TODO(sumeer): Implement this. These are encryption-at-rest specific stats.
return &EnvStats{}, nil
}

Expand Down Expand Up @@ -435,9 +447,8 @@ func (p *Pebble) IngestExternalFiles(
}

// PreIngestDelay implements the Engine interface.
func (p *Pebble) PreIngestDelay(_ context.Context) {
// TODO(itsbilal): See if we need to add pre-ingestion delays, similar to
// how we do with rocksdb.
func (p *Pebble) PreIngestDelay(ctx context.Context) {
preIngestDelay(ctx, p, p.settings)
}

// ApproximateDiskBytes implements the Engine interface.
Expand Down
Loading

0 comments on commit b34923f

Please sign in to comment.