Skip to content

Commit

Permalink
sql: don't propagate spans from local processors as metadata
Browse files Browse the repository at this point in the history
This commit changes how we create tracing spans for components of the
flow on the gateway. Previously, we would always create them with the
detached option, and, as a result, we needed to collect the traces and
propagate them as metadata to be imported by the DistSQLReceiver into
the span of the flow on the gateway. However, it is not necessary to
do so on the gateway - we can just let the regular tracing behavior
happen there.

The main benefit of this change is that we now preserve the hierarchy of
the tracing spans in the gateway flows, making them much more
understandable when viewed in Jaeger. Perhaps this is also a minor
performance improvement since the native trace collection is probably
faster than having to import the trace data via the metadata, as was
done previously.

An additional improvement would be to create the spans with the
detached recording only for the root components of the remote flows.
This is left as a TODO for now.

Epic: None

Release note: None
  • Loading branch information
yuzefovich committed Apr 3, 2023
1 parent 0308728 commit 5a940f6
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 37 deletions.
6 changes: 4 additions & 2 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ func (s *colBatchScanBase) drainMeta() []execinfrapb.ProducerMetadata {
if tfs := execinfra.GetLeafTxnFinalState(s.Ctx, s.flowCtx.Txn); tfs != nil {
trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
}
if trace := tracing.SpanFromContext(s.Ctx).GetConfiguredRecording(); trace != nil {
trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})
if !s.flowCtx.Gateway {
if trace := tracing.SpanFromContext(s.Ctx).GetConfiguredRecording(); trace != nil {
trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})
}
}
return trailingMeta
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,10 @@ func (s *ColIndexJoin) DrainMeta() []execinfrapb.ProducerMetadata {
meta.Metrics.BytesRead = s.GetBytesRead()
meta.Metrics.RowsRead = s.GetRowsRead()
trailingMeta = append(trailingMeta, *meta)
if trace := tracing.SpanFromContext(s.Ctx).GetConfiguredRecording(); trace != nil {
trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})
if !s.flowCtx.Gateway {
if trace := tracing.SpanFromContext(s.Ctx).GetConfiguredRecording(); trace != nil {
trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})
}
}
return trailingMeta
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,16 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta))
}
}
if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.RemoteProducerMetadata{
Value: &execinfrapb.RemoteProducerMetadata_TraceData_{
TraceData: &execinfrapb.RemoteProducerMetadata_TraceData{
CollectedSpans: trace,
if !o.flowCtx.Gateway {
if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.RemoteProducerMetadata{
Value: &execinfrapb.RemoteProducerMetadata_TraceData_{
TraceData: &execinfrapb.RemoteProducerMetadata_TraceData{
CollectedSpans: trace,
},
},
},
})
})
}
}
if len(msg.Data.Metadata) == 0 {
return nil
Expand Down
17 changes: 12 additions & 5 deletions pkg/sql/execinfra/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,11 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver
}

// GetTraceDataAsMetadata returns the trace data as execinfrapb.ProducerMetadata
// object.
func GetTraceDataAsMetadata(_ *FlowCtx, span *tracing.Span) *execinfrapb.ProducerMetadata {
// object when called not on the gateway.
func GetTraceDataAsMetadata(flowCtx *FlowCtx, span *tracing.Span) *execinfrapb.ProducerMetadata {
if flowCtx.Gateway {
return nil
}
if trace := span.GetConfiguredRecording(); len(trace) > 0 {
meta := execinfrapb.GetProducerMeta()
meta.TraceData = trace
Expand All @@ -264,11 +267,15 @@ func GetTraceDataAsMetadata(_ *FlowCtx, span *tracing.Span) *execinfrapb.Produce
}

// SendTraceData collects the tracing information from the ctx and pushes it to
// dst. The ConsumerStatus returned by dst is ignored.
// dst when called not on the gateway. The ConsumerStatus returned by dst is
// ignored.
//
// Note that the tracing data is distinct between different processors, since
// each one gets its own "detached" tracing span.
func SendTraceData(ctx context.Context, _ *FlowCtx, dst RowReceiver) {
// each one gets its own "detached" tracing span (when not on the gateway).
func SendTraceData(ctx context.Context, flowCtx *FlowCtx, dst RowReceiver) {
if flowCtx.Gateway {
return
}
if rec := tracing.SpanFromContext(ctx).GetConfiguredRecording(); rec != nil {
dst.Push(nil /* row */, &execinfrapb.ProducerMetadata{TraceData: rec})
}
Expand Down
30 changes: 25 additions & 5 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,8 +662,15 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() {
pb.span.RecordStructured(stats)
}
}
if trace := pb.span.GetConfiguredRecording(); trace != nil {
pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})
// Note that we need to propagate the trace only from the remote nodes
// because there we create spans with the detached option (see
// ProcessorSpan). If we're on the gateway, then the recording of this
// span is already included into the parent, thus, we don't generate the
// metadata for it.
if !pb.FlowCtx.Gateway {
if trace := pb.span.GetConfiguredRecording(); trace != nil {
pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})
}
}
}

Expand Down Expand Up @@ -834,9 +841,22 @@ func ProcessorSpan(
if len(eventListeners) > 0 {
listenersOpt = tracing.WithEventListeners(eventListeners...)
}
retCtx, retSpan := sp.Tracer().StartSpanCtx(
ctx, name, tracing.WithParent(sp), tracing.WithDetachedRecording(), listenersOpt,
)
var retCtx context.Context
var retSpan *tracing.Span
if flowCtx.Gateway {
retCtx, retSpan = sp.Tracer().StartSpanCtx(
ctx, name, tracing.WithParent(sp), listenersOpt,
)
} else {
// The trace from each processor will be imported into the span of the
// flow on the gateway, in DistSQLReceiver.pushMeta, so we use the
// detached option.
// TODO(yuzefovich): only use the detached recording for the root
// components of the remote flows.
retCtx, retSpan = sp.Tracer().StartSpanCtx(
ctx, name, tracing.WithParent(sp), tracing.WithDetachedRecording(), listenersOpt,
)
}
if retSpan.IsVerbose() {
retSpan.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(flowCtx.ID.String()))
retSpan.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(processorID)))
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/execinfrapb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ type ProducerMetadata struct {
Ranges []roachpb.RangeInfo
// TODO(vivek): change to type Error
Err error
// TraceData is sent if tracing is enabled.
// TraceData is sent if tracing is enabled, by all processors on the remote
// nodes.
TraceData []tracingpb.RecordedSpan
// LeafTxnFinalState contains the final state of the LeafTxn to be
// sent from leaf flows to the RootTxn held by the flow's ultimate
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/flowinfra/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,12 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error
m.stats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx)))
}
span.RecordStructured(&m.stats)
if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil {
err := m.AddRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace})
if err != nil {
return err
if !m.flowCtx.Gateway {
if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil {
err := m.AddRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace})
if err != nil {
return err
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/inverted_joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ func TestInvertedJoinerDrain(t *testing.T) {
TempStorage: tempEngine,
},
Txn: leafTxn,
Gateway: false,
DiskMonitor: diskMonitor,
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,7 @@ func TestJoinReaderDrain(t *testing.T) {
TempStorage: tempEngine,
},
Txn: leafTxn,
Gateway: false,
DiskMonitor: diskMonitor,
}

Expand Down
17 changes: 6 additions & 11 deletions pkg/sql/rowexec/tablereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,9 @@ func TestTableReaderDrain(t *testing.T) {
Cfg: &execinfra.ServerConfig{
Settings: st,
},
Txn: leafTxn,
Local: true,
NodeID: evalCtx.NodeID,
Txn: leafTxn,
Gateway: false,
NodeID: evalCtx.NodeID,
}
spec := execinfrapb.TableReaderSpec{
Spans: []roachpb.Span{td.PrimaryIndexSpan(keys.SystemSQLCodec)},
Expand Down Expand Up @@ -393,8 +393,9 @@ func TestLimitScans(t *testing.T) {
func() int64 { return 2 << 10 }, s.Stopper(),
),
},
Txn: kv.NewTxn(ctx, kvDB, s.NodeID()),
NodeID: evalCtx.NodeID,
Txn: kv.NewTxn(ctx, kvDB, s.NodeID()),
NodeID: evalCtx.NodeID,
Gateway: true,
}
spec := execinfrapb.TableReaderSpec{
FetchSpec: makeFetchSpec(t, tableDesc, "t_pkey", ""),
Expand Down Expand Up @@ -422,12 +423,6 @@ func TestLimitScans(t *testing.T) {
if row != nil {
rows++
}

// Simulate what the DistSQLReceiver does and ingest the trace.
if meta != nil && len(meta.TraceData) > 0 {
sp.ImportRemoteRecording(meta.TraceData)
}

if row == nil && meta == nil {
break
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/zigzagjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ func TestZigzagJoinerDrain(t *testing.T) {
Mon: evalCtx.TestingMon,
Cfg: &execinfra.ServerConfig{Settings: s.ClusterSettings()},
Txn: leafTxn,
Gateway: false,
}

testReaderProcessorDrain(ctx, t, func() (execinfra.Processor, error) {
Expand Down

0 comments on commit 5a940f6

Please sign in to comment.