kvserver: remove above-raft throttling of AddSST #101530

Closed · wants to merge 2 commits
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -32,10 +32,8 @@ import (

// Limiters is the collection of per-store limits used during cmd evaluation.
type Limiters struct {
-BulkIOWriteRate                      *rate.Limiter
-ConcurrentExportRequests             limit.ConcurrentRequestLimiter
-ConcurrentAddSSTableRequests         limit.ConcurrentRequestLimiter
-ConcurrentAddSSTableAsWritesRequests limit.ConcurrentRequestLimiter
+BulkIOWriteRate          *rate.Limiter
+ConcurrentExportRequests limit.ConcurrentRequestLimiter
// concurrentRangefeedIters is a semaphore used to limit the number of
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
54 changes: 26 additions & 28 deletions pkg/kv/kvserver/store.go
@@ -149,26 +149,38 @@ var bulkIOWriteLimit = settings.RegisterByteSizeSetting(
).WithPublic()

// addSSTableRequestLimit limits concurrent AddSSTable requests.
-var addSSTableRequestLimit = settings.RegisterIntSetting(
-settings.SystemOnly,
-"kv.bulk_io_write.concurrent_addsstable_requests",
-"number of concurrent AddSSTable requests per store before queueing",
-1,
-settings.PositiveInt,
-)
+// This is deprecated: we no longer slow down AddSST in (*Store).Send.
+// Instead, it's admission control's job to pace ingestions.
+var _ = func() *settings.IntSetting {
+s := settings.RegisterIntSetting(
+settings.SystemOnly,
+"kv.bulk_io_write.concurrent_addsstable_requests",
+"number of concurrent AddSSTable requests per store before queueing",
+1,
+settings.PositiveInt,
+)
+s.SetRetired()
+return s
+}()

// addSSTableAsWritesRequestLimit limits concurrent AddSSTable requests with
// IngestAsWrites set. These are smaller (kv.bulk_io_write.small_write_size),
// and will end up in the Pebble memtable (default 64 MB) before flushing to
// disk, so we can allow a greater amount of concurrency than regular AddSSTable
// requests. Applied independently of concurrent_addsstable_requests.
-var addSSTableAsWritesRequestLimit = settings.RegisterIntSetting(
-settings.SystemOnly,
-"kv.bulk_io_write.concurrent_addsstable_as_writes_requests",
-"number of concurrent AddSSTable requests ingested as writes per store before queueing",
-10,
-settings.PositiveInt,
-)
+//
+// This is deprecated and no longer has an effect, see above.
+var _ = func() *settings.IntSetting {
+s := settings.RegisterIntSetting(
+settings.SystemOnly,
+"kv.bulk_io_write.concurrent_addsstable_as_writes_requests",
+"number of concurrent AddSSTable requests ingested as writes per store before queueing",
+10,
+settings.PositiveInt,
+)
+s.SetRetired()
+return s
+}()

// concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators.
var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
@@ -1391,20 +1403,6 @@ func NewStore(
}
s.limiters.ConcurrentExportRequests.SetLimit(limit)
})
-s.limiters.ConcurrentAddSSTableRequests = limit.MakeConcurrentRequestLimiter(
-"addSSTableRequestLimiter", int(addSSTableRequestLimit.Get(&cfg.Settings.SV)),
-)
-addSSTableRequestLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
-s.limiters.ConcurrentAddSSTableRequests.SetLimit(
-int(addSSTableRequestLimit.Get(&cfg.Settings.SV)))
-})
-s.limiters.ConcurrentAddSSTableAsWritesRequests = limit.MakeConcurrentRequestLimiter(
-"addSSTableAsWritesRequestLimiter", int(addSSTableAsWritesRequestLimit.Get(&cfg.Settings.SV)),
-)
-addSSTableAsWritesRequestLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
-s.limiters.ConcurrentAddSSTableAsWritesRequests.SetLimit(
-int(addSSTableAsWritesRequestLimit.Get(&cfg.Settings.SV)))
-})
s.limiters.ConcurrentRangefeedIters = limit.MakeConcurrentRequestLimiter(
"rangefeedIterLimiter", int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV)),
)
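Note on the pattern above: rather than deleting the two cluster settings outright, the PR re-registers them and immediately calls SetRetired, presumably so the names stay reserved and existing references keep resolving while the values become inert. Below is a self-contained toy sketch of that idea in Go; the registry, registerInt, markRetired, and lookup names are invented for illustration and are not the pkg/settings API.

package main

import "fmt"

// setting is a toy stand-in for a registered cluster setting.
type setting struct {
	def     int64
	retired bool
}

// registry maps setting names to their (toy) definitions.
var registry = map[string]*setting{}

func registerInt(name string, def int64) *setting {
	s := &setting{def: def}
	registry[name] = s
	return s
}

// markRetired keeps the name registered but makes the setting inert.
func (s *setting) markRetired() { s.retired = true }

// lookup reports whether a name is known and whether it still has any effect.
func lookup(name string) (known, active bool) {
	s, ok := registry[name]
	if !ok {
		return false, false
	}
	return true, !s.retired
}

func main() {
	s := registerInt("kv.bulk_io_write.concurrent_addsstable_requests", 1)
	s.markRetired()
	known, active := lookup("kv.bulk_io_write.concurrent_addsstable_requests")
	fmt.Println(known, active) // true false: the name is reserved, but it no longer throttles anything
}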
27 changes: 1 addition & 26 deletions pkg/kv/kvserver/store_send.go
@@ -337,32 +337,7 @@ func (s *Store) maybeThrottleBatch(
return nil, nil
}

-switch t := ba.Requests[0].GetInner().(type) {
-case *kvpb.AddSSTableRequest:
-limiter := s.limiters.ConcurrentAddSSTableRequests
-if t.IngestAsWrites {
-limiter = s.limiters.ConcurrentAddSSTableAsWritesRequests
-}
-before := timeutil.Now()
-res, err := limiter.Begin(ctx)
-if err != nil {
-return nil, err
-}

-beforeEngineDelay := timeutil.Now()
-// TODO(sep-raft-log): can we get rid of this?
-s.TODOEngine().PreIngestDelay(ctx)
-after := timeutil.Now()

-waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay)
-s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds())
-s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds())
-if waited > time.Second {
-log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)",
-waited, waitedEngine)
-}
-return res, nil

+switch ba.Requests[0].GetInner().(type) {
case *kvpb.ExportRequest:
// Limit the number of concurrent Export requests, as these often scan an
// entire Range at a time and place significant read load on a Store.
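For context on the deleted block above: limiter.Begin(ctx) blocked until a concurrency slot was free (or the context was canceled) and handed back a reservation to release once the request finished; the PreIngestDelay call then added engine-driven back-pressure on top. The sketch below models only the Begin/release semantics with a buffered-channel semaphore, assuming a Begin that returns a release function; it is not the real limit.ConcurrentRequestLimiter.

package main

import (
	"context"
	"fmt"
	"time"
)

// toyLimiter models the acquire/release behavior the removed throttling relied on.
type toyLimiter struct {
	sem chan struct{}
}

func newToyLimiter(limit int) *toyLimiter {
	return &toyLimiter{sem: make(chan struct{}, limit)}
}

// Begin blocks until a slot is free or the context is done, and returns a
// function that releases the slot.
func (l *toyLimiter) Begin(ctx context.Context) (release func(), err error) {
	select {
	case l.sem <- struct{}{}:
		return func() { <-l.sem }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	lim := newToyLimiter(1) // mirrors the old default of one concurrent AddSSTable per store
	ctx := context.Background()

	release, err := lim.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer release()

	// A second request would queue behind the first; with a short timeout it
	// gives up instead, which is roughly what a canceled batch would see.
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	if _, err := lim.Begin(ctx2); err != nil {
		fmt.Println("second request would have queued:", err)
	}
}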
89 changes: 21 additions & 68 deletions pkg/storage/engine.go
@@ -18,13 +18,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/pebbleiter"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -941,10 +939,6 @@ type Engine interface {
// additionally returns ingestion stats.
IngestExternalFilesWithStats(
ctx context.Context, paths []string) (pebble.IngestOperationStats, error)
-// PreIngestDelay offers an engine the chance to backpressure ingestions.
-// When called, it may choose to block if the engine determines that it is in
-// or approaching a state where further ingestions may risk its health.
-PreIngestDelay(ctx context.Context)
// ApproximateDiskBytes returns an approximation of the on-disk size for the given key span.
ApproximateDiskBytes(from, to roachpb.Key) (uint64, error)
// CompactRange ensures that the specified range of key value pairs is
@@ -1493,68 +1487,27 @@ func ClearRangeWithHeuristic(
return nil
}

-var ingestDelayL0Threshold = settings.RegisterIntSetting(
-settings.TenantWritable,
-"rocksdb.ingest_backpressure.l0_file_count_threshold",
-"number of L0 files after which to backpressure SST ingestions",
-20,
-)

-var ingestDelayTime = settings.RegisterDurationSetting(
-settings.TenantWritable,
-"rocksdb.ingest_backpressure.max_delay",
-"maximum amount of time to backpressure a single SST ingestion",
-time.Second*5,
-)

-// PreIngestDelay may choose to block for some duration if L0 has an excessive
-// number of files in it or if PendingCompactionBytesEstimate is elevated. It
-// is intended to be called before ingesting a new SST, since we'd rather
-// backpressure the bulk operation adding SSTs than slow down the whole RocksDB
-// instance and impact all foreground traffic by adding too many files to it.
-// After the number of L0 files exceeds the configured limit, it gradually
-// begins delaying more for each additional file in L0 over the limit until
-// hitting its configured (via settings) maximum delay. If the pending
-// compaction limit is exceeded, it waits for the maximum delay.
-func preIngestDelay(ctx context.Context, eng Engine, settings *cluster.Settings) {
-if settings == nil {
-return
-}
-metrics := eng.GetMetrics()
-targetDelay := calculatePreIngestDelay(settings, metrics.Metrics)

-if targetDelay == 0 {
-return
-}
-log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %d L0 Sublevels",
-targetDelay, metrics.Levels[0].NumFiles, metrics.Levels[0].Sublevels)

-select {
-case <-time.After(targetDelay):
-case <-ctx.Done():
-}
-}

-func calculatePreIngestDelay(settings *cluster.Settings, metrics *pebble.Metrics) time.Duration {
-maxDelay := ingestDelayTime.Get(&settings.SV)
-l0ReadAmpLimit := ingestDelayL0Threshold.Get(&settings.SV)

-const ramp = 10
-l0ReadAmp := metrics.Levels[0].NumFiles
-if metrics.Levels[0].Sublevels >= 0 {
-l0ReadAmp = int64(metrics.Levels[0].Sublevels)
-}

-if l0ReadAmp > l0ReadAmpLimit {
-delayPerFile := maxDelay / time.Duration(ramp)
-targetDelay := time.Duration(l0ReadAmp-l0ReadAmpLimit) * delayPerFile
-if targetDelay > maxDelay {
-return maxDelay
-}
-return targetDelay
-}
-return 0
-}
+var _ = func() *settings.IntSetting {
+s := settings.RegisterIntSetting(
+settings.TenantWritable,
+"rocksdb.ingest_backpressure.l0_file_count_threshold",
+"number of L0 files after which to backpressure SST ingestions",
+20,
+)
+s.SetRetired()
+return s
+}()

+var _ = func() *settings.DurationSetting {
+s := settings.RegisterDurationSetting(
+settings.TenantWritable,
+"rocksdb.ingest_backpressure.max_delay",
+"maximum amount of time to backpressure a single SST ingestion",
+time.Second*5,
+)
+s.SetRetired()
+return s
+}()

// Helper function to implement Reader.MVCCIterate().
func iterateOnReader(
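To make the removed back-pressure concrete, the deleted calculatePreIngestDelay is restated below as a standalone snippet with its old defaults baked in (threshold of 20 L0 files/sublevels, 5s maximum delay, ramp of 10). The printed values match the table in TestIngestDelayLimit, deleted in the next file; this is purely a restatement of the removed logic for illustration, not code that remains in the tree.

package main

import (
	"fmt"
	"time"
)

// preIngestDelay restates the deleted calculatePreIngestDelay: delay kicks in
// above 20 L0 files (or sublevels, when available) and grows by
// maxDelay/ramp = 500ms per unit of read amplification over the threshold,
// capped at 5s.
func preIngestDelay(l0Files int64, l0Sublevels int32) time.Duration {
	const (
		maxDelay    = 5 * time.Second
		l0Threshold = 20
		ramp        = 10
	)
	l0ReadAmp := l0Files
	if l0Sublevels >= 0 {
		l0ReadAmp = int64(l0Sublevels)
	}
	if l0ReadAmp <= l0Threshold {
		return 0
	}
	delay := time.Duration(l0ReadAmp-l0Threshold) * (maxDelay / ramp)
	if delay > maxDelay {
		return maxDelay
	}
	return delay
}

func main() {
	fmt.Println(preIngestDelay(20, -1)) // 0s: at the threshold, no delay yet
	fmt.Println(preIngestDelay(22, -1)) // 1s: two files over the threshold, 500ms each
	fmt.Println(preIngestDelay(55, 22)) // 1s: the sublevel count overrides the file count
	fmt.Println(preIngestDelay(55, -1)) // 5s: capped at the configured maximum
}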
29 changes: 0 additions & 29 deletions pkg/storage/engine_test.go
@@ -25,7 +25,6 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
@@ -1135,34 +1134,6 @@ func TestCreateCheckpoint_SpanConstrained(t *testing.T) {
}
}

-func TestIngestDelayLimit(t *testing.T) {
-defer leaktest.AfterTest(t)()
-defer log.Scope(t).Close(t)
-s := cluster.MakeTestingClusterSettings()

-max, ramp := time.Second*5, time.Second*5/10

-for _, tc := range []struct {
-exp time.Duration
-fileCount int64
-sublevelCount int32
-}{
-{0, 0, 0},
-{0, 19, -1},
-{0, 20, -1},
-{ramp, 21, -1},
-{ramp * 2, 22, -1},
-{ramp * 2, 22, 22},
-{ramp * 2, 55, 22},
-{max, 55, -1},
-} {
-var m pebble.Metrics
-m.Levels[0].NumFiles = tc.fileCount
-m.Levels[0].Sublevels = tc.sublevelCount
-require.Equal(t, tc.exp, calculatePreIngestDelay(s, &m))
-}
-}

func TestEngineFS(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
5 changes: 0 additions & 5 deletions pkg/storage/pebble.go
@@ -1899,11 +1899,6 @@ func (p *Pebble) IngestExternalFilesWithStats(
return p.db.IngestWithStats(paths)
}

-// PreIngestDelay implements the Engine interface.
-func (p *Pebble) PreIngestDelay(ctx context.Context) {
-preIngestDelay(ctx, p, p.settings)
-}

// ApproximateDiskBytes implements the Engine interface.
func (p *Pebble) ApproximateDiskBytes(from, to roachpb.Key) (uint64, error) {
fromEncoded := EngineKey{Key: from}.Encode()