diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index b304c7803fe1..5a5716e79bc8 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -37,6 +37,13 @@ var ( "size below which a 'bulk' write will be performed as a normal write instead", 400*1<<10, // 400 Kib ) + + ingestDelay = settings.RegisterDurationSetting( + "bulkio.ingest.flush_delay", + "amount of time to wait before sending a file to the KV/Storage layer to ingest", + 0, + settings.NonNegativeDuration, + ) ) type sz int64 @@ -277,6 +284,17 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke } b.flushCounts.total++ + if delay := ingestDelay.Get(&b.settings.SV); delay != 0 { + if delay > time.Second || log.V(1) { + log.Infof(ctx, "delaying %s before flushing ingestion buffer...", delay) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } + hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()} start := roachpb.Key(append([]byte(nil), b.batchStartKey...))