Skip to content

Commit

Permalink
execstats: collect some stats via EventListeners
Browse files Browse the repository at this point in the history
This commit changes how contention time, scan stats, and consumed RU
information are accumulated by the processors. Previously, we would
analyze the trace recording of the processor to pick out all applicable
events from the trace and aggregate across all of them. This worked well
because currently we always create "detached" spans for all processors
(in other words, a contention event attached to the span of one
processor will not be seen by the parent processor).

However, the following commit will change that for processors on the
gateway. Namely, all processors on the gateway will no longer use the
detached option. As a result, we need a different way to distinguish
events seen by the processor itself from events seen by its "child"
processors. This commit introduces such a mechanism via EventListeners.
In particular, it defines three types of listeners (one for each kind of
structured event we're interested in), which are then used by all
processors.

Release note: None
  • Loading branch information
yuzefovich committed Apr 3, 2023
1 parent 7b0cfc0 commit 0308728
Show file tree
Hide file tree
Showing 15 changed files with 227 additions and 192 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/colexecop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type KVReader interface {
// GetScanStats returns statistics about the scan that happened during the
// KV reads. It must be safe for concurrent use.
GetScanStats() execstats.ScanStats
// GetConsumedRU returns the number of RUs that were consumed during the
// KV reads.
GetConsumedRU() uint64
// GetKVCPUTime returns the CPU time consumed *on the current goroutine* by
// KV requests. It must be safe for concurrent use. It is used to calculate
// the SQL CPU time.
Expand Down
12 changes: 7 additions & 5 deletions pkg/sql/colfetcher/colbatch_direct_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ func (s *ColBatchDirectScan) Init(ctx context.Context) {
if !s.InitHelper.Init(ctx) {
return
}
// If tracing is enabled, we need to start a child span so that the only
// contention events present in the recording would be because of this
// fetcher. Note that ProcessorSpan method itself will check whether tracing
// is enabled.
s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, s.flowCtx, "colbatchdirectscan", s.processorID)
s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(
s.Ctx, s.flowCtx, "colbatchdirectscan", s.processorID,
&s.contentionEventsListener, &s.scanStatsListener, &s.tenantConsumptionListener,
)
firstBatchLimit := cFetcherFirstBatchLimit(s.limitHint, s.spec.MaxKeysPerRow)
err := s.fetcher.SetupNextFetch(
ctx, s.Spans, nil /* spanIDs */, s.batchBytesLimit, firstBatchLimit, false, /* spansCanOverlap */
Expand Down Expand Up @@ -147,6 +146,9 @@ func (s *ColBatchDirectScan) GetBatchRequestsIssued() int64 {
return s.fetcher.GetBatchRequestsIssued()
}

// TODO(yuzefovich): check whether GetScanStats and GetConsumedRU should be
// reimplemented.

// GetKVCPUTime is part of the colexecop.KVReader interface.
//
// Note that this KV CPU time, unlike for the ColBatchScan, includes the
Expand Down
25 changes: 16 additions & 9 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ type colBatchScanBase struct {
parallelize bool
// tracingSpan is created when the stats should be collected for the query
// execution, and it will be finished when closing the operator.
tracingSpan *tracing.Span
mu struct {
tracingSpan *tracing.Span
contentionEventsListener execstats.ContentionEventsListener
scanStatsListener execstats.ScanStatsListener
tenantConsumptionListener execstats.TenantConsumptionListener
mu struct {
syncutil.Mutex
// rowsRead contains the number of total rows this ColBatchScan has
// returned so far.
Expand Down Expand Up @@ -86,12 +89,17 @@ func (s *colBatchScanBase) GetRowsRead() int64 {

// GetContentionTime is part of the colexecop.KVReader interface.
func (s *colBatchScanBase) GetContentionTime() time.Duration {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
return s.contentionEventsListener.CumulativeContentionTime
}

// GetScanStats is part of the colexecop.KVReader interface.
func (s *colBatchScanBase) GetScanStats() execstats.ScanStats {
return execstats.GetScanStats(s.Ctx, nil /* recording */)
return s.scanStatsListener.ScanStats
}

// GetConsumedRU is part of the colexecop.KVReader interface.
//
// It returns the ConsumedRU value currently held by this scan's
// tenantConsumptionListener; the scan operators hand that listener to
// execinfra.ProcessorSpan in their Init methods.
//
// NOTE(review): this is a plain field read with no locking visible here -
// confirm whether it can be called concurrently with updates to the listener.
func (s *colBatchScanBase) GetConsumedRU() uint64 {
return s.tenantConsumptionListener.ConsumedRU
}

// Release implements the execreleasable.Releasable interface.
Expand Down Expand Up @@ -218,11 +226,10 @@ func (s *ColBatchScan) Init(ctx context.Context) {
if !s.InitHelper.Init(ctx) {
return
}
// If tracing is enabled, we need to start a child span so that the only
// contention events present in the recording would be because of this
// fetcher. Note that ProcessorSpan method itself will check whether tracing
// is enabled.
s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, s.flowCtx, "colbatchscan", s.processorID)
s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(
s.Ctx, s.flowCtx, "colbatchscan", s.processorID,
&s.contentionEventsListener, &s.scanStatsListener, &s.tenantConsumptionListener,
)
limitBatches := !s.parallelize
if err := s.cf.StartScan(
s.Ctx,
Expand Down
33 changes: 20 additions & 13 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ type ColIndexJoin struct {

// tracingSpan is created when the stats should be collected for the query
// execution, and it will be finished when closing the operator.
tracingSpan *tracing.Span
mu struct {
tracingSpan *tracing.Span
contentionEventsListener execstats.ContentionEventsListener
scanStatsListener execstats.ScanStatsListener
tenantConsumptionListener execstats.TenantConsumptionListener
mu struct {
syncutil.Mutex
// rowsRead contains the number of total rows this ColIndexJoin has
// returned so far.
Expand All @@ -137,11 +140,10 @@ func (s *ColIndexJoin) Init(ctx context.Context) {
if !s.InitHelper.Init(ctx) {
return
}
// If tracing is enabled, we need to start a child span so that the only
// contention events present in the recording would be because of this
// fetcher. Note that ProcessorSpan method itself will check whether tracing
// is enabled.
s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, s.flowCtx, "colindexjoin", s.processorID)
s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(
s.Ctx, s.flowCtx, "colindexjoin", s.processorID,
&s.contentionEventsListener, &s.scanStatsListener, &s.tenantConsumptionListener,
)
s.Input.Init(s.Ctx)
}

Expand Down Expand Up @@ -408,7 +410,17 @@ func (s *ColIndexJoin) GetKVCPUTime() time.Duration {

// GetContentionTime is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetContentionTime() time.Duration {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
return s.contentionEventsListener.CumulativeContentionTime
}

// GetScanStats is part of the colexecop.KVReader interface.
//
// It returns the ScanStats value currently held by this index join's
// scanStatsListener; that listener is handed to execinfra.ProcessorSpan in
// Init.
//
// NOTE(review): the KVReader interface documents GetScanStats as required to
// be safe for concurrent use, while this is a plain field read with no locking
// visible here - confirm that the listener synchronizes access.
func (s *ColIndexJoin) GetScanStats() execstats.ScanStats {
return s.scanStatsListener.ScanStats
}

// GetConsumedRU is part of the colexecop.KVReader interface.
//
// It returns the ConsumedRU value currently held by this index join's
// tenantConsumptionListener; that listener is handed to
// execinfra.ProcessorSpan in Init.
//
// NOTE(review): this is a plain field read with no locking visible here -
// confirm whether it can be called concurrently with updates to the listener.
func (s *ColIndexJoin) GetConsumedRU() uint64 {
return s.tenantConsumptionListener.ConsumedRU
}

// inputBatchSizeLimit is a batch size limit for the number of input rows that
Expand Down Expand Up @@ -663,11 +675,6 @@ func adjustMemEstimate(estimate int64) int64 {
return estimate*memEstimateMultiplier + memEstimateAdditive
}

// GetScanStats is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetScanStats() execstats.ScanStats {
return execstats.GetScanStats(s.Ctx, nil /* recording */)
}

// Release implements the execinfra.Releasable interface.
func (s *ColIndexJoin) Release() {
s.cf.Release()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats
s.KV.ContentionTime.Set(vsc.kvReader.GetContentionTime())
scanStats := vsc.kvReader.GetScanStats()
execstats.PopulateKVMVCCStats(&s.KV, &scanStats)
s.Exec.ConsumedRU.Set(scanStats.ConsumedRU)
s.Exec.ConsumedRU.Set(vsc.kvReader.GetConsumedRU())

// In order to account for SQL CPU time, we have to subtract the CPU time
// spent while serving KV requests on a SQL goroutine.
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ go_library(
"//pkg/util/stop",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
Expand Down
41 changes: 17 additions & 24 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/optional"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -378,14 +377,6 @@ type ProcessorBaseNoHelper struct {
//
// Can return nil.
ExecStatsForTrace func() *execinfrapb.ComponentStats
// storeExecStatsTrace indicates whether ExecStatsTrace should be populated
// in InternalClose.
storeExecStatsTrace bool
// ExecStatsTrace stores the recording in case HijackExecStatsForTrace has
// been called. This is needed in order to provide the access to the
// recording after the span has been finished in InternalClose. Only set if
// storeExecStatsTrace is true.
ExecStatsTrace tracingpb.Recording
// trailingMetaCallback, if set, will be called by moveToTrailingMeta(). The
// callback is expected to close all inputs, do other cleanup on the processor
// (including calling InternalClose()) and generate the trailing meta that
Expand Down Expand Up @@ -639,13 +630,7 @@ func (pb *ProcessorBase) HijackExecStatsForTrace() func() *execinfrapb.Component
}
execStatsForTrace := pb.ExecStatsForTrace
pb.ExecStatsForTrace = nil
pb.storeExecStatsTrace = true
return func() *execinfrapb.ComponentStats {
cs := execStatsForTrace()
// Make sure to unset the trace since we don't need it anymore.
pb.ExecStatsTrace = nil
return cs
}
return execStatsForTrace
}

// moveToTrailingMeta switches the processor to the "trailing meta" state: only
Expand Down Expand Up @@ -679,9 +664,6 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() {
}
if trace := pb.span.GetConfiguredRecording(); trace != nil {
pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})
if pb.storeExecStatsTrace {
pb.ExecStatsTrace = trace
}
}
}

Expand Down Expand Up @@ -838,14 +820,23 @@ func (pb *ProcessorBase) AppendTrailingMeta(meta execinfrapb.ProducerMetadata) {
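// ProcessorSpan creates a child span for a processor (if we are doing any
// tracing). If ctx carries no span, ctx is returned unchanged together with a
// nil span. Any eventListeners passed in are attached to the new span via
// tracing.WithEventListeners. The returned span needs to be finished using
// tracing.FinishSpan.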
func ProcessorSpan(
ctx context.Context, flowCtx *FlowCtx, name string, processorID int32,
ctx context.Context,
flowCtx *FlowCtx,
name string,
processorID int32,
eventListeners ...tracing.EventListener,
) (context.Context, *tracing.Span) {
sp := tracing.SpanFromContext(ctx)
if sp == nil {
return ctx, nil
}
retCtx, retSpan := sp.Tracer().StartSpanCtx(ctx, name,
tracing.WithParent(sp), tracing.WithDetachedRecording())
var listenersOpt tracing.SpanOption
if len(eventListeners) > 0 {
listenersOpt = tracing.WithEventListeners(eventListeners...)
}
retCtx, retSpan := sp.Tracer().StartSpanCtx(
ctx, name, tracing.WithParent(sp), tracing.WithDetachedRecording(), listenersOpt,
)
if retSpan.IsVerbose() {
retSpan.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(flowCtx.ID.String()))
retSpan.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(processorID)))
Expand All @@ -864,13 +855,15 @@ func ProcessorSpan(
// < other initialization >
//
// so that the caller doesn't mistakenly use old ctx object.
func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context {
func (pb *ProcessorBaseNoHelper) StartInternal(
ctx context.Context, name string, eventListeners ...tracing.EventListener,
) context.Context {
pb.origCtx = ctx
pb.ctx = ctx
noSpan := pb.FlowCtx != nil && pb.FlowCtx.Cfg != nil &&
pb.FlowCtx.Cfg.TestingKnobs.ProcessorNoTracingSpan
if !noSpan {
pb.ctx, pb.span = ProcessorSpan(ctx, pb.FlowCtx, name, pb.ProcessorID)
pb.ctx, pb.span = ProcessorSpan(ctx, pb.FlowCtx, name, pb.ProcessorID, eventListeners...)
}
pb.evalOrigCtx = pb.EvalCtx.SetDeprecatedContext(pb.ctx)
return pb.ctx
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/util/buildutil",
"//pkg/util/optional",
"//pkg/util/protoutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
Expand Down
Loading

0 comments on commit 0308728

Please sign in to comment.