Skip to content

Commit

Permalink
tracing: untangle local/remove parent vs recording collection
Browse files Browse the repository at this point in the history
Before this patch, two concepts were bundled together: whether or not
the parent of a span is local or remote to the child's node, and whether
or not the parent should include the child in its recording. You could
create a child span with either:
- the WithParentAndAutoCollection option, which would give you a local
  parent which includes the child in its recording, or
- the WithParentAndManualCollection option, which would give you a
  remote parent (which naturally would not / couldn't include the child
  in its recording).

WithParentAndManualCollection is sometimes used even when the parent
is local, which is a) quite confusing, because at a lower level it
produced what was called a "remote parent" and b) sub-optimal, because
the child looked like a "local root", which meant it had to be put into
the active spans registry. Were it not for the bundling of the two
concerns together, such a child of a local parent, but for whom the
parent is not in charge of collecting the recording, would not need to
be put directly into the span registry; such a span (like other child
spans) should be part of the registry indirectly, through its parent -
which is cheaper because it doesn't add contention on the global mutex
lock.

The funky case of a local parent whose recording does not include the
child's is used by DistSQL processors - each processor gets a span whose
recording is collected by the DistSQL infrastructure, independent of the
parent.

This patch untangles the parent-child relationship from the
recording collection concern, and cleans up the API in the process. Now
you have the following options for creating a span:

WithParent(parent) - creates a parent/child relationship. The parent has
a reference to the child. The child is part of the active spans registry
through the parent. Unless WithDetachedRecording() is also specified,
the parent's recording includes the child's.

WithRemoteParent(parentMeta) - creates a parent/child relationship
across process boundaries.

WithDetachedRecording() - asks the parent to not include the new child
in its recording. This is a no-op in case WithRemoteParent() is used.
This option is used by DistSQL.

So, we get a cleaner API, and a more efficient active spans registry
implementation because the DistSQL spans are no longer directly part of
it. We also get a new feature: given a span id, the trace registry is
now able to collect the recording of all the children of that span,
including the ones created with WithDetachedRecording. This is nice,
since it'll make inspection of DistSQL traces easier (since they'll look
more like regular traces, rather then a collection of unrelated root
spans that all need to be looked at in isolation).

WithDetachedRecording() is used in two cases (nothing really new here):
- for DistSQL processors, like discussed
- for async tasks which (may) outlive the parent. In particular, the
stopper uses it for its FollowsFromSpan task option. This use of
detached recording is debatable. It's not necessary for correctness,
since there's no particular harm in having the async task be a regular
child. It arguably produces better looking recordings, since they don't
include info on the async task. It can also serve as an optimization
(although currently it doesn't) - if we know that the async span is
likely to outlive the parent, then we might be better off inserting it
directly into the registry, as opposed to first inserting it into the
parent, and only moving it to the registry when the parent Finish()es.

Release note: None
  • Loading branch information
andreimatei committed Nov 18, 2021
1 parent ac12ca2 commit 8952cc8
Show file tree
Hide file tree
Showing 24 changed files with 304 additions and 221 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_application_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
cmd.ctx, cmd.sp = d.r.AmbientContext.Tracer.StartSpanCtx(
ctx,
opName,
// NB: we are lying here - we are not actually going to propagate
// the recording towards the root. That seems ok.
tracing.WithParentAndManualCollection(spanMeta),
// NB: Nobody is collecting the recording of this span; we have no
// mechanism for it.
tracing.WithRemoteParent(spanMeta),
tracing.WithFollowsFrom(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
ctx, sp = tr.StartSpanCtx(
ctx,
opName,
tracing.WithParentAndAutoCollection(parentSp),
tracing.WithParent(parentSp),
tracing.WithFollowsFrom(),
tagsOpt,
)
Expand Down
16 changes: 7 additions & 9 deletions pkg/migration/migrationmanager/manager_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,13 @@ RETURNING id;`).Scan(&secondID))
}()

testutils.SucceedsSoon(t, func() error {
// TODO(yuzefovich): this check is quite unfortunate since it relies on
// the assumption that all recordings from the child spans are imported
// into the tracer. However, this is not the case for the DistSQL
// processors where child spans are created with
// WithParentAndManualCollection option which requires explicitly
// importing the recordings from the children. This only happens when
// the execution flow is drained which cannot happen until we close
// the 'unblock' channel, and this we cannot do until we see the
// expected message in the trace.
// TODO(yuzefovich): this check is quite unfortunate since it relies on the
// assumption that all recordings from the child spans are imported into the
// tracer. However, this is not the case for the DistSQL processors whose
// recordings require explicit importing. This only happens when the
// execution flow is drained which cannot happen until we close the
// 'unblock' channel, and this we cannot do until we see the expected
// message in the trace.
//
// At the moment it works in a very fragile manner (by making sure that
// no processors actually create their own spans). Instead, a different
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ CREATE TABLE crdb_internal.node_inflight_trace_spans (
"only users with the admin role are allowed to read crdb_internal.node_inflight_trace_spans")
}
return p.ExecCfg().AmbientCtx.Tracer.VisitSpans(func(span tracing.RegistrySpan) error {
for _, rec := range span.GetRecording(tracing.RecordingVerbose) {
for _, rec := range span.GetFullRecording(tracing.RecordingVerbose) {
traceID := rec.TraceID
parentSpanID := rec.ParentSpanID
spanID := rec.SpanID
Expand Down
33 changes: 16 additions & 17 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,13 +700,13 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
//
// Traces on node1:
// -------------
// root <-- traceID1
// root.child <-- traceID1
// root.child.remotechild <-- traceID1
// root <-- traceID1
// root.child <-- traceID1
// root.child.detached_child <-- traceID1
//
// Traces on node2:
// -------------
// root.child.remotechild2 <-- traceID1
// root.child.remotechild <-- traceID1
// root.child.remotechilddone <-- traceID1
// root2 <-- traceID2
// root2.child <-- traceID2
Expand All @@ -717,33 +717,33 @@ func setupTraces(t1, t2 *tracing.Tracer) (tracingpb.TraceID, func()) {
time.Sleep(10 * time.Millisecond)

// Start a child span on "node 1".
child := t1.StartSpan("root.child", tracing.WithParentAndAutoCollection(root))
child := t1.StartSpan("root.child", tracing.WithParent(root))

// Sleep a bit so that everything that comes afterwards has higher timestamps
// than the one we just assigned. Otherwise the sorting is not deterministic.
time.Sleep(10 * time.Millisecond)

// Start a forked child span on "node 1".
childRemoteChild := t1.StartSpan("root.child.remotechild", tracing.WithParentAndManualCollection(child.Meta()))
childDetachedChild := t1.StartSpan("root.child.detached_child", tracing.WithParent(child), tracing.WithDetachedRecording())

// Start a remote child span on "node 2".
childRemoteChild2 := t2.StartSpan("root.child.remotechild2", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChild := t2.StartSpan("root.child.remotechild", tracing.WithRemoteParent(child.Meta()))

time.Sleep(10 * time.Millisecond)

// Start another remote child span on "node 2" that we finish.
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithRemoteParent(child.Meta()))
child.ImportRemoteSpans(childRemoteChildFinished.FinishAndGetRecording(tracing.RecordingVerbose))

// Start another remote child span on "node 2" that we finish. This will have
// a different trace_id from the spans created above.
root2 := t2.StartSpan("root2", tracing.WithRecording(tracing.RecordingVerbose))

// Start a child span on "node 2".
child2 := t2.StartSpan("root2.child", tracing.WithParentAndAutoCollection(root2))
child2 := t2.StartSpan("root2.child", tracing.WithParent(root2))
return root.TraceID(), func() {
for _, span := range []*tracing.Span{root, child, childRemoteChild,
childRemoteChild2, root2, child2} {
for _, span := range []*tracing.Span{root, child, childDetachedChild,
childRemoteChild, root2, child2} {
span.Finish()
}
}
Expand All @@ -766,13 +766,16 @@ func TestClusterInflightTracesVirtualTable(t *testing.T) {
traceID, cleanup := setupTraces(node1Tracer, node2Tracer)
defer cleanup()

// The cluster_inflight_traces table is magic and only returns results when
// the query contains an index constraint.

t.Run("no-index-constraint", func(t *testing.T) {
sqlDB.CheckQueryResults(t, `SELECT * from crdb_internal.cluster_inflight_traces`, [][]string{})
})

t.Run("with-index-constraint", func(t *testing.T) {
// We expect there to be 3 tracing.Recordings rooted at
// root, root.child.remotechild, root.child.remotechild2.
// root and root.child.remotechild.
expectedRows := []struct {
traceID int
nodeID int
Expand All @@ -781,10 +784,6 @@ func TestClusterInflightTracesVirtualTable(t *testing.T) {
traceID: int(traceID),
nodeID: 1,
},
{
traceID: int(traceID),
nodeID: 1,
},
{
traceID: int(traceID),
nodeID: 2,
Expand All @@ -799,8 +798,8 @@ func TestClusterInflightTracesVirtualTable(t *testing.T) {
require.NoError(t, rows.Scan(&traceID, &nodeID, &traceStr, &jaegarJSON))
require.Less(t, rowIdx, len(expectedRows))
expected := expectedRows[rowIdx]
require.Equal(t, expected.nodeID, nodeID)
require.Equal(t, expected.traceID, traceID)
require.Equal(t, expected.nodeID, nodeID)
require.NotEmpty(t, traceStr)
require.NotEmpty(t, jaegarJSON)
rowIdx++
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ func (ds *ServerImpl) setupFlow(
// TODO(andrei): localState.IsLocal is not quite the right thing to use.
// If that field is unset, we might still want to create a child span if
// this flow is run synchronously.
ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName, tracing.WithParentAndAutoCollection(parentSpan))
ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName, tracing.WithParent(parentSpan))
} else {
// We use FollowsFrom because the flow's span outlives the SetupFlow request.
ctx, sp = ds.Tracer.StartSpanCtx(
ctx,
opName,
tracing.WithParentAndAutoCollection(parentSpan),
tracing.WithParent(parentSpan),
tracing.WithFollowsFrom(),
)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,12 @@ func (pb *ProcessorBase) AppendTrailingMeta(meta execinfrapb.ProducerMetadata) {
// ProcessorSpan creates a child span for a processor (if we are doing any
// tracing). The returned span needs to be finished using tracing.FinishSpan.
func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
return tracing.ChildSpanRemote(ctx, name)
sp := tracing.SpanFromContext(ctx)
if sp == nil {
return ctx, nil
}
return sp.Tracer().StartSpanCtx(ctx, name,
tracing.WithParent(sp), tracing.WithDetachedRecording())
}

// StartInternal prepares the ProcessorBase for execution. It returns the
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -1846,10 +1846,10 @@ func (p *payloadsForSpanGenerator) Next(_ context.Context) (bool, error) {
for p.payloads == nil {
p.recordingIndex++
// If there are no more recordings, then we cannot continue.
if !(p.recordingIndex < p.span.GetRecording(tracing.RecordingVerbose).Len()) {
if !(p.recordingIndex < p.span.GetFullRecording(tracing.RecordingVerbose).Len()) {
return false, nil
}
currRecording := p.span.GetRecording(tracing.RecordingVerbose)[p.recordingIndex]
currRecording := p.span.GetFullRecording(tracing.RecordingVerbose)[p.recordingIndex]
currRecording.Structured(func(item *pbtypes.Any, _ time.Time) {
payload, err := protoreflect.MessageToJSON(item, protoreflect.FmtFlags{EmitDefaults: true})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/tests/tracing_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func TestSetTraceSpansVerbosityBuiltin(t *testing.T) {
defer root.Finish()
require.False(t, root.IsVerbose())

child := tr.StartSpan("root.child", tracing.WithParentAndAutoCollection(root))
child := tr.StartSpan("root.child", tracing.WithParent(root))
defer child.Finish()
require.False(t, child.IsVerbose())

childChild := tr.StartSpan("root.child.child", tracing.WithParentAndAutoCollection(child))
childChild := tr.StartSpan("root.child.child", tracing.WithParent(child))
defer childChild.Finish()
require.False(t, childChild.IsVerbose())

Expand Down Expand Up @@ -79,7 +79,7 @@ func TestSetTraceSpansVerbosityBuiltin(t *testing.T) {
require.False(t, childChild.IsVerbose())

// New child of verbose child span should also be verbose by default.
newChild := tr.StartSpan("root.child.newchild", tracing.WithParentAndAutoCollection(root))
newChild := tr.StartSpan("root.child.newchild", tracing.WithParent(root))
defer newChild.Finish()
require.True(t, newChild.IsVerbose())

Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (tc *TestCluster) stopServers(ctx context.Context) {
var buf strings.Builder
fmt.Fprintf(&buf, "unexpectedly found %d active spans:\n", len(sps))
for _, sp := range sps {
fmt.Fprintln(&buf, sp.GetRecording(tracing.RecordingVerbose))
fmt.Fprintln(&buf, sp.GetFullRecording(tracing.RecordingVerbose))
fmt.Fprintln(&buf)
}
return errors.Newf("%s", buf.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/stop/stopper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ func TestStopperRunAsyncTaskTracing(t *testing.T) {
errC <- errors.Errorf("missing span")
return
}
sp = tr.StartSpan("child", tracing.WithParentAndAutoCollection(sp))
sp = tr.StartSpan("child", tracing.WithParent(sp))
if sp.TraceID() == traceID {
errC <- errors.Errorf("expected different trace")
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/tracing/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
WithForceRealSpan(), WithLogTags(&staticLogTags),
}},
{"real,autoparent", []SpanOption{
WithForceRealSpan(), WithParentAndAutoCollection(parSp),
WithForceRealSpan(), WithParent(parSp),
}},
{"real,manualparent", []SpanOption{
WithForceRealSpan(), WithParentAndManualCollection(parSp.Meta()),
WithForceRealSpan(), WithParent(parSp), WithDetachedRecording(),
}},
} {
b.Run(fmt.Sprintf("opts=%s", tc.name), func(b *testing.B) {
Expand Down Expand Up @@ -91,7 +91,7 @@ func BenchmarkSpan_GetRecording(b *testing.B) {
run(b, sp)
})

child := tr.StartSpan("bar", WithParentAndAutoCollection(sp), WithForceRealSpan())
child := tr.StartSpan("bar", WithParent(sp), WithForceRealSpan())
b.Run("child-only", func(b *testing.B) {
run(b, child)
})
Expand All @@ -110,7 +110,7 @@ func BenchmarkRecordingWithStructuredEvent(b *testing.B) {
for i := 0; i < b.N; i++ {
root := tr.StartSpan("foo")
root.RecordStructured(ev)
child := tr.StartSpan("bar", WithParentAndAutoCollection(root))
child := tr.StartSpan("bar", WithParent(root))
child.RecordStructured(ev)
child.Finish()
_ = root.GetRecording(RecordingStructured)
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/tracing/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,35 +72,35 @@ func setupTraces(t1, t2 *tracing.Tracer) (tracingpb.TraceID, tracingpb.TraceID,
time.Sleep(10 * time.Millisecond)

// Start a child span on "node 1".
child := t1.StartSpan("root.child", tracing.WithParentAndAutoCollection(root))
child := t1.StartSpan("root.child", tracing.WithParent(root))

// Sleep a bit so that everything that comes afterwards has higher timestamps
// than the one we just assigned. Otherwise the sorting is not deterministic.
time.Sleep(10 * time.Millisecond)

// Start a remote child span on "node 2".
childRemoteChild := t2.StartSpan("root.child.remotechild", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChild := t2.StartSpan("root.child.remotechild", tracing.WithRemoteParent(child.Meta()))
childRemoteChild.RecordStructured(newTestStructured("root.child.remotechild"))

time.Sleep(10 * time.Millisecond)

// Start another remote child span on "node 2" that we finish.
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithRemoteParent(child.Meta()))
child.ImportRemoteSpans(childRemoteChildFinished.FinishAndGetRecording(tracing.RecordingVerbose))

// Start a root span on "node 2".
root2 := t2.StartSpan("root2", tracing.WithRecording(tracing.RecordingVerbose))
root2.RecordStructured(newTestStructured("root2"))

// Start a child span on "node 2".
child2 := t2.StartSpan("root2.child", tracing.WithParentAndAutoCollection(root2))
child2 := t2.StartSpan("root2.child", tracing.WithParent(root2))
// Start a remote child span on "node 1".
child2RemoteChild := t1.StartSpan("root2.child.remotechild", tracing.WithParentAndManualCollection(child2.Meta()))
child2RemoteChild := t1.StartSpan("root2.child.remotechild", tracing.WithRemoteParent(child2.Meta()))

time.Sleep(10 * time.Millisecond)

// Start another remote child span on "node 1".
anotherChild2RemoteChild := t1.StartSpan("root2.child.remotechild2", tracing.WithParentAndManualCollection(child2.Meta()))
anotherChild2RemoteChild := t1.StartSpan("root2.child.remotechild2", tracing.WithRemoteParent(child2.Meta()))
return root.TraceID(), root2.TraceID(), func() {
for _, span := range []*tracing.Span{root, child, childRemoteChild, root2, child2,
child2RemoteChild, anotherChild2RemoteChild} {
Expand Down
Loading

0 comments on commit 8952cc8

Please sign in to comment.