diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 591b5a3930a7..35d4cdc5fc86 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -217,8 +217,8 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() - bp.agg.Close() if bp.InternalClose() { + bp.agg.Close() bp.memAcc.Close(bp.Ctx()) } } diff --git a/pkg/util/bulk/tracing_aggregator.go b/pkg/util/bulk/tracing_aggregator.go index c5b3aa5c7d94..4fe8eaeaad1c 100644 --- a/pkg/util/bulk/tracing_aggregator.go +++ b/pkg/util/bulk/tracing_aggregator.go @@ -35,15 +35,13 @@ type TracingAggregatorEvent interface { // A TracingAggregator can be used to aggregate and render AggregatorEvents that // are emitted as part of its tracing spans' recording. type TracingAggregator struct { + // sp is the tracing span managed by the TracingAggregator. + sp *tracing.Span mu struct { syncutil.Mutex // aggregatedEvents is a mapping from the tag identifying the // TracingAggregatorEvent to the running aggregate of the TracingAggregatorEvent. aggregatedEvents map[string]TracingAggregatorEvent - // sp is the tracing span managed by the TracingAggregator. - sp *tracing.Span - // closed is set to true if the TracingAggregator has already been closed. - closed bool } } @@ -62,20 +60,17 @@ func (b *TracingAggregator) Notify(event tracing.Structured) tracing.EventConsum eventTag := bulkEvent.Tag() if _, ok := b.mu.aggregatedEvents[bulkEvent.Tag()]; !ok { b.mu.aggregatedEvents[eventTag] = bulkEvent.Identity() - b.mu.sp.SetLazyTagLocked(eventTag, b.mu.aggregatedEvents[eventTag]) + b.sp.SetLazyTagLocked(eventTag, b.mu.aggregatedEvents[eventTag]) } b.mu.aggregatedEvents[eventTag].Combine(bulkEvent) return tracing.EventNotConsumed } -// Close is responsible for finishing the Aggregators' tracing span. +// Close is responsible for finishing the TracingAggregator's tracing span. +// +// NOTE: it must be called exactly once. func (b *TracingAggregator) Close() { - b.mu.Lock() - defer b.mu.Unlock() - if !b.mu.closed { - b.mu.sp.Finish() - b.mu.closed = true - } + b.sp.Finish() } var _ tracing.EventListener = &TracingAggregator{} @@ -96,7 +91,7 @@ func MakeTracingAggregatorWithSpan( agg.mu.Lock() defer agg.mu.Unlock() agg.mu.aggregatedEvents = make(map[string]TracingAggregatorEvent) - agg.mu.sp = aggSpan + agg.sp = aggSpan return aggCtx, agg }