tracing,backupccl: log aggregated CapturedStacks during backup
This change teaches the backup processor to
periodically flush its collected AggregatorEvents.
For the time being, we only log the CapturedStack structured
events, but this sets us up to persist other aggregated
statistics in the future.

Release note: None
Informs: cockroachdb#100126
adityamaru committed Apr 13, 2023
1 parent 425f060 commit c6ae4fa
Showing 4 changed files with 183 additions and 7 deletions.
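The core of the change is a periodic flush loop: flush the aggregated events on a timer, and flush one final time when the context is cancelled so nothing is dropped at shutdown. The sketch below distills that pattern into standalone Go; it uses the standard library's time.Timer in place of CockroachDB's timeutil.Timer, and flushLoop is an illustrative name rather than part of the commit (the real implementation is flushAggregator in backup_processor.go below).

package main

import (
	"context"
	"fmt"
	"time"
)

// flushLoop flushes on a fixed interval and performs a final flush when the
// surrounding context is cancelled, mirroring the shape of flushAggregator.
func flushLoop(ctx context.Context, interval time.Duration, flush func()) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			// Final flush so events aggregated since the last tick are not lost.
			flush()
			return
		case <-timer.C:
			flush()
			timer.Reset(interval)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	flushLoop(ctx, 100*time.Millisecond, func() { fmt.Println("flush") })
}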
48 changes: 47 additions & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -160,16 +160,62 @@ func newBackupDataProcessor(
return bp, nil
}

// flushAggregator periodically flushes the aggregated trace events.
//
// TODO(adityamaru): This method will change in 23.2 once we start persisting
// aggregated stats in the job_info table.
func flushAggregator(ctx context.Context, agg *tracing.Aggregator) {
var flushTimer timeutil.Timer
defer flushTimer.Stop()

flush := func() {
agg.ForEachAggregatedEvent(ctx, func(tag string, aggregatorEvent tracing.AggregatorEvent) {
switch t := aggregatorEvent.(type) {
case *tracing.CapturedStack:
log.Infof(ctx, "slow request stack during backup: %s", t.String())
}
})
// Now that we have processed all the aggregated events we can clear the
// underlying map.
agg.Reset()
}

for {
// TODO(adityamaru): Make this a cluster setting.
flushTimer.Reset(30 * time.Second)
select {
case <-ctx.Done():
flush()
return
case <-flushTimer.C:
flushTimer.Read = true
flush()
}
}
}

// Start is part of the RowSource interface.
func (bp *backupDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", bp.spec.JobID)
ctx = bp.StartInternal(ctx, backupProcessorName)
ctx, cancel := context.WithCancel(ctx)

// Construct an Aggregator to aggregate and render AggregatorEvents emitted in
-// bps' trace recording.
+// backupDataProcessors' trace recording.
ctx, bp.agg = tracing.MakeAggregatorWithSpan(ctx,
fmt.Sprintf("%s-aggregator", backupProcessorName), bp.EvalCtx.Tracer)
if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{
TaskName: "backupDataProcessor.flushAggregator",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
ctx, cancel := bp.flowCtx.Stopper().WithCancelOnQuiesce(ctx)
defer cancel()
flushAggregator(ctx, bp.agg)
}); err != nil {
// The closure above hasn't run, so we have to do the cleanup.
bp.backupErr = err
cancel()
}

bp.cancelAndWaitForWorker = func() {
cancel()
5 changes: 3 additions & 2 deletions pkg/util/tracing/tracer_snapshots.go
@@ -339,9 +339,9 @@ func stackDelta(base, change string, age time.Duration) Structured {
}

var i, lines int
-for i = range base {
+for i = 0; i < len(base); i++ {
c := base[len(base)-1-i]
-if i > len(change) || change[len(change)-1-i] != c {
+if i >= len(change) || change[len(change)-1-i] != c {
break
}
if c == '\n' {
@@ -352,5 +352,6 @@ func stackDelta(base, change string, age time.Duration) Structured {
Stack: change[:len(change)-i],
SharedSuffix: int32(i),
SharedLines: int32(lines),
Age: age,
}
}
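The two edits to stackDelta fix real bugs: the old `for i = range base` loop leaves i at len(base)-1 when every byte matches, so two identical stacks reported a SharedSuffix one byte short, and the old `i > len(change)` guard lets i reach len(change) and index change[len(change)-1-i] out of range when change is a proper suffix of a longer base. The hunk also copies the age argument into the returned CapturedStack. Below is a standalone sketch of the corrected suffix scan; sharedSuffix is an illustrative name, not a function in the package.

package main

import "fmt"

// sharedSuffix mirrors the corrected loop in stackDelta: scan both strings
// from the end and stop at the first mismatch or when either string runs out.
// The return value is the number of shared trailing bytes.
func sharedSuffix(base, change string) int {
	var i int
	for i = 0; i < len(base); i++ {
		c := base[len(base)-1-i]
		if i >= len(change) || change[len(change)-1-i] != c {
			break
		}
	}
	return i
}

func main() {
	fmt.Println(sharedSuffix("abcd", "abcd")) // 4; the old range-based loop reported 3 here
	fmt.Println(sharedSuffix("abcd", "bcd"))  // 3; the old `i > len(change)` guard indexed change[-1] here
	fmt.Println(sharedSuffix("bcd", "abcd"))  // 3
	fmt.Println(sharedSuffix("abc", "abcd"))  // 0: no matching suffix
	fmt.Println(sharedSuffix("a", ""))        // 0: empty change
}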
83 changes: 83 additions & 0 deletions pkg/util/tracing/tracer_test.go
@@ -1034,3 +1034,86 @@ func TestTracerStackHistory(t *testing.T) {
}
}
}

func TestStackDelta(t *testing.T) {
notUsed := time.Duration(1)
for _, tc := range []struct {
name string
base string
change string
expectedStack CapturedStack
}{
{
name: "same",
base: "abcd",
change: "abcd",
expectedStack: CapturedStack{
Stack: "",
SharedSuffix: 4,
SharedLines: 0,
Age: notUsed,
},
},
{
name: "base longer",
base: "abcd",
change: "bcd",
expectedStack: CapturedStack{
Stack: "",
SharedSuffix: 3,
SharedLines: 0,
Age: notUsed,
},
},
{
name: "change longer",
base: "bcd",
change: "abcd",
expectedStack: CapturedStack{
Stack: "a",
SharedSuffix: 3,
SharedLines: 0,
Age: notUsed,
},
},
{
name: "no matching suffix",
base: "abc",
change: "abcd",
expectedStack: CapturedStack{
Stack: "abcd",
SharedSuffix: 0,
SharedLines: 0,
Age: notUsed,
},
},
{
name: "empty base",
base: "",
change: "a",
expectedStack: CapturedStack{
Stack: "a",
SharedSuffix: 0,
SharedLines: 0,
Age: notUsed,
},
},
{
name: "empty change",
base: "a",
change: "",
expectedStack: CapturedStack{
Stack: "",
SharedSuffix: 0,
SharedLines: 0,
Age: notUsed,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
s := stackDelta(tc.base, tc.change, notUsed)
stack := s.(*CapturedStack)
require.Equal(t, tc.expectedStack, *stack)
})
}
}
54 changes: 50 additions & 4 deletions pkg/util/tracing/tracing_aggregator.go
@@ -12,8 +12,11 @@ package tracing

import (
"context"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// AggregatorEvent describes an event that can be aggregated and stored by the
@@ -31,6 +34,16 @@ type AggregatorEvent interface {
Tag() string
}

// WrappedAggregatorEvent wraps an AggregatorEvent with the timestamp at which
// the Aggregator was notified about this event.
type WrappedAggregatorEvent struct {
AggregatorEvent

// ts is the time at which the Aggregator is notified about the
// AggregatorEvent.
ts time.Time
}

// An Aggregator can be used to aggregate and render AggregatorEvents that are
// emitted as part of its tracing spans' recording.
type Aggregator struct {
@@ -41,7 +54,7 @@ type Aggregator struct {
syncutil.Mutex
// aggregatedEvents is a mapping from the tag identifying the
// AggregatorEvent to the running aggregate of the AggregatorEvent.
-aggregatedEvents map[string]AggregatorEvent
+aggregatedEvents map[string]*WrappedAggregatorEvent
}
}

@@ -59,10 +72,13 @@ func (b *Aggregator) Notify(event Structured) EventConsumptionStatus {
// the associated tracing span.
eventTag := bulkEvent.Tag()
if _, ok := b.mu.aggregatedEvents[bulkEvent.Tag()]; !ok {
-b.mu.aggregatedEvents[eventTag] = bulkEvent.Identity()
-b.mu.sp.SetLazyTagLocked(eventTag, b.mu.aggregatedEvents[eventTag])
+b.mu.aggregatedEvents[eventTag] = &WrappedAggregatorEvent{
+AggregatorEvent: bulkEvent.Identity(),
+}
+b.sp.SetLazyTagLocked(eventTag, b.mu.aggregatedEvents[eventTag].AggregatorEvent)
}
b.mu.aggregatedEvents[eventTag].Combine(bulkEvent)
b.mu.aggregatedEvents[eventTag].ts = timeutil.Now()
return EventNotConsumed
}

@@ -73,6 +89,36 @@ func (b *Aggregator) Close() {
b.sp.Finish()
}

// Reset resets the Aggregator state.
func (b *Aggregator) Reset() {
b.mu.Lock()
defer b.mu.Unlock()
b.mu.aggregatedEvents = make(map[string]*WrappedAggregatorEvent)
}

// ForEachAggregatedEvent loops over the Aggregator's current aggregated events.
// The events are looped over in chronological order of when the Aggregator was
// notified about them.
func (b *Aggregator) ForEachAggregatedEvent(
_ context.Context, f func(tag string, aggregatorEvent AggregatorEvent),
) {
b.mu.Lock()
defer b.mu.Unlock()
keys := make([]string, len(b.mu.aggregatedEvents))
i := 0
for tag := range b.mu.aggregatedEvents {
keys[i] = tag
i++
}
sort.SliceStable(keys, func(i, j int) bool {
return b.mu.aggregatedEvents[keys[i]].ts.Before(
b.mu.aggregatedEvents[keys[j]].ts)
})
for _, k := range keys {
f(k, b.mu.aggregatedEvents[k].AggregatorEvent)
}
}

var _ EventListener = &Aggregator{}

// MakeAggregatorWithSpan returns an instance of an Aggregator along with a
@@ -90,7 +136,7 @@ func MakeAggregatorWithSpan(

agg.mu.Lock()
defer agg.mu.Unlock()
-agg.mu.aggregatedEvents = make(map[string]AggregatorEvent)
+agg.mu.aggregatedEvents = make(map[string]*WrappedAggregatorEvent)
agg.sp = aggSpan

return aggCtx, agg
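ForEachAggregatedEvent's chronological ordering falls out of two pieces working together: Notify stamps each WrappedAggregatorEvent with timeutil.Now() every time it is updated, and iteration sorts the tags by that timestamp before visiting them. The standalone sketch below mirrors that scheme using only the standard library; wrappedEvent and forEachInNotifyOrder are illustrative names, not the CockroachDB types.

package main

import (
	"fmt"
	"sort"
	"time"
)

// wrappedEvent pairs an aggregated value with the time it was last updated,
// mirroring the role of WrappedAggregatorEvent.
type wrappedEvent struct {
	value string
	ts    time.Time
}

// forEachInNotifyOrder visits events ordered by their last-update timestamp,
// the same scheme ForEachAggregatedEvent uses.
func forEachInNotifyOrder(events map[string]*wrappedEvent, f func(tag string, e *wrappedEvent)) {
	keys := make([]string, 0, len(events))
	for tag := range events {
		keys = append(keys, tag)
	}
	sort.SliceStable(keys, func(i, j int) bool {
		return events[keys[i]].ts.Before(events[keys[j]].ts)
	})
	for _, k := range keys {
		f(k, events[k])
	}
}

func main() {
	now := time.Now()
	events := map[string]*wrappedEvent{
		"stacks":  {value: "captured stack", ts: now.Add(2 * time.Second)},
		"exports": {value: "export stats", ts: now.Add(1 * time.Second)},
	}
	// Prints "exports" before "stacks", regardless of map iteration order.
	forEachInNotifyOrder(events, func(tag string, e *wrappedEvent) {
		fmt.Println(tag, e.value)
	})
}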
