Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
100667: util/bulk: fix locking order in TracingAggregator r=yuzefovich a=yuzefovich

This commit fixes an inconsistent locking order in TracingAggregator that was recently introduced in c120153. In particular, now `EventListener.Notify` is called while holding the span's mutex. However, `TracingAggregator.Close` was implemented in such a fashion that its internal mutex was before `Finish` (which acquires the span's mutex). This commit fixes this issue by getting rid of unnecessary locking in `Close` - namely, it's now required that `Close` is called exactly once, so we can just call `span.Finish`.

Fixes: cockroachdb#100562

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Apr 5, 2023
2 parents 2f4090c + 81fd6f2 commit ff2c4a3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer

func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.agg.Close()
if bp.InternalClose() {
bp.agg.Close()
bp.memAcc.Close(bp.Ctx())
}
}
Expand Down
21 changes: 8 additions & 13 deletions pkg/util/bulk/tracing_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ type TracingAggregatorEvent interface {
// A TracingAggregator can be used to aggregate and render AggregatorEvents that
// are emitted as part of its tracing spans' recording.
type TracingAggregator struct {
// sp is the tracing span managed by the TracingAggregator.
sp *tracing.Span
mu struct {
syncutil.Mutex
// aggregatedEvents is a mapping from the tag identifying the
// TracingAggregatorEvent to the running aggregate of the TracingAggregatorEvent.
aggregatedEvents map[string]TracingAggregatorEvent
// sp is the tracing span managed by the TracingAggregator.
sp *tracing.Span
// closed is set to true if the TracingAggregator has already been closed.
closed bool
}
}

Expand All @@ -62,20 +60,17 @@ func (b *TracingAggregator) Notify(event tracing.Structured) tracing.EventConsum
eventTag := bulkEvent.Tag()
if _, ok := b.mu.aggregatedEvents[bulkEvent.Tag()]; !ok {
b.mu.aggregatedEvents[eventTag] = bulkEvent.Identity()
b.mu.sp.SetLazyTagLocked(eventTag, b.mu.aggregatedEvents[eventTag])
b.sp.SetLazyTagLocked(eventTag, b.mu.aggregatedEvents[eventTag])
}
b.mu.aggregatedEvents[eventTag].Combine(bulkEvent)
return tracing.EventNotConsumed
}

// Close is responsible for finishing the Aggregators' tracing span.
// Close is responsible for finishing the TracingAggregator's tracing span.
//
// NOTE: it must be called exactly once.
func (b *TracingAggregator) Close() {
b.mu.Lock()
defer b.mu.Unlock()
if !b.mu.closed {
b.mu.sp.Finish()
b.mu.closed = true
}
b.sp.Finish()
}

var _ tracing.EventListener = &TracingAggregator{}
Expand All @@ -96,7 +91,7 @@ func MakeTracingAggregatorWithSpan(
agg.mu.Lock()
defer agg.mu.Unlock()
agg.mu.aggregatedEvents = make(map[string]TracingAggregatorEvent)
agg.mu.sp = aggSpan
agg.sp = aggSpan

return aggCtx, agg
}

0 comments on commit ff2c4a3

Please sign in to comment.