From 2cce39c20e12e65334dd6176b43fcefd298d1f11 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sat, 11 Dec 2021 00:40:10 +0000 Subject: [PATCH] bulk: add pre-flush delay setting This is a very crude knob that can slow down all bulk data ingestion -- including IMPORTs, RESTOREs, Index Creation, etc -- by applying a fixed delay before each batch of data buffered by the process that is producing data is flushed out to the storage layer. This is almost never the 'right' way to throttle a process to reduce its impact on the cluster/nodes' ability to serve other traffic. It is almost always going to be preferable to have whatever node is being sent some request choose to delay or refuse that request, based on its own idea of what resources that request will use and if they may be scarce or at risk. For example, a node should delay ingesting an SST to allow compactions to catch up, either on itself or its raft followers, if not doing so might make future reads or writes slower (perhaps indirectly, if followers become too slow). However, while that may be true -- that a more granular limiter that is closer to the limited resource is generally superior and should be preferred -- it seems likely that having this blunt knob on the higher level sending process can provide an operator an option of last resort, if we discover, in production, that some operation is managing to impact a cluster despite the efforts of existing more granular limiting. In these cases, a knob like this could mitigate the impact via manual tuning for the duration of the operation, until a better long-term solution can be implemented. Release note (ops change): A new setting bulkio.ingest.flush_delay is added to act as a last-resort option to manually slow bulk-writing processes if needed for cluster stability. This should only be used if there is no better suited back-pressure mechanism available for the contended resource. --- pkg/kv/bulk/sst_batcher.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index b304c7803fe1..5a5716e79bc8 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -37,6 +37,13 @@ var ( "size below which a 'bulk' write will be performed as a normal write instead", 400*1<<10, // 400 Kib ) + + ingestDelay = settings.RegisterDurationSetting( + "bulkio.ingest.flush_delay", + "amount of time to wait before sending a file to the KV/Storage layer to ingest", + 0, + settings.NonNegativeDuration, + ) ) type sz int64 @@ -277,6 +284,17 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke } b.flushCounts.total++ + if delay := ingestDelay.Get(&b.settings.SV); delay != 0 { + if delay > time.Second || log.V(1) { + log.Infof(ctx, "delaying %s before flushing ingestion buffer...", delay) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } + hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()} start := roachpb.Key(append([]byte(nil), b.batchStartKey...))