Skip to content

Commit

Permalink
[wip] tracing: centralize storage of structured events
Browse files Browse the repository at this point in the history
I think it'll make addressing questions like #59145 easier. By
disaggregating storage from the spans themselves, questions around
"consuming" tracing events are a bit simpler to reason about. I think
we'll want to do something like this anyway if we want to cap the total
memory usage of structured events across active spans.

Release note: None
  • Loading branch information
irfansharif committed Jan 22, 2021
1 parent e0cfe98 commit 8a2086f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,7 @@ func WithAnonymizedStatement(err error, stmt tree.Statement) error {
//
// SessionTracing and its interactions with the connExecutor are thread-safe;
// tracing can be turned on at any time.
// XXX: Study this guy.
type SessionTracing struct {
// enabled is set at times when "session enabled" is active - i.e. when
// transactions are being recorded.
Expand Down
58 changes: 46 additions & 12 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
// crdbSpan is a span for internal crdb usage. This is used to power SQL session
// tracing.
type crdbSpan struct {
// XXX: I think we want a handle on the tracer object. Or an interface to
// it, for the "structured event store".
tracer *Tracer
bypassRegistry bool

traceID uint64 // probabilistically unique.
spanID uint64 // probabilistically unique.
parentSpanID uint64
Expand Down Expand Up @@ -77,7 +82,7 @@ type crdbSpanMu struct {
tags opentracing.Tags

stats SpanStats
structured []Structured
structured []Structured // XXX: This moves into the tracer registry.

// The Span's associated baggage.
baggage map[string]string
Expand Down Expand Up @@ -163,6 +168,13 @@ func (s *crdbSpan) getRecording(m mode) Recording {
return result
}

// XXX: It would be nice to dump these into our store too, but for now we've
// limited ourselves to structured events only. Perhaps this isn't necessary
// yet. The idea was to use all these in-memory objects as "shells" that
// contain only the metadata and nothing heavier, so that they could be fixed
// in size and predictable. That said, we're not looking to change how
// "logging" data is stored (which is in-memory), and the same goes for remote
// spans.
func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error {
// Change the root of the remote recording to be a child of this Span. This is
// usually already the case, except with DistSQL traces where remote
Expand Down Expand Up @@ -200,9 +212,19 @@ func (s *crdbSpan) record(msg string) {
}

// recordStructured stores item in the owning tracer's structured-event store,
// appending it to the events kept under this span's ID. Spans created with
// bypassRegistry set drop the item instead.
//
// The span's own mutex is deliberately not taken: nothing under s.mu is read
// or written here, and the fields that are read (bypassRegistry, tracer,
// spanID) are plain fields of crdbSpan rather than part of its mutex-guarded
// section. Skipping it keeps the tracer-wide lock from being acquired while a
// per-span lock is held on this path.
//
// NOTE(review): Finish and ResetRecording delete this span's entry from the
// store; a call that arrives afterwards re-creates the entry — confirm
// whether anything cleans that up.
//
// XXX: We're recording into the tracer object. Probably should expose a
// method instead of the raw form here.
func (s *crdbSpan) recordStructured(item Structured) {
	if s.bypassRegistry {
		return
	}

	store := &s.tracer.activeStructuredEvents
	store.Lock()
	defer store.Unlock()
	store.m[s.spanID] = append(store.m[s.spanID], item)
}

func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) {
Expand Down Expand Up @@ -271,17 +293,29 @@ func (s *crdbSpan) getRecordingLocked(m mode) tracingpb.RecordedSpan {
rs.DeprecatedStats = stats
}

if s.mu.structured != nil {
rs.InternalStructured = make([]*types.Any, 0, len(s.mu.structured))
for i := range s.mu.structured {
item, err := types.MarshalAny(s.mu.structured[i])
if err != nil {
// An error here is an error from Marshal; these
// are unlikely to happen.
continue
}
rs.InternalStructured = append(rs.InternalStructured, item)
// if s.mu.structured != nil {
// rs.InternalStructured = make([]*types.Any, 0, len(s.mu.structured))
// for i := range s.mu.structured {
// item, err := types.MarshalAny(s.mu.structured[i])
// if err != nil {
// // An error here is an error from Marshal; these
// // are unlikely to happen.
// continue
// }
// rs.InternalStructured = append(rs.InternalStructured, item)
// }
// }

// XXX: We're grabbing these from the tracer "store".
se := s.tracer.getStructuredEvents(s.spanID)
for i := range se {
item, err := types.MarshalAny(se[i])
if err != nil {
// An error here is an error from Marshal; these
// are unlikely to happen.
continue
}
rs.InternalStructured = append(rs.InternalStructured, item)
}

if len(s.mu.baggage) > 0 {
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func (s *Span) ResetRecording() {
s.crdb.mu.recording.children = nil
s.crdb.mu.recording.remoteSpans = nil
s.crdb.mu.Unlock()

s.tracer.activeStructuredEvents.Lock()
delete(s.tracer.activeStructuredEvents.m, s.crdb.spanID)
s.tracer.activeStructuredEvents.Unlock()
}

// GetRecording retrieves the current recording, if the Span has recording
Expand Down Expand Up @@ -243,6 +247,9 @@ func (s *Span) Finish() {
s.tracer.activeSpans.Lock()
delete(s.tracer.activeSpans.m, s)
s.tracer.activeSpans.Unlock()
s.tracer.activeStructuredEvents.Lock()
delete(s.tracer.activeStructuredEvents.m, s.crdb.spanID)
s.tracer.activeStructuredEvents.Unlock()
}

// Meta returns the information which needs to be propagated across process
Expand Down
40 changes: 34 additions & 6 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ type Tracer struct {
syncutil.Mutex
m map[*Span]struct{}
}

// XXX: Only records structured events
activeStructuredEvents struct {
syncutil.Mutex
m map[uint64][]Structured // spanID
}
}

// NewTracer creates a Tracer. It initially tries to run with minimal overhead
Expand All @@ -167,6 +173,7 @@ type Tracer struct {
func NewTracer() *Tracer {
	tr := new(Tracer)
	// Both registries start out as empty, non-nil maps so they can be written
	// to without any further setup.
	tr.activeSpans.m = make(map[*Span]struct{})
	tr.activeStructuredEvents.m = make(map[uint64][]Structured)
	// The no-op span points back at the tracer that owns it.
	tr.noopSpan = &Span{tracer: tr}
	return tr
}
Expand Down Expand Up @@ -374,12 +381,14 @@ func (t *Tracer) startSpanGeneric(
}{}

helper.crdbSpan = crdbSpan{
traceID: traceID,
spanID: spanID,
operation: opName,
startTime: startTime,
parentSpanID: opts.parentSpanID(),
logTags: opts.LogTags,
tracer: t,
bypassRegistry: opts.BypassRegistry,
traceID: traceID,
spanID: spanID,
operation: opName,
startTime: startTime,
parentSpanID: opts.parentSpanID(),
logTags: opts.LogTags,
mu: crdbSpanMu{
duration: -1, // unfinished
},
Expand Down Expand Up @@ -614,6 +623,25 @@ func (t *Tracer) VisitSpans(visitor func(*Span)) {
}
}

// getStructuredEvents returns a copy of the structured events currently held
// in the tracer's store for the span with the given ID. The copy is taken
// while holding the store's lock. The result is a freshly allocated slice
// (empty, never nil, when nothing is stored for spanID), so callers can modify
// the slice without affecting the store.
//
// XXX: Open questions with tracer-centered storage:
//   - What does the span visitor look like? This is what would feed into the
//     vtable. Perhaps scan the map for the list of span IDs first?
//   - This forms the "structured event storage" interface: the tracer gives
//     you a way to iterate over all the spans, and spans expose a way to get
//     their recordings, which comes back here.
//   - As for "consuming" events, you can now iterate + consume from here
//     directly? Probably needs another kind of iterator. But it avoids the
//     hairy questions of consuming from spans while still iterating, and of
//     what the lifecycle of the span looks like.
//   - Right now the verbose "message" recordings are part of the span itself;
//     this store is for just the structured events.
func (t *Tracer) getStructuredEvents(spanID uint64) []Structured {
	store := &t.activeStructuredEvents
	store.Lock()
	defer store.Unlock()

	events := store.m[spanID]
	snapshot := make([]Structured, len(events))
	copy(snapshot, events)
	return snapshot
}

// ForkCtxSpan checks if ctx has a Span open; if it does, it creates a new Span
// that "follows from" the original Span. This allows the resulting context to be
// used in an async task that might outlive the original operation.
Expand Down

0 comments on commit 8a2086f

Please sign in to comment.