Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prototype] tracing: centralize storage of structured events #59310

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,11 +689,11 @@ func (h *contentionEventHelper) emit() {
}
h.ev.Duration = timeutil.Since(h.tBegin)
if h.onEvent != nil {
// NB: this is intentionally above the call to LogStructured so that
// NB: this is intentionally above the call to RecordStructured so that
// this interceptor gets to mutate the event (used for test determinism).
h.onEvent(h.ev)
}
h.sp.LogStructured(h.ev)
h.sp.RecordStructured(h.ev)
h.ev = nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,7 @@ func WithAnonymizedStatement(err error, stmt tree.Statement) error {
//
// SessionTracing and its interactions with the connExecutor are thread-safe;
// tracing can be turned on at any time.
// XXX: Study this guy.
type SessionTracing struct {
// enabled is set at times when "session enabled" is active - i.e. when
// transactions are being recorded.
Expand Down Expand Up @@ -1552,6 +1553,7 @@ func (st *SessionTracing) StartTracing(
if sp == nil {
return errors.Errorf("no txn span for SessionTracing")
}
sp.ResetRecording()
sp.SetVerbose(true)
st.firstTxnSpan = sp
}
Expand Down
66 changes: 47 additions & 19 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
// crdbSpan is a span for internal crdb usage. This is used to power SQL session
// tracing.
type crdbSpan struct {
// XXX: I think we want a handle on the tracer object. Or an interface to
// it, for the "structured event store".
tracer *Tracer
bypassRegistry bool

traceID uint64 // probabilistically unique.
spanID uint64 // probabilistically unique.
parentSpanID uint64
Expand Down Expand Up @@ -77,7 +82,7 @@ type crdbSpanMu struct {
tags opentracing.Tags

stats SpanStats
structured []Structured
structured []Structured // XXX: This moves into the tracer registry.

// The Span's associated baggage.
baggage map[string]string
Expand Down Expand Up @@ -111,12 +116,6 @@ func (s *crdbSpan) enableRecording(parent *crdbSpan, recType RecordingType) {
if recType == RecordingVerbose {
s.setBaggageItemLocked(verboseTracingBaggageKey, "1")
}
// Clear any previously recorded info. This is needed by SQL SessionTracing,
// who likes to start and stop recording repeatedly on the same Span, and
// collect the (separate) recordings every time.
s.mu.recording.recordedLogs = nil
s.mu.recording.children = nil
s.mu.recording.remoteSpans = nil
}

func (s *crdbSpan) disableRecording() {
Expand Down Expand Up @@ -169,6 +168,13 @@ func (s *crdbSpan) getRecording(m mode) Recording {
return result
}

// XXX: Would be nice to dump these into our store too. But for now we've
// limited ourselves to only structured events. Perhaps this isn't necessary,
// yet. I was just thinking of using all these in-memory objects as "shells"
// that only contain the metadata, nothing heavier than that. This way they
// could be fixed in size, and predictable. But we're also not looking to change
// out how "logging" data is stored (which is in-memory), the same as remote
// spans.
func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error {
// Change the root of the remote recording to be a child of this Span. This is
// usually already the case, except with DistSQL traces where remote
Expand Down Expand Up @@ -205,10 +211,20 @@ func (s *crdbSpan) record(msg string) {
}
}

func (s *crdbSpan) logStructured(item Structured) {
func (s *crdbSpan) recordStructured(item Structured) {
if s.bypassRegistry {
return
}

s.mu.Lock()
defer s.mu.Unlock()
s.mu.structured = append(s.mu.structured, item)
// s.mu.structured = append(s.mu.structured, item)

// XXX: We're recording into the tracer object. Probably should expose a
// method instead of the raw form here.
s.tracer.activeStructuredEvents.Lock()
s.tracer.activeStructuredEvents.m[s.spanID] = append(s.tracer.activeStructuredEvents.m[s.spanID], item)
s.tracer.activeStructuredEvents.Unlock()
}

func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) {
Expand Down Expand Up @@ -277,17 +293,29 @@ func (s *crdbSpan) getRecordingLocked(m mode) tracingpb.RecordedSpan {
rs.DeprecatedStats = stats
}

if s.mu.structured != nil {
rs.InternalStructured = make([]*types.Any, 0, len(s.mu.structured))
for i := range s.mu.structured {
item, err := types.MarshalAny(s.mu.structured[i])
if err != nil {
// An error here is an error from Marshal; these
// are unlikely to happen.
continue
}
rs.InternalStructured = append(rs.InternalStructured, item)
// if s.mu.structured != nil {
// rs.InternalStructured = make([]*types.Any, 0, len(s.mu.structured))
// for i := range s.mu.structured {
// item, err := types.MarshalAny(s.mu.structured[i])
// if err != nil {
// // An error here is an error from Marshal; these
// // are unlikely to happen.
// continue
// }
// rs.InternalStructured = append(rs.InternalStructured, item)
// }
// }

// XXX: We're grabbing these from the tracer "store".
se := s.tracer.getStructuredEvents(s.spanID)
for i := range se {
item, err := types.MarshalAny(se[i])
if err != nil {
// An error here is an error from Marshal; these
// are unlikely to happen.
continue
}
rs.InternalStructured = append(rs.InternalStructured, item)
}

if len(s.mu.baggage) > 0 {
Expand Down
17 changes: 14 additions & 3 deletions pkg/util/tracing/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,20 @@
// [5]: `ForkCtxSpan`. "forking" a Span is the same as creating a new one
// with a "follows from" relation.
// [6]: `crdbSpan`
// [7]: `Span.SetVerbose`. To understand the specifics of what exactly is
// captured in Span recording, when Spans have children that may be either
// local or remote, look towards `WithParentAnd{Auto,Manual}Collection`
// [7]: `Span.SetVerbose`. To understand the specifics of what's captured in
// Span recordings, when Spans have children that may be either local or
// remote, look towards `WithParentAnd{Auto,Manual}Collection`.
// The complexity here stems from how we expect parentSpan.GetRecording()
// to behave. If all child spans are created using the AutoCollection
// option, grabbing the parent span's recording automatically captures all
// child span recordings. This is easy to implement, and doesn't have to
// leak outside the tracing package.
// For operations that cross RPC boundaries, however, we cannot simply
// retrieve remote child recordings ourselves. To that end we provide
// the ManualCollection option, where the caller is responsible for
// capturing the remote child span's recording (`span.GetRecording`),
// transferring it over the wire, and importing it into the parent span
// (`ImportRemoteSpans`).
// [8]: `Tracer.{Inject,Extract}`
// [9]: `SpanMeta`
// [10]: `{Client,Server}Interceptor`
Expand Down
61 changes: 41 additions & 20 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,33 +118,50 @@ func (s *Span) IsVerbose() bool {
// descendants of this Span will do so automatically as well. This does not
// apply to past derived Spans, which may in fact be noop spans.
//
// As a side effect, calls to `SetVerbose(true)` on a span that was not already
// verbose will reset any past recording stored on this Span.
//
// When set to 'false', Record will cease to add data to the recording (though
// they may still be collected, should the Span have been set up with an
// auxiliary trace sink). This does not apply to Spans derived from this one
// when it was verbose.
func (s *Span) SetVerbose(to bool) {
// TODO(tbg): when always-on tracing is firmly established, we can remove the ugly
// caveat that SetVerbose(true) is a panic on a noop span because there will be no
// noop span.
// TODO(tbg): when always-on tracing is firmly established, we can remove
// the ugly caveat that SetVerbose(true) is a panic on a noop span because
// there will be no noop span.
if s.isNoop() {
panic(errors.AssertionFailedf("SetVerbose called on NoopSpan; use the WithForceRealSpan option for StartSpan"))
}
if to {
// If we're already recording (perhaps because the parent was recording when
// this Span was created), there's nothing to do. Avoid the call to enableRecording
// because it would clear the existing recording.
recType := RecordingVerbose
if recType != s.crdb.recordingType() {
s.crdb.enableRecording(nil /* parent */, recType)
if s.crdb.recordingType() != RecordingVerbose {
s.crdb.enableRecording(nil /* parent */, RecordingVerbose)
}
} else {
s.crdb.disableRecording()
}
}

// ResetRecording discards everything recorded on this Span so far: its
// recorded logs, its references to child and remote spans, and the structured
// events held for it in the tracer. It panics when called on a noop span.
func (s *Span) ResetRecording() {
	// TODO(irfansharif): Noop spans go away once always-on tracing is the only
	// mode available; this guard can be removed at that point.
	if s.isNoop() {
		panic(errors.AssertionFailedf("ResetRecording called on NoopSpan; use the WithForceRealSpan option for StartSpan"))
	}

	// SQL SessionTracing starts and stops recording repeatedly on the same
	// Span and collects a separate recording each time, so whatever was
	// recorded previously has to be dropped here.
	s.crdb.mu.Lock()
	rec := &s.crdb.mu.recording
	rec.remoteSpans = nil
	rec.children = nil
	rec.recordedLogs = nil
	s.crdb.mu.Unlock()

	// Structured events are stored in the tracer, keyed by span ID, and are
	// guarded by their own mutex rather than by the span's.
	store := &s.tracer.activeStructuredEvents
	store.Lock()
	delete(store.m, s.crdb.spanID)
	store.Unlock()
}

// GetRecording retrieves the current recording, if the Span has recording
// enabled. This can be called while spans that are part of the recording are
// still open; it can run concurrently with operations on those spans.
Expand Down Expand Up @@ -193,14 +210,14 @@ type SpanStats interface {
// SetSpanStats sets the stats on a Span. stats.Stats() will also be added to
// the Span tags.
//
// This is deprecated. Use LogStructured instead.
// This is deprecated. Use RecordStructured instead.
//
// TODO(tbg): remove this in the 21.2 cycle.
func (s *Span) SetSpanStats(stats SpanStats) {
if s.isNoop() {
return
}
s.LogStructured(stats)
s.RecordStructured(stats)
s.crdb.mu.Lock()
s.crdb.mu.stats = stats
for name, value := range stats.StatsTags() {
Expand Down Expand Up @@ -230,6 +247,9 @@ func (s *Span) Finish() {
s.tracer.activeSpans.Lock()
delete(s.tracer.activeSpans.m, s)
s.tracer.activeSpans.Unlock()
s.tracer.activeStructuredEvents.Lock()
delete(s.tracer.activeStructuredEvents.m, s.crdb.spanID)
s.tracer.activeStructuredEvents.Unlock()
}

// Meta returns the information which needs to be propagated across process
Expand Down Expand Up @@ -320,22 +340,23 @@ func (s *Span) setTagInner(key string, value interface{}, locked bool) *Span {
}

// Structured is an opaque protobuf that can be attached to a trace via
// `Span.LogStructured`. This is the only kind of data a Span carries when
// `Span.RecordStructured`. This is the only kind of data a Span carries when
// `trace.mode = background`.
type Structured interface {
// The embedded protoutil.Message is the entire contract: this interface adds
// no methods of its own.
protoutil.Message
}

// LogStructured adds a Structured payload to the Span. It will be added to the
// recording even if the Span is not verbose; however it will be discarded if
// the underlying Span has been optimized out (i.e. is a noop span).
// RecordStructured provides a way to record a Structured payload into the Span.
// It will be added to the recording even if the Span is not verbose; however it
// will be discarded if the underlying Span has been optimized out (i.e. is a
// noop span).
//
// The caller must not mutate the item once LogStructured has been called.
func (s *Span) LogStructured(item Structured) {
// The caller must not mutate the item once RecordStructured has been called.
func (s *Span) RecordStructured(item Structured) {
if s.isNoop() {
return
}
s.crdb.logStructured(item)
s.crdb.recordStructured(item)
if s.hasVerboseSink() {
// NB: TrimSpace avoids the trailing whitespace
// generated by the protobuf stringers.
Expand All @@ -346,8 +367,8 @@ func (s *Span) LogStructured(item Structured) {
// Record provides a way to record free-form text into verbose spans.
//
// TODO(irfansharif): We don't currently have redactability with trace
// recordings (both here, and using LogStructured above). We'll want to do this
// soon.
// recordings (both here, and using RecordStructured above). We'll want to do
// this soon.
func (s *Span) Record(msg string) {
if !s.hasVerboseSink() {
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestSpan_LogStructured(t *testing.T) {
sp := tr.StartSpan("root", WithForceRealSpan())
defer sp.Finish()

sp.LogStructured(&types.Int32Value{Value: 4})
sp.RecordStructured(&types.Int32Value{Value: 4})
rec := sp.GetRecording()
require.Len(t, rec, 1)
require.Len(t, rec[0].InternalStructured, 1)
Expand All @@ -206,7 +206,7 @@ func TestNonVerboseChildSpanRegisteredWithParent(t *testing.T) {
defer ch.Finish()
require.Len(t, sp.crdb.mu.recording.children, 1)
require.Equal(t, ch.crdb, sp.crdb.mu.recording.children[0])
ch.LogStructured(&types.Int32Value{Value: 5})
ch.RecordStructured(&types.Int32Value{Value: 5})
// Check that the child span (incl its payload) is in the recording.
rec := sp.GetRecording()
require.Len(t, rec, 2)
Expand Down
40 changes: 34 additions & 6 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ type Tracer struct {
syncutil.Mutex
m map[*Span]struct{}
}

// XXX: Only records structured events
activeStructuredEvents struct {
syncutil.Mutex
m map[uint64][]Structured // spanID
}
}

// NewTracer creates a Tracer. It initially tries to run with minimal overhead
Expand All @@ -167,6 +173,7 @@ type Tracer struct {
func NewTracer() *Tracer {
	tracer := new(Tracer)
	// The shared noop span points back at the tracer that owns it.
	tracer.noopSpan = &Span{tracer: tracer}
	// Both registries must be allocated up front; writing to a nil map panics.
	tracer.activeSpans.m = make(map[*Span]struct{})
	tracer.activeStructuredEvents.m = make(map[uint64][]Structured)
	return tracer
}
Expand Down Expand Up @@ -374,12 +381,14 @@ func (t *Tracer) startSpanGeneric(
}{}

helper.crdbSpan = crdbSpan{
traceID: traceID,
spanID: spanID,
operation: opName,
startTime: startTime,
parentSpanID: opts.parentSpanID(),
logTags: opts.LogTags,
tracer: t,
bypassRegistry: opts.BypassRegistry,
traceID: traceID,
spanID: spanID,
operation: opName,
startTime: startTime,
parentSpanID: opts.parentSpanID(),
logTags: opts.LogTags,
mu: crdbSpanMu{
duration: -1, // unfinished
},
Expand Down Expand Up @@ -614,6 +623,25 @@ func (t *Tracer) VisitSpans(visitor func(*Span)) {
}
}

// getStructuredEvents returns the structured events the tracer currently holds
// for the span with the given ID. The returned slice is a fresh, non-nil copy
// (empty when nothing is stored for spanID), so callers may keep it after the
// tracer's mutex is released; the events themselves are not deep-copied.
//
// XXX: What does the span visitor look like with tracer-centered storage? This
// is what would feed into the vtable. Perhaps scan the map for the list of span
// IDs first? That would form the "structured event storage" interface: the
// tracer gives you a way to iterate over all the spans, and spans expose a way
// to get their recordings, which comes back here. As for consuming events, you
// could now iterate and consume them from here directly, though that probably
// needs another kind of iterator. It avoids the hairy questions of consuming
// from spans while still iterating, and of what the lifecycle of the span
// looks like. Right now the verbose "message" recordings are part of the span
// itself; this store is for just the structured events.
func (t *Tracer) getStructuredEvents(spanID uint64) []Structured {
	t.activeStructuredEvents.Lock()
	defer t.activeStructuredEvents.Unlock()

	stored := t.activeStructuredEvents.m[spanID]
	events := make([]Structured, len(stored))
	copy(events, stored)
	return events
}

// ForkCtxSpan checks if ctx has a Span open; if it does, it creates a new Span
// that "follows from" the original Span. This allows the resulting context to be
// used in an async task that might outlive the original operation.
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/tracingpb/recorded_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *RecordedSpan) String() string {
return sb.String()
}

// Structured visits the data passed to LogStructured for the Span from which
// Structured visits the data passed to RecordStructured for the Span from which
// the RecordedSpan was created.
func (s *RecordedSpan) Structured(visit func(*types.Any)) {
if s.DeprecatedStats != nil {
Expand Down