Skip to content

Commit

Permalink
tracing: aggregate OperationMetadata on span Finish()
Browse files Browse the repository at this point in the history
This change adds a `ChildrenMetadata` map to `crdbSpan` that is a
mapping from operation to the operation's aggregated metadata. This
map is updated whenever a child of the `crdbSpan` finishes, with metadata
from all spans in the finishing child's Recording. The map is therefore
a bucketed view of all the operations being traced by a span.

The motivation for this change is to surface more metadata about the
suboperations being traced in a span's Recording. This could in turn provide
more observability (o11y) into why a job is slow/stuck, or where the performance of a
distributed operation is bottlenecked.

As part of a span Finish()ing, the span fetches its Recording with the span's
configured verbosity. Prior to this change, the recording would then be
processed as follows:

*Verbose Recording*

In the case of a Verbose recording, the spans in the recording are added to the parent's
`finishedChildren` slice, provided we have not exceeded the maximum number of
children a parent can track.

*Structured Recording*

In the case of a Structured recording, only the StructuredEvents from the spans in
the recording are copied into the parent.

With this change, in both the Verbose and Structured recording modes, a finishing span
is also responsible for rolling up the OperationMetadata of all the spans in its
recording. This involves updating the parent's `childrenMetadata` mapping with:

1) an entry for the finishing span.
2) an entry for each of the finishing span's Finish()ed children.
3) an entry for each of the finishing span's open children, and their children, recursively.

The logic for 2) and 3) is subsumed in the method responsible for getting
the finishing span's recording. Notably, GetRecording(...), for both Structured and Verbose
recordings, populates the root of the recording with the OperationMetadata of all
finished and open children in the recording.

As an example, when we are done finishing `child`:

```
parent
  child
    grandchild
```

We'd expect `parent` to have:
`{child: 2s, grandchild: 1s}`

Given that Finish()ing a child and importing a remote recording into a span
share the same code path, the above semantics also apply to a remote recording
being imported into a parent span.

Fixes: #80391

Release note: None
  • Loading branch information
adityamaru committed Jun 27, 2022
1 parent 31ed92f commit 4ddc350
Show file tree
Hide file tree
Showing 7 changed files with 433 additions and 75 deletions.
1 change: 1 addition & 0 deletions pkg/server/node_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func TestRedactRecordingForTenant(t *testing.T) {
GoroutineID uint64
Finished bool
StructuredRecords []tracingpb.StructuredRecord
ChildrenMetadata map[string]tracingpb.OperationMetadata
}
_ = (*calcifiedRecordedSpan)((*tracingpb.RecordedSpan)(nil))
})
Expand Down
237 changes: 166 additions & 71 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ type recordingState struct {
//
// The spans are not maintained in a particular order.
finishedChildren []tracingpb.RecordedSpan

// childrenMetadata is a mapping from operation to the aggregated metadata of
// that operation.
//
// When a child of this span is Finish()ed, it updates the map with all the
// children in its Recording. childrenMetadata therefore provides a bucketed
// view of the various operations that are being traced as part of a span.
childrenMetadata map[string]tracingpb.OperationMetadata
}

// makeSizeLimitedBuffer creates a sizeLimitedBuffer.
Expand Down Expand Up @@ -377,8 +385,20 @@ func (s *crdbSpan) getRecordingImpl(
}
}

// rollupChildrenMetadata merges every entry of `from` into `to`.
//
// For each operation key present in `from`, the value currently stored in `to`
// under that key (the zero OperationMetadata when the key is absent) is
// combined with the value from `from` via Combine, and the result is written
// back into `to`. This function never writes to `from`.
func rollupChildrenMetadata(
	to map[string]tracingpb.OperationMetadata, from map[string]tracingpb.OperationMetadata,
) {
	for operation := range from {
		existing := to[operation]
		to[operation] = existing.Combine(from[operation])
	}
}

// getVerboseRecording returns the Span's recording, including its children.
//
// Each RecordedSpan in the Recording contains the ChildrenMetadata of all the
// children, both finished and open, in the span's subtree.
//
// finishing indicates whether s is in the process of finishing. If it isn't,
// the recording will include an "_unfinished" tag.
func (s *crdbSpan) getVerboseRecording(
Expand All @@ -388,21 +408,53 @@ func (s *crdbSpan) getVerboseRecording(
return nil // noop span
}

s.mu.Lock()
// The capacity here is approximate since we don't know how many
// grandchildren there are.
result := make(tracingpb.Recording, 0, 1+len(s.mu.openChildren)+len(s.mu.recording.finishedChildren))
result = append(result, s.getRecordingNoChildrenLocked(tracingpb.RecordingVerbose, finishing))
result = append(result, s.mu.recording.finishedChildren...)

for _, child := range s.mu.openChildren {
if child.collectRecording || includeDetachedChildren {
sp := child.Span.i.crdb
result = append(result, sp.getVerboseRecording(includeDetachedChildren, false /* finishing */)...)
var result tracingpb.Recording
var childrenMetadata map[string]tracingpb.OperationMetadata
{
s.mu.Lock()
// The capacity here is approximate since we don't know how many
// grandchildren there are.
result = make(tracingpb.Recording, 0, 1+len(s.mu.openChildren)+len(s.mu.recording.finishedChildren))
result = append(result, s.getRecordingNoChildrenLocked(tracingpb.RecordingVerbose, finishing))
result = append(result, s.mu.recording.finishedChildren...)

childrenMetadata = make(map[string]tracingpb.OperationMetadata)

// Copy over the OperationMetadata collected from s' finished children.
rollupChildrenMetadata(childrenMetadata, s.mu.recording.childrenMetadata)

// We recurse on s' open children to get their verbose recordings, and to
// aggregate OperationMetadata from their children, both finished and open.
for _, openChild := range s.mu.openChildren {
if openChild.collectRecording || includeDetachedChildren {
openChildSp := openChild.Span.i.crdb
openChildRecording := openChildSp.getVerboseRecording(includeDetachedChildren, false /* finishing */)
result = append(result, openChildRecording...)

// Record an entry for openChild's OperationMetadata.
rootOpenChild := openChildRecording[0]
prevMetadata := childrenMetadata[rootOpenChild.Operation]
prevMetadata.Count++
prevMetadata.ContainsUnfinished = !rootOpenChild.Finished
prevMetadata.Duration += rootOpenChild.Duration
childrenMetadata[rootOpenChild.Operation] = prevMetadata

// Copy over the OperationMetadata collected recursively from openChild's
// children.
rollupChildrenMetadata(childrenMetadata, openChildRecording[0].ChildrenMetadata)
}
}
}

s.mu.Unlock()

// Copy over the OperationMetadata collected from s' children into the root of
// the recording.
if len(childrenMetadata) != 0 {
rootSpan := &result[0]
rootSpan.ChildrenMetadata = childrenMetadata
}

// Sort the spans by StartTime, except the first Span (the root of this
// recording) which stays in place.
toSort := sortPool.Get().(*tracingpb.Recording) // avoids allocations in sort.Sort
Expand All @@ -413,59 +465,35 @@ func (s *crdbSpan) getVerboseRecording(
return result
}

// getStructuredRecording returns the structured events in this span and
// in all the children. The results are returned as a Recording for the caller's
// convenience (and for optimizing memory allocations). The Recording will be
// nil if there are no structured events. If not nil, the Recording will have
// exactly one span corresponding to the receiver, will all events handing from
// getStructuredRecording returns the structured events in this span and in all
// the children. The results are returned as a Recording for the caller's
// convenience (and for optimizing memory allocations). The Recording will have
// exactly one span corresponding to the receiver, with all events hanging from
// this span (even if the events had been recorded on different spans).
// This span will also have a `childrenMetadata` map that will contain an entry
// for all children in s' Recording.
//
// The caller does not take ownership of the events.
func (s *crdbSpan) getStructuredRecording(includeDetachedChildren bool) tracingpb.Recording {
s.mu.Lock()
defer s.mu.Unlock()
buffer := make([]*tracingpb.StructuredRecord, 0, 3)
for _, c := range s.mu.recording.finishedChildren {
for i := range c.StructuredRecords {
buffer = append(buffer, &c.StructuredRecords[i])
}
}
for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
buffer = sp.getStructuredEventsRecursively(buffer, includeDetachedChildren)
}
}

if len(buffer) == 0 && s.mu.recording.structured.Len() == 0 {
// Optimize out the allocations below.
return nil
}

res := s.getRecordingNoChildrenLocked(
tracingpb.RecordingStructured,
false, // finishing - since we're only asking for the structured recording, the argument doesn't matter
)
// If necessary, grow res.StructuredRecords to have space for buffer.
var reservedSpace []tracingpb.StructuredRecord
if cap(res.StructuredRecords)-len(res.StructuredRecords) < len(buffer) {
// res.StructuredRecords does not have enough capacity to accommodate the
// elements of buffer. We allocate a new, larger array and copy over the old
// entries.
old := res.StructuredRecords
res.StructuredRecords = make([]tracingpb.StructuredRecord, len(old)+len(buffer))
copy(res.StructuredRecords, old)
reservedSpace = res.StructuredRecords[len(old):]
} else {
// res.StructuredRecords has enough capacity for buffer. We extend it in
// place.
oldLen := len(res.StructuredRecords)
res.StructuredRecords = res.StructuredRecords[:oldLen+len(buffer)]
reservedSpace = res.StructuredRecords[oldLen:]
}
for i, e := range buffer {
reservedSpace[i] = *e
}

// Recursively fetch the StructuredEvents for s and its children, both
// finished and open.
res.StructuredRecords = s.getStructuredEventsRecursivelyLocked(res.StructuredRecords[:0],
includeDetachedChildren)

// Recursively fetch the OperationMetadata for s' children, both finished and
// open.
res.ChildrenMetadata = make(map[string]tracingpb.OperationMetadata)
s.getChildrenMetadataRecursivelyLocked(res.ChildrenMetadata,
false /* includeRootMetadata */, includeDetachedChildren)

return tracingpb.Recording{res}
}

Expand Down Expand Up @@ -497,6 +525,8 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
return
}

rootChild := &childRecording[0]

// Depending on the type of recording, we either keep all the information
// received, or only the structured events.
switch s.recordingType() {
Expand All @@ -505,7 +535,7 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
// usually already the case, except with DistSQL traces where remote
// processors run in spans that FollowFrom an RPC Span that we don't
// collect.
childRecording[0].ParentSpanID = s.spanID
rootChild.ParentSpanID = s.spanID

if len(s.mu.recording.finishedChildren)+len(childRecording) <= maxRecordedSpansPerTrace {
s.mu.recording.finishedChildren = append(s.mu.recording.finishedChildren, childRecording...)
Expand All @@ -516,17 +546,44 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
// records by falling through.
fallthrough
case tracingpb.RecordingStructured:
for ci := range childRecording {
child := &childRecording[ci]
for i := range child.StructuredRecords {
s.recordInternalLocked(&child.StructuredRecords[i], &s.mu.recording.structured)
}
if len(childRecording) != 1 {
panic(fmt.Sprintf("RecordingStructured has %d recordings; expected 1", len(childRecording)))
}

for i := range rootChild.StructuredRecords {
s.recordInternalLocked(&rootChild.StructuredRecords[i], &s.mu.recording.structured)
}
case tracingpb.RecordingOff:
break
default:
panic(fmt.Sprintf("unrecognized recording mode: %v", s.recordingType()))
}

// Update s' ChildrenMetadata to capture all the spans in `childRecording`.
//
// As an example where we are done finishing `child`:
//
// parent
// child
// grandchild
//
// `parent` will have:
// {child: 2s, grandchild: 1s}
//
// Record the finished rootChild's metadata.
s.mu.recording.childrenMetadata[rootChild.Operation] =
s.mu.recording.childrenMetadata[rootChild.Operation].Combine(
tracingpb.OperationMetadata{
Count: 1,
Duration: rootChild.Duration,
ContainsUnfinished: false,
})

// Record the metadata of rootChild's children, both finished and open.
//
// GetRecording(...) is responsible for recursively capturing the metadata for
// rootChild's open and finished children.
rollupChildrenMetadata(s.mu.recording.childrenMetadata, rootChild.ChildrenMetadata)
}

func (s *crdbSpan) setTagLocked(key string, value attribute.Value) {
Expand Down Expand Up @@ -698,35 +755,73 @@ func (s *crdbSpan) recordInternalLocked(payload memorySizable, buffer *sizeLimit
buffer.AddLast(payload)
}

// getStructuredEventsRecursively returns the structured events accumulated by
// this span and its finished and still-open children.
func (s *crdbSpan) getStructuredEventsRecursively(
buffer []*tracingpb.StructuredRecord, includeDetachedChildren bool,
) []*tracingpb.StructuredRecord {
s.mu.Lock()
defer s.mu.Unlock()
// getStructuredEventsRecursivelyLocked returns the structured events
// accumulated by s and its finished and still-open children.
func (s *crdbSpan) getStructuredEventsRecursivelyLocked(
buffer []tracingpb.StructuredRecord, includeDetachedChildren bool,
) []tracingpb.StructuredRecord {
buffer = s.getStructuredEventsLocked(buffer)
for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
buffer = sp.getStructuredEventsRecursively(buffer, includeDetachedChildren)
sp.mu.Lock()
buffer = sp.getStructuredEventsRecursivelyLocked(buffer, includeDetachedChildren)
sp.mu.Unlock()
}
}
for _, c := range s.mu.recording.finishedChildren {
for i := range c.StructuredRecords {
buffer = append(buffer, &c.StructuredRecords[i])
buffer = append(buffer, c.StructuredRecords[i])
}
}
return buffer
}

// getChildrenMetadataRecursivelyLocked adds to `childrenMetadata` the
// OperationMetadata of the spans below s: the metadata s has already
// aggregated from its Finish()ed children, plus, recursively, the metadata of
// each open child that is included in the recording.
//
// When `includeRootMetadata` is true, an entry for s itself is also folded
// into `childrenMetadata`, keyed by s.operation. An open child is visited when
// its collectRecording flag is set or when `includeDetachedChildren` is true.
//
// This method reads s.mu without acquiring it; it acquires each visited open
// child's mutex around the recursive call on that child.
func (s *crdbSpan) getChildrenMetadataRecursivelyLocked(
	childrenMetadata map[string]tracingpb.OperationMetadata,
	includeRootMetadata, includeDetachedChildren bool,
) {
	if includeRootMetadata {
		// Fold s into the bucket for its own operation.
		rootEntry := childrenMetadata[s.operation]
		rootEntry.Count++
		switch s.mu.duration {
		case -1:
			// A duration of -1 is handled as "still running": charge the time
			// elapsed since the span started and flag the bucket accordingly.
			rootEntry.ContainsUnfinished = true
			rootEntry.Duration += timeutil.Since(s.startTime)
		default:
			rootEntry.Duration += s.mu.duration
		}
		childrenMetadata[s.operation] = rootEntry
	}

	// Merge in what s has already aggregated from its Finish()ed children.
	rollupChildrenMetadata(childrenMetadata, s.mu.recording.childrenMetadata)

	// Descend into every open child that belongs to this recording.
	for _, openChild := range s.mu.openChildren {
		if !openChild.collectRecording && !includeDetachedChildren {
			continue
		}
		childSpan := openChild.Span.i.crdb
		childSpan.mu.Lock()
		childSpan.getChildrenMetadataRecursivelyLocked(
			childrenMetadata, true /* includeRootMetadata */, includeDetachedChildren)
		childSpan.mu.Unlock()
	}
}

func (s *crdbSpan) getStructuredEventsLocked(
buffer []*tracingpb.StructuredRecord,
) []*tracingpb.StructuredRecord {
buffer []tracingpb.StructuredRecord,
) []tracingpb.StructuredRecord {
numEvents := s.mu.recording.structured.Len()
for i := 0; i < numEvents; i++ {
event := s.mu.recording.structured.Get(i).(*tracingpb.StructuredRecord)
buffer = append(buffer, event)
buffer = append(buffer, *event)
}
return buffer
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,9 @@ func (sp *Span) reset(
openChildren: h.childrenAlloc[:0],
goroutineID: goroutineID,
recording: recordingState{
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
childrenMetadata: make(map[string]tracingpb.OperationMetadata),
},
tags: h.tagsAlloc[:0],
}
Expand Down
Loading

0 comments on commit 4ddc350

Please sign in to comment.