From 50d101612ff93c2a5d33b42e257ed2a5c81a52f3 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 7 Sep 2023 20:46:36 +0000 Subject: [PATCH 1/3] kv/bulk: remove error return from SSTBatcher.Reset Release note: none. Epic: none. --- pkg/ccl/backupccl/restore_data_processor.go | 6 ++--- .../stream_ingestion_processor.go | 4 +--- pkg/kv/bulk/buffering_adder.go | 4 +--- pkg/kv/bulk/sst_batcher.go | 22 +++++++++---------- 4 files changed, 15 insertions(+), 21 deletions(-) diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index b458ed7c4b2f..fa56c1e27e53 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -767,7 +767,7 @@ func reserveRestoreWorkerMemory( // implement a mock SSTBatcher used purely for job progress tracking. type SSTBatcherExecutor interface { AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error - Reset(ctx context.Context) error + Reset(ctx context.Context) Flush(ctx context.Context) error Close(ctx context.Context) GetSummary() kvpb.BulkOpSummary @@ -786,9 +786,7 @@ func (b *sstBatcherNoop) AddMVCCKey(ctx context.Context, key storage.MVCCKey, va } // Reset resets the counter -func (b *sstBatcherNoop) Reset(ctx context.Context) error { - return nil -} +func (b *sstBatcherNoop) Reset(ctx context.Context) {} // Flush noops. func (b *sstBatcherNoop) Flush(ctx context.Context) error { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 094a7c326be0..d78f80a05f79 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -1188,9 +1188,7 @@ func (sip *streamIngestionProcessor) flushBuffer(b flushableBuffer) (*jobspb.Res sip.metrics.IngestedEvents.Inc(int64(len(b.buffer.curKVBatch))) sip.metrics.IngestedEvents.Inc(int64(len(b.buffer.curRangeKVBatch))) - if err := sip.batcher.Reset(ctx); err != nil { - return b.checkpoint, err - } + sip.batcher.Reset(ctx) releaseBuffer(b.buffer) diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go index 44a419d0f7d1..c51fd3bf80b3 100644 --- a/pkg/kv/bulk/buffering_adder.go +++ b/pkg/kv/bulk/buffering_adder.go @@ -256,9 +256,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { b.curBufSummary.Reset() return nil } - if err := b.sink.Reset(ctx); err != nil { - return err - } + b.sink.Reset(ctx) b.sink.currentStats.BufferFlushes++ var before *bulkpb.IngestionPerformanceStats diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 97ca41e42d02..7d11cf35ece4 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -232,8 +232,8 @@ func MakeSSTBatcher( } b.mu.lastFlush = timeutil.Now() b.mu.tracingSpan = tracing.SpanFromContext(ctx) - err := b.Reset(ctx) - return b, err + b.Reset(ctx) + return b, nil } // MakeStreamSSTBatcher creates a batcher configured to ingest duplicate keys @@ -258,8 +258,8 @@ func MakeStreamSSTBatcher( b.mu.lastFlush = timeutil.Now() b.mu.tracingSpan = tracing.SpanFromContext(ctx) b.SetOnFlush(onFlush) - err := b.Reset(ctx) - return b, err + b.Reset(ctx) + return b, nil } // MakeTestingSSTBatcher creates a batcher for testing, allowing setting options @@ -281,8 +281,8 @@ func MakeTestingSSTBatcher( mem: mem, limiter: sendLimiter, } - err := b.Reset(ctx) - return b, err + b.Reset(ctx) + return b, nil } func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) { @@ -373,7 +373,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value } // Reset clears all state in the batcher and prepares it for reuse. -func (b *SSTBatcher) Reset(ctx context.Context) error { +func (b *SSTBatcher) Reset(ctx context.Context) { b.sstWriter.Close() b.sstFile = &storage.MemObject{} // Create sstables intended for ingestion using the newest format that all @@ -402,8 +402,6 @@ func (b *SSTBatcher) Reset(ctx context.Context) error { if b.mu.totalStats.SendWaitByStore == nil { b.mu.totalStats.SendWaitByStore = make(map[roachpb.StoreID]time.Duration) } - - return nil } const ( @@ -438,7 +436,8 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err if err := b.doFlush(ctx, rangeFlush); err != nil { return err } - return b.Reset(ctx) + b.Reset(ctx) + return nil } if b.sstWriter.DataSize >= ingestFileSize(b.settings) { @@ -462,7 +461,8 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err if err := b.doFlush(ctx, sizeFlush); err != nil { return err } - return b.Reset(ctx) + b.Reset(ctx) + return nil } return nil } From 36b74093a720cd170569f31e0dc71147e5523703 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 7 Sep 2023 20:51:12 +0000 Subject: [PATCH 2/3] streamingest: ensure batcher is always reset Release note: none. Epic: none. --- .../streamingccl/streamingest/stream_ingestion_processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index d78f80a05f79..2e50f6d00d9c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -1155,6 +1155,8 @@ type flushableBuffer struct { func (sip *streamIngestionProcessor) flushBuffer(b flushableBuffer) (*jobspb.ResolvedSpans, error) { ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush") defer sp.Finish() + // Ensure the batcher is always reset, even on early error returns. + defer sip.batcher.Reset(ctx) // First process the point KVs. // @@ -1188,8 +1190,6 @@ func (sip *streamIngestionProcessor) flushBuffer(b flushableBuffer) (*jobspb.Res sip.metrics.IngestedEvents.Inc(int64(len(b.buffer.curKVBatch))) sip.metrics.IngestedEvents.Inc(int64(len(b.buffer.curRangeKVBatch))) - sip.batcher.Reset(ctx) - releaseBuffer(b.buffer) return b.checkpoint, nil From eb3059f598ad080ecb100f1951d8189d4b627c7c Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 7 Sep 2023 20:52:57 +0000 Subject: [PATCH 3/3] kv/bulk: ensure in-flight requests end on batcher reset Release note: none. Epic: none. --- pkg/kv/bulk/sst_batcher.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 7d11cf35ece4..fc4eba747098 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -374,7 +374,13 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value // Reset clears all state in the batcher and prepares it for reuse. func (b *SSTBatcher) Reset(ctx context.Context) { + if err := b.asyncAddSSTs.Wait(); err != nil { + log.Warningf(ctx, "closing with flushes in-progress encountered an error: %v", err) + } + b.asyncAddSSTs = ctxgroup.Group{} + b.sstWriter.Close() + b.sstFile = &storage.MemObject{} // Create sstables intended for ingestion using the newest format that all // nodes can support. MakeIngestionSSTWriter will handle cluster version