Skip to content

Commit

Permalink
storage: remove PreIngestDelay
Browse files Browse the repository at this point in the history
Epic: none
Release note: None
  • Loading branch information
tbg committed Apr 26, 2023
1 parent 2667f6c commit d25a57f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 102 deletions.
89 changes: 21 additions & 68 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/pebbleiter"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -941,10 +939,6 @@ type Engine interface {
// additionally returns ingestion stats.
IngestExternalFilesWithStats(
ctx context.Context, paths []string) (pebble.IngestOperationStats, error)
// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
PreIngestDelay(ctx context.Context)
// ApproximateDiskBytes returns an approximation of the on-disk size for the given key span.
ApproximateDiskBytes(from, to roachpb.Key) (uint64, error)
// CompactRange ensures that the specified range of key value pairs is
Expand Down Expand Up @@ -1493,68 +1487,27 @@ func ClearRangeWithHeuristic(
return nil
}

var ingestDelayL0Threshold = settings.RegisterIntSetting(
settings.TenantWritable,
"rocksdb.ingest_backpressure.l0_file_count_threshold",
"number of L0 files after which to backpressure SST ingestions",
20,
)

var ingestDelayTime = settings.RegisterDurationSetting(
settings.TenantWritable,
"rocksdb.ingest_backpressure.max_delay",
"maximum amount of time to backpressure a single SST ingestion",
time.Second*5,
)

// PreIngestDelay may choose to block for some duration if L0 has an excessive
// number of files in it or if PendingCompactionBytesEstimate is elevated. This
// it is intended to be called before ingesting a new SST, since we'd rather
// backpressure the bulk operation adding SSTs than slow down the whole RocksDB
// instance and impact all foreground traffic by adding too many files to it.
// After the number of L0 files exceeds the configured limit, it gradually
// begins delaying more for each additional file in L0 over the limit until
// hitting its configured (via settings) maximum delay. If the pending
// compaction limit is exceeded, it waits for the maximum delay.
func preIngestDelay(ctx context.Context, eng Engine, settings *cluster.Settings) {
if settings == nil {
return
}
metrics := eng.GetMetrics()
targetDelay := calculatePreIngestDelay(settings, metrics.Metrics)

if targetDelay == 0 {
return
}
log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %d L0 Sublevels",
targetDelay, metrics.Levels[0].NumFiles, metrics.Levels[0].Sublevels)

select {
case <-time.After(targetDelay):
case <-ctx.Done():
}
}

func calculatePreIngestDelay(settings *cluster.Settings, metrics *pebble.Metrics) time.Duration {
maxDelay := ingestDelayTime.Get(&settings.SV)
l0ReadAmpLimit := ingestDelayL0Threshold.Get(&settings.SV)

const ramp = 10
l0ReadAmp := metrics.Levels[0].NumFiles
if metrics.Levels[0].Sublevels >= 0 {
l0ReadAmp = int64(metrics.Levels[0].Sublevels)
}

if l0ReadAmp > l0ReadAmpLimit {
delayPerFile := maxDelay / time.Duration(ramp)
targetDelay := time.Duration(l0ReadAmp-l0ReadAmpLimit) * delayPerFile
if targetDelay > maxDelay {
return maxDelay
}
return targetDelay
}
return 0
}
var _ = func() *settings.IntSetting {
s := settings.RegisterIntSetting(
settings.TenantWritable,
"rocksdb.ingest_backpressure.l0_file_count_threshold",
"number of L0 files after which to backpressure SST ingestions",
20,
)
s.SetRetired()
return s
}()

var _ = func() *settings.DurationSetting {
s := settings.RegisterDurationSetting(
settings.TenantWritable,
"rocksdb.ingest_backpressure.max_delay",
"maximum amount of time to backpressure a single SST ingestion",
time.Second*5,
)
s.SetRetired()
return s
}()

// Helper function to implement Reader.MVCCIterate().
func iterateOnReader(
Expand Down
29 changes: 0 additions & 29 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
Expand Down Expand Up @@ -1135,34 +1134,6 @@ func TestCreateCheckpoint_SpanConstrained(t *testing.T) {
}
}

func TestIngestDelayLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s := cluster.MakeTestingClusterSettings()

max, ramp := time.Second*5, time.Second*5/10

for _, tc := range []struct {
exp time.Duration
fileCount int64
sublevelCount int32
}{
{0, 0, 0},
{0, 19, -1},
{0, 20, -1},
{ramp, 21, -1},
{ramp * 2, 22, -1},
{ramp * 2, 22, 22},
{ramp * 2, 55, 22},
{max, 55, -1},
} {
var m pebble.Metrics
m.Levels[0].NumFiles = tc.fileCount
m.Levels[0].Sublevels = tc.sublevelCount
require.Equal(t, tc.exp, calculatePreIngestDelay(s, &m))
}
}

func TestEngineFS(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
5 changes: 0 additions & 5 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1899,11 +1899,6 @@ func (p *Pebble) IngestExternalFilesWithStats(
return p.db.IngestWithStats(paths)
}

// PreIngestDelay implements the Engine interface.
func (p *Pebble) PreIngestDelay(ctx context.Context) {
preIngestDelay(ctx, p, p.settings)
}

// ApproximateDiskBytes implements the Engine interface.
func (p *Pebble) ApproximateDiskBytes(from, to roachpb.Key) (uint64, error) {
fromEncoded := EngineKey{Key: from}.Encode()
Expand Down

0 comments on commit d25a57f

Please sign in to comment.