98831: sql: improve how tracing of processors is done on the gateway node r=yuzefovich a=yuzefovich

**tracing: make it possible for EventListeners to "consume" the event**

This commit extends the `Notify` signature with a return value in order
to introduce the notion of "event consumption". Once a listener
"consumes" the event, no other listener on the span or on its ancestor
spans is notified about the event. This behavior is needed by the
follow-up commit, which changes how DistSQL processors examine their
traces to pick out their "own" events from the events of their inputs.

As a result, the span's own listeners are now notified before those of
its ancestors.
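
A minimal sketch of the consumption rule, using toy types rather than the real `tracing` package. The names `EventConsumptionStatus`, `EventConsumed`, and `EventNotConsumed`, and the string-typed event, are assumptions made for illustration:

```go
package main

import "fmt"

// EventConsumptionStatus is what a listener reports back from Notify.
// The type and constant names are assumptions made for this sketch.
type EventConsumptionStatus int

const (
	EventNotConsumed EventConsumptionStatus = iota
	EventConsumed
)

// EventListener is a simplified listener; events are plain strings here.
type EventListener interface {
	Notify(event string) EventConsumptionStatus
}

// span is a toy tracing span: a parent pointer plus registered listeners.
type span struct {
	parent    *span
	listeners []EventListener
}

// notify delivers the event to the span's own listeners first, then walks up
// the ancestors. It stops at the first listener that consumes the event and
// returns how many listeners were notified.
func (s *span) notify(event string) int {
	notified := 0
	for cur := s; cur != nil; cur = cur.parent {
		for _, l := range cur.listeners {
			notified++
			if l.Notify(event) == EventConsumed {
				return notified
			}
		}
	}
	return notified
}

// recorder remembers every event it sees and optionally consumes it.
type recorder struct {
	consume bool
	seen    []string
}

func (r *recorder) Notify(event string) EventConsumptionStatus {
	r.seen = append(r.seen, event)
	if r.consume {
		return EventConsumed
	}
	return EventNotConsumed
}

func main() {
	parentListener := &recorder{}
	childListener := &recorder{consume: true}
	parent := &span{listeners: []EventListener{parentListener}}
	child := &span{parent: parent, listeners: []EventListener{childListener}}

	child.notify("contention")
	// The child consumed the event, so the parent's listener never saw it.
	fmt.Println(len(childListener.seen), len(parentListener.seen)) // prints "1 0"
}
```

Notifying the span's own listeners first is what lets a processor claim an event before any ancestor gets a chance to count it.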

Release note: None

**execinfra: plumb FlowCtx and processorID into ProcessorSpan method**

This commit is mostly a mechanical change to plumb the `FlowCtx` object as
well as the processor ID into the `ProcessorSpan` method. The main goal of
this work is to be able to know whether the method is called on the
gateway node or not (this will be used by the follow-up commit), but
this change also unifies how the FlowID and ProcessorID tags are set on
the tracing span.
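
As a rough illustration of the unified tagging, here is a sketch with toy types. The `FlowCtx` and `Span` structs, the tag keys, and the `OnGateway` field are hypothetical and only mirror the shape of the call `ProcessorSpan(ctx, flowCtx, opName, processorID)` seen in the diff:

```go
package main

import (
	"fmt"
	"strconv"
)

// FlowCtx is a pared-down, hypothetical stand-in for execinfra.FlowCtx.
type FlowCtx struct {
	ID      string
	Gateway bool
}

// Span is a toy span that only carries a name, tags, and a gateway marker.
type Span struct {
	Name      string
	Tags      map[string]string
	OnGateway bool // illustrative only: shows that the helper can now tell
}

// processorSpan creates the span for a processor and sets both tags in one
// place, so callers no longer have to tag their spans individually.
func processorSpan(flowCtx *FlowCtx, name string, processorID int32) *Span {
	return &Span{
		Name: name,
		Tags: map[string]string{
			"flowid":      flowCtx.ID, // hypothetical tag key
			"processorid": strconv.Itoa(int(processorID)),
		},
		OnGateway: flowCtx.Gateway,
	}
}

func main() {
	sp := processorSpan(&FlowCtx{ID: "f1", Gateway: true}, "ordered sync", 3)
	fmt.Println(sp.Name, sp.Tags["flowid"], sp.Tags["processorid"], sp.OnGateway)
}
```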

Release note: None

**execstats: collect some stats via EventListeners**

This commit changes how contention time, scan stats, and consumed RU
information are accumulated by the processors. Previously, we would
analyze the trace recording of the processor to pick out all applicable
events from the trace and aggregate across all of them. This worked well
because currently we always create "detached" spans for all processors
(in other words, a contention event attached to the span of one
processor will not be seen by the parent processor).

However, the following commit will change that for processors on the
gateway. Namely, all processors on the gateway will no longer use the
detached option. As a result, we need to have a different way to
distinguish between events seen by the processor itself versus events
seen by the "child" processor. This commit introduces that way via the
EventListeners. In particular, it defines three types of listeners (for
all kinds of structured events we're interested in), which are then used
by all processors.

Release note: None

**sql: don't propagate spans from local processors as metadata**

This commit changes how we create tracing spans for components of the
flow on the gateway. Previously, we would always create them with the
detached option, and, as a result, we needed to collect the traces and
propagate them as metadata to be imported by the DistSQLReceiver into
the span of the flow on the gateway. However, this is not necessary on
the gateway: we can just let the regular tracing behavior happen
there.
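
The difference between the two modes can be sketched as follows. This is a toy model under stated assumptions: the `Span` type, `processorSpan`, and `traceDataAsMetadata` are hypothetical simplifications, and returning `nil` on the gateway is inferred from the `meta != nil` checks in the diff rather than confirmed by it:

```go
package main

import "fmt"

// FlowCtx is a pared-down, hypothetical stand-in for execinfra.FlowCtx.
type FlowCtx struct{ Gateway bool }

// Span is a toy span whose recording includes its non-detached children.
type Span struct {
	name     string
	detached bool
	events   []string
	children []*Span
}

func (s *Span) record(event string) { s.events = append(s.events, event) }

// recording returns the span's events followed by those of its children.
func (s *Span) recording() []string {
	out := []string{}
	for _, e := range s.events {
		out = append(out, s.name+": "+e)
	}
	for _, c := range s.children {
		out = append(out, c.recording()...)
	}
	return out
}

// processorSpan attaches the new span to its parent on the gateway and
// creates a detached span everywhere else.
func processorSpan(flowCtx *FlowCtx, parent *Span, name string) *Span {
	sp := &Span{name: name}
	if flowCtx.Gateway && parent != nil {
		parent.children = append(parent.children, sp)
	} else {
		sp.detached = true
	}
	return sp
}

// traceDataAsMetadata returns the recording that must be shipped as
// metadata, or nil on the gateway where the parent already sees it.
func traceDataAsMetadata(flowCtx *FlowCtx, sp *Span) []string {
	if flowCtx.Gateway {
		return nil
	}
	return sp.recording()
}

func main() {
	gateway := &FlowCtx{Gateway: true}
	root := &Span{name: "flow"}
	local := processorSpan(gateway, root, "ordered sync")
	local.record("batch emitted")
	fmt.Println(traceDataAsMetadata(gateway, local) == nil, root.recording())

	remote := &FlowCtx{Gateway: false}
	remoteRoot := &Span{name: "remote flow"}
	det := processorSpan(remote, remoteRoot, "ordered sync")
	det.record("batch emitted")
	fmt.Println(traceDataAsMetadata(remote, det), remoteRoot.recording())
}
```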

The main benefit of this change is that we now preserve the hierarchy of
the tracing spans in the gateway flows, making them much more
understandable when viewed in Jaeger. Perhaps this is also a minor
performance improvement since the native trace collection is probably
faster than having to import the trace data via the metadata, as was
done previously.

An additional improvement can be made to create the spans with the
detached recording only for the root components of the remote flows.
This is left as a TODO for now.

Epic: None

Release note: None

**tracing: Notify EventListeners while holding the span's mutex**

This commit addresses an oversight that was introduced in cockroachdb@6a3c0f9,
where the EventListeners started being notified without holding the
span's mutex. However, the contract of `WithEventListeners` implies that
the callbacks are no longer called after `Finish()` returns, so that
commit introduced a bug that is now fixed in this commit.
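
A minimal sketch of why holding the mutex during notification upholds that contract, and why a callback then needs a "Locked" variant to touch the span. The `span` type and its methods are toy assumptions, not the real `tracing.Span`:

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a callback that receives the span it is attached to.
type listener func(sp *span, event string)

// span is a toy span guarded by a single mutex.
type span struct {
	mu        sync.Mutex
	finished  bool
	tags      map[string]string
	listeners []listener
}

func newSpan(listeners ...listener) *span {
	return &span{tags: map[string]string{}, listeners: listeners}
}

// Record notifies the listeners while holding the mutex. Because Finish
// takes the same mutex, no callback can run once Finish has returned.
func (s *span) Record(event string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.finished {
		return
	}
	for _, l := range s.listeners {
		l(s, event)
	}
}

// Finish marks the span as finished.
func (s *span) Finish() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.finished = true
}

// SetTag is the regular entry point; it acquires the mutex itself.
func (s *span) SetTag(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.setTagLocked(k, v)
}

// setTagLocked assumes the mutex is already held. A listener must use this
// variant: sync.Mutex is not reentrant, so calling SetTag from inside a
// callback would deadlock.
func (s *span) setTagLocked(k, v string) { s.tags[k] = v }

// Tag returns the value of a tag.
func (s *span) Tag(k string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tags[k]
}

func main() {
	sp := newSpan(func(s *span, event string) { s.setTagLocked("last", event) })
	sp.Record("a")
	sp.Finish()
	sp.Record("b") // dropped: the span has already finished
	fmt.Println(sp.Tag("last")) // prints "a"
}
```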

The fix done in this commit is suboptimal though - it introduces an
exported method `SetLazyTagLocked` which is not great. The proper fix
would be to refactor `TracingAggregator` to avoid setting the lazy tag
dynamically, in the event listener callback, but that seems like
a non-trivial change, and I'd like to get this commit backported. To
avoid introducing more callsites of this `SetLazyTagLocked` method it is
marked as "deprecated" and cockroachdb#100438 tracks addressing this piece of the
tech debt.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Apr 3, 2023
2 parents 07024bc + c120153 commit cd933d3
Showing 65 changed files with 613 additions and 414 deletions.
1 change: 1 addition & 0 deletions build/bazelutil/nogo_config.json
@@ -740,6 +740,7 @@
"pkg/server/dumpstore/dumpstore.go$": "excluded until all uses of io/ioutil are replaced",
"pkg/server/dumpstore/dumpstore_test.go$": "excluded until all uses of io/ioutil are replaced",
"pkg/server/profiler/profilestore_test.go$": "excluded until all uses of io/ioutil are replaced",
+"pkg/util/bulk/tracing_aggregator.go$":"temporary exclusion until #100438 is resolved",
"pkg/util/log/file_api.go$": "excluded until all uses of io/ioutil are replaced",
"pkg/build/bazel/bazel\\.go$": "Runfile function deprecated"
},
2 changes: 0 additions & 2 deletions pkg/kv/bulk/bulkpb/ingestion_performance_stats.go
@@ -271,5 +271,3 @@ type timing time.Duration

func (t timing) String() string { return time.Duration(t).Round(time.Second).String() }
func (t timing) SafeValue() {}

-var _ bulk.TracingAggregatorEvent = &IngestionPerformanceStats{}
23 changes: 14 additions & 9 deletions pkg/sql/backfill/mvcc_index_merger.go
@@ -76,7 +76,8 @@ var indexBackfillMergeNumWorkers = settings.RegisterIntSetting(
// IndexBackfillMerger is a processor that merges entries from the corresponding
// temporary index to a new index.
type IndexBackfillMerger struct {
-spec execinfrapb.IndexBackfillMergerSpec
+processorID int32
+spec execinfrapb.IndexBackfillMergerSpec

desc catalog.TableDescriptor

@@ -106,10 +107,10 @@ const indexBackfillMergeProgressReportInterval = 10 * time.Second
func (ibm *IndexBackfillMerger) Run(ctx context.Context, output execinfra.RowReceiver) {
opName := "IndexBackfillMerger"
ctx = logtags.AddTag(ctx, opName, int(ibm.spec.Table.ID))
-ctx, span := execinfra.ProcessorSpan(ctx, opName)
+ctx, span := execinfra.ProcessorSpan(ctx, ibm.flowCtx, opName, ibm.processorID)
defer span.Finish()
defer output.ProducerDone()
-defer execinfra.SendTraceData(ctx, output)
+defer execinfra.SendTraceData(ctx, ibm.flowCtx, output)

mu := struct {
syncutil.Mutex
@@ -502,17 +503,21 @@ func (ibm *IndexBackfillMerger) Resume(output execinfra.RowReceiver) {

// NewIndexBackfillMerger creates a new IndexBackfillMerger.
func NewIndexBackfillMerger(
-ctx context.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.IndexBackfillMergerSpec,
+ctx context.Context,
+flowCtx *execinfra.FlowCtx,
+processorID int32,
+spec execinfrapb.IndexBackfillMergerSpec,
) (*IndexBackfillMerger, error) {
mergerMon := execinfra.NewMonitor(ctx, flowCtx.Cfg.BackfillerMonitor,
"index-backfiller-merger-mon")

ibm := &IndexBackfillMerger{
-spec: spec,
-desc: tabledesc.NewUnsafeImmutable(&spec.Table),
-flowCtx: flowCtx,
-evalCtx: flowCtx.NewEvalCtx(),
-mon: mergerMon,
+processorID: processorID,
+spec: spec,
+desc: tabledesc.NewUnsafeImmutable(&spec.Table),
+flowCtx: flowCtx,
+evalCtx: flowCtx.NewEvalCtx(),
+mon: mergerMon,
}

ibm.muBoundAccount.boundAccount = mergerMon.MakeBoundAccount()
8 changes: 5 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -438,6 +438,8 @@ func (r opResult) createDiskBackedSort(
outputUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[2], factory)
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID)
es := colexecdisk.NewExternalSorter(
+flowCtx,
+processorID,
sortUnlimitedAllocator,
mergeUnlimitedAllocator,
outputUnlimitedAllocator,
@@ -922,7 +924,7 @@ func NewColOperator(
if canUseDirectScan() {
scanOp, resultTypes, err = colfetcher.NewColBatchDirectScan(
ctx, colmem.NewAllocator(ctx, accounts[0], factory), accounts[1],
-flowCtx, core.TableReader, post, args.TypeResolver,
+flowCtx, spec.ProcessorID, core.TableReader, post, args.TypeResolver,
)
if err != nil {
return r, err
@@ -932,7 +934,7 @@ func NewColOperator(
if scanOp == nil {
scanOp, resultTypes, err = colfetcher.NewColBatchScan(
ctx, colmem.NewAllocator(ctx, accounts[0], factory), accounts[1],
-flowCtx, core.TableReader, post, estimatedRowCount, args.TypeResolver,
+flowCtx, spec.ProcessorID, core.TableReader, post, estimatedRowCount, args.TypeResolver,
)
if err != nil {
return r, err
@@ -965,7 +967,7 @@ func NewColOperator(
indexJoinOp, err := colfetcher.NewColIndexJoin(
ctx, getStreamingAllocator(ctx, args),
colmem.NewAllocator(ctx, accounts[0], factory),
-accounts[1], accounts[2], flowCtx,
+accounts[1], accounts[2], flowCtx, spec.ProcessorID,
inputs[0].Root, core.JoinReader, post, inputTypes,
streamerDiskMonitor, args.TypeResolver,
)
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colexecbase/simple_project_test.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
+"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -121,7 +122,7 @@ func TestSimpleProjectOpWithUnorderedSynchronizer(t *testing.T) {
for i := range parallelUnorderedSynchronizerInputs {
parallelUnorderedSynchronizerInputs[i].Root = inputs[i]
}
-input = colexec.NewParallelUnorderedSynchronizer(testAllocator, parallelUnorderedSynchronizerInputs, &wg)
+input = colexec.NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, parallelUnorderedSynchronizerInputs, &wg)
input = colexecbase.NewSimpleProjectOp(input, len(inputTypes), []uint32{0})
return colexecbase.NewConstOp(testAllocator, input, types.Int, constVal, 1)
})
10 changes: 8 additions & 2 deletions pkg/sql/colexec/colexecdisk/external_sort.go
@@ -117,6 +117,8 @@ type externalSorter struct {
colexecop.InitHelper
colexecop.NonExplainable
colexecop.CloserHelper
+flowCtx *execinfra.FlowCtx
+processorID int32

// mergeUnlimitedAllocator is used to track the memory under the batches
// dequeued from partitions during the merge operation.
@@ -220,6 +222,8 @@ var _ colexecop.ClosableOperator = &externalSorter{}
// the partitioned disk queue acquire file descriptors instead of acquiring
// them up front in Next. This should only be true in tests.
func NewExternalSorter(
+flowCtx *execinfra.FlowCtx,
+processorID int32,
sortUnlimitedAllocator *colmem.Allocator,
mergeUnlimitedAllocator *colmem.Allocator,
outputUnlimitedAllocator *colmem.Allocator,
@@ -298,6 +302,8 @@ func NewExternalSorter(
}
es := &externalSorter{
OneInputNode: colexecop.NewOneInputNode(inMemSorter),
+flowCtx: flowCtx,
+processorID: processorID,
mergeUnlimitedAllocator: mergeUnlimitedAllocator,
outputUnlimitedAllocator: outputUnlimitedAllocator,
mergeMemoryLimit: mergeMemoryLimit,
@@ -711,8 +717,8 @@ func (s *externalSorter) createMergerForPartitions(n int) *colexec.OrderedSynchr
outputBatchMemSize = minOutputBatchMemSize
}
return colexec.NewOrderedSynchronizer(
-s.outputUnlimitedAllocator, outputBatchMemSize, syncInputs,
-s.inputTypes, s.columnOrdering, tuplesToMerge,
+s.flowCtx, s.processorID, s.outputUnlimitedAllocator, outputBatchMemSize,
+syncInputs, s.inputTypes, s.columnOrdering, tuplesToMerge,
)
}

12 changes: 9 additions & 3 deletions pkg/sql/colexec/ordered_synchronizer.eg.go

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions pkg/sql/colexec/ordered_synchronizer_test.go
@@ -148,7 +148,7 @@ func TestOrderedSync(t *testing.T) {
typs[i] = types.Int
}
colexectestutils.RunTests(t, testAllocator, tc.sources, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) {
-return NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering, 0 /* tuplesToMerge */), nil
+return NewOrderedSynchronizer(&execinfra.FlowCtx{Gateway: true}, 0 /* processorID */, testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering, 0 /* tuplesToMerge */), nil
})
}
}
@@ -189,7 +189,7 @@ func TestOrderedSyncRandomInput(t *testing.T) {
inputs[i].Root = colexectestutils.NewOpTestInput(testAllocator, batchSize, sources[i], typs)
}
ordering := colinfo.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
-op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */)
+op := NewOrderedSynchronizer(&execinfra.FlowCtx{Gateway: true}, 0 /* processorID */, testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */)
op.Init(context.Background())
out := colexectestutils.NewOpTestOutput(op, expected)
if err := out.Verify(); err != nil {
@@ -218,7 +218,7 @@ func BenchmarkOrderedSynchronizer(b *testing.B) {
}

ordering := colinfo.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
-op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */)
+op := NewOrderedSynchronizer(&execinfra.FlowCtx{Gateway: true}, 0 /* processorID */, testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */)
op.Init(ctx)

b.SetBytes(8 * int64(coldata.BatchSize()) * numInputs)
12 changes: 9 additions & 3 deletions pkg/sql/colexec/ordered_synchronizer_tmpl.go
@@ -57,7 +57,9 @@ const _TYPE_WIDTH = 0
// stream are assumed to be ordered according to the same set of columns.
type OrderedSynchronizer struct {
colexecop.InitHelper
-span *tracing.Span
+flowCtx *execinfra.FlowCtx
+processorID int32
+span *tracing.Span

accountingHelper colmem.SetAccountingHelper
inputs []colexecargs.OpWithMetaInfo
@@ -107,6 +109,8 @@ func (o *OrderedSynchronizer) Child(nth int, verbose bool) execopnode.OpNode {
// - tuplesToMerge, if positive, indicates the total number of tuples that will
// be emitted by all inputs, use 0 if unknown.
func NewOrderedSynchronizer(
+flowCtx *execinfra.FlowCtx,
+processorID int32,
allocator *colmem.Allocator,
memoryLimit int64,
inputs []colexecargs.OpWithMetaInfo,
@@ -115,6 +119,8 @@ func NewOrderedSynchronizer(
tuplesToMerge int64,
) *OrderedSynchronizer {
os := &OrderedSynchronizer{
+flowCtx: flowCtx,
+processorID: processorID,
inputs: inputs,
ordering: ordering,
typs: typs,
@@ -232,7 +238,7 @@ func (o *OrderedSynchronizer) Init(ctx context.Context) {
if !o.InitHelper.Init(ctx) {
return
}
-o.Ctx, o.span = execinfra.ProcessorSpan(o.Ctx, "ordered sync")
+o.Ctx, o.span = execinfra.ProcessorSpan(o.Ctx, o.flowCtx, "ordered sync", o.processorID)
o.inputIndices = make([]int, len(o.inputs))
for i := range o.inputs {
o.inputs[i].Root.Init(o.Ctx)
@@ -252,7 +258,7 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata {
o.span.RecordStructured(stats.GetStats())
}
}
-if meta := execinfra.GetTraceDataAsMetadata(o.span); meta != nil {
+if meta := execinfra.GetTraceDataAsMetadata(o.flowCtx, o.span); meta != nil {
bufferedMeta = append(bufferedMeta, *meta)
}
}
35 changes: 21 additions & 14 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
@@ -66,16 +66,15 @@ const (
type ParallelUnorderedSynchronizer struct {
colexecop.InitHelper

-allocator *colmem.Allocator
-inputs []colexecargs.OpWithMetaInfo
-inputCtxs []context.Context
+flowCtx *execinfra.FlowCtx
+processorID int32
+allocator *colmem.Allocator
+inputs []colexecargs.OpWithMetaInfo
+inputCtxs []context.Context
// cancelLocalInput stores context cancellation functions for each of the
-// inputs. The functions are populated only if LocalPlan is true.
+// inputs. The functions are populated only if localPlan is true.
cancelLocalInput []context.CancelFunc
-// LocalPlan indicates whether this synchronizer is a part of the fully
-// local plan.
-LocalPlan bool
-tracingSpans []*tracing.Span
+tracingSpans []*tracing.Span
// readNextBatch is a slice of channels, where each channel corresponds to the
// input at the same index in inputs. It is used as a barrier for input
// goroutines to wait on until the Next goroutine signals that it is safe to
@@ -134,7 +133,11 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execopnode.
// zero-length batch received from Next.
// - allocator must use a memory account that is not shared with any other user.
func NewParallelUnorderedSynchronizer(
-allocator *colmem.Allocator, inputs []colexecargs.OpWithMetaInfo, wg *sync.WaitGroup,
+flowCtx *execinfra.FlowCtx,
+processorID int32,
+allocator *colmem.Allocator,
+inputs []colexecargs.OpWithMetaInfo,
+wg *sync.WaitGroup,
) *ParallelUnorderedSynchronizer {
readNextBatch := make([]chan struct{}, len(inputs))
for i := range readNextBatch {
@@ -143,6 +146,8 @@ func NewParallelUnorderedSynchronizer(
readNextBatch[i] = make(chan struct{}, 1)
}
return &ParallelUnorderedSynchronizer{
+flowCtx: flowCtx,
+processorID: processorID,
allocator: allocator,
inputs: inputs,
inputCtxs: make([]context.Context, len(inputs)),
@@ -172,13 +177,15 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) {
return
}
for i, input := range s.inputs {
-s.inputCtxs[i], s.tracingSpans[i] = execinfra.ProcessorSpan(s.Ctx, fmt.Sprintf("parallel unordered sync input %d", i))
-if s.LocalPlan {
-// If there plan is local, there are no colrpc.Inboxes in this input
+s.inputCtxs[i], s.tracingSpans[i] = execinfra.ProcessorSpan(
+s.Ctx, s.flowCtx, fmt.Sprintf("parallel unordered sync input %d", i), s.processorID,
+)
+if s.flowCtx.Local {
+// If the plan is local, there are no colrpc.Inboxes in this input
// tree, and the synchronizer can cancel the current work eagerly
// when transitioning into draining.
//
-// If there plan is distributed, there might be an inbox in the
+// If the plan is distributed, there might be an inbox in the
// input tree, and the synchronizer cannot cancel the work eagerly
// because canceling the context would break the gRPC stream and
// make it impossible to fetch the remote metadata. Furthermore, it
@@ -287,7 +294,7 @@ func (s *ParallelUnorderedSynchronizer) init() {
for _, s := range input.StatsCollectors {
span.RecordStructured(s.GetStats())
}
-if meta := execinfra.GetTraceDataAsMetadata(span); meta != nil {
+if meta := execinfra.GetTraceDataAsMetadata(s.flowCtx, span); meta != nil {
msg.meta = append(msg.meta, *meta)
}
}
8 changes: 4 additions & 4 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
+"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -127,8 +128,7 @@ func TestParallelUnorderedSynchronizer(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())

var wg sync.WaitGroup
-s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg)
-s.LocalPlan = true
+s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, inputs, &wg)
s.Init(ctx)

t.Run(fmt.Sprintf("numInputs=%d/numBatches=%d/terminationScenario=%d", numInputs, numBatches, terminationScenario), func(t *testing.T) {
@@ -229,7 +229,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
}

var wg sync.WaitGroup
-s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg)
+s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, inputs, &wg)
s.Init(ctx)
for {
if err := colexecerror.CatchVectorizedRuntimeError(func() { _ = s.Next() }); err != nil {
@@ -261,7 +261,7 @@ func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
}
var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(context.Background())
-s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg)
+s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, inputs, &wg)
s.Init(ctx)
b.SetBytes(8 * int64(coldata.BatchSize()))
b.ResetTimer()