From da18c779f213e1389320dfbbe60b6d8a660f3be4 Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Wed, 9 Aug 2023 12:38:08 -0400
Subject: [PATCH] backupccl: hook up tracing aggregator events for the C2C job

This change builds on top of #107994 and wires up each stream ingestion
data processor to emit TracingAggregatorEvents to the frontier and
subsequently to the job coordinator. These events are periodically
flushed to files in the `job_info` table and are consumable via the
DBConsole Job Details page.

Currently, the only aggregator event that is propagated is the
IngestionPerformanceStats emitted by the sst batcher.

Fixes: #100126

Release note: None
---
 pkg/ccl/backupccl/backup_processor.go         | 13 ++++-
 pkg/ccl/backupccl/restore_data_processor.go   | 14 ++++-
 pkg/ccl/streamingccl/streamingest/BUILD.bazel |  1 +
 .../streamingest/stream_ingestion_dist.go     | 26 ++++++++-
 .../stream_ingestion_processor.go             | 58 ++++++++++++++++++-
 pkg/kv/bulk/sst_batcher.go                    | 11 +++-
 pkg/sql/colexec/colbuilder/execplan.go        |  3 +
 pkg/util/bulk/BUILD.bazel                     |  1 +
 pkg/util/bulk/tracing_aggregator.go           |  5 ++
 9 files changed, 124 insertions(+), 8 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index 497a055dc052..68bb167e9c5f 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -173,7 +173,11 @@ func newBackupDataProcessor(
 		InputsToDrain: nil,
 		TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
 			bp.close()
-			return []execinfrapb.ProducerMetadata{*bp.constructTracingAggregatorProducerMeta(ctx)}
+			meta := bp.constructTracingAggregatorProducerMeta(ctx)
+			if meta == nil {
+				return nil
+			}
+			return []execinfrapb.ProducerMetadata{*meta}
 		},
 	}); err != nil {
 		return nil, err
@@ -245,6 +249,9 @@ func (bp *backupDataProcessor) constructProgressProducerMeta(
 func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta(
 	ctx context.Context,
 ) *execinfrapb.ProducerMetadata {
+	if bp.agg == nil {
+		return nil
+	}
 	aggEvents := &execinfrapb.TracingAggregatorEvents{
 		SQLInstanceID: bp.flowCtx.NodeID.SQLInstanceID(),
 		FlowID:        bp.flowCtx.ID,
@@ -292,7 +299,9 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
 func (bp *backupDataProcessor) close() {
 	bp.cancelAndWaitForWorker()
 	if bp.InternalClose() {
-		bp.agg.Close()
+		if bp.agg != nil {
+			bp.agg.Close()
+		}
 		bp.aggTimer.Stop()
 		bp.memAcc.Close(bp.Ctx())
 	}
diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index af3e2a1d0396..0704a132f1f2 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -211,7 +211,11 @@ func newRestoreDataProcessor(
 		InputsToDrain: []execinfra.RowSource{input},
 		TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
 			rd.ConsumerClosed()
-			return []execinfrapb.ProducerMetadata{*rd.constructTracingAggregatorProducerMeta(ctx)}
+			meta := rd.constructTracingAggregatorProducerMeta(ctx)
+			if meta == nil {
+				return nil
+			}
+			return []execinfrapb.ProducerMetadata{*meta}
 		},
 	}); err != nil {
 		return nil, err
@@ -686,6 +690,9 @@ func makeProgressUpdate(
 func (rd *restoreDataProcessor) constructTracingAggregatorProducerMeta(
 	ctx context.Context,
 ) *execinfrapb.ProducerMetadata {
+	if rd.agg == nil {
+		return nil
+	}
 	aggEvents := &execinfrapb.TracingAggregatorEvents{
 		SQLInstanceID: rd.flowCtx.NodeID.SQLInstanceID(),
 		FlowID:        rd.flowCtx.ID,
@@ -748,7 +755,10 @@ func (rd *restoreDataProcessor) ConsumerClosed() {
 	rd.cancelWorkersAndWait()
 	rd.qp.Close(rd.Ctx())
-	rd.agg.Close()
+	if rd.agg != nil {
+		rd.agg.Close()
+	}
+	rd.aggTimer.Stop()
 	rd.InternalClose()
 }
diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
index 5d69331f39a4..80a4e001505e 100644
--- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -61,6 +61,7 @@ go_library(
         "//pkg/sql/types",
         "//pkg/storage",
         "//pkg/storage/enginepb",
+        "//pkg/util/bulk",
         "//pkg/util/ctxgroup",
         "//pkg/util/hlc",
         "//pkg/util/log",
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
index 1a3406063eb5..21ae2d5fc6ec 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/util/bulk"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -150,16 +151,37 @@ func startDistIngestion(
 		func() time.Duration { return replanFrequency.Get(execCtx.ExecCfg().SV()) },
 	)
 
+	tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents)
+	tracingAggLoop := func(ctx context.Context) error {
+		if err := bulk.AggregateTracingStats(ctx, ingestionJob.ID(),
+			execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil {
+			log.Warningf(ctx, "failed to aggregate tracing stats: %v", err)
+			// Drain the channel if the loop to aggregate tracing stats has returned
+			// an error.
+			for range tracingAggCh {
+			}
+		}
+		return nil
+	}
+
 	execInitialPlan := func(ctx context.Context) error {
 		defer stopReplanner()
+		defer close(tracingAggCh)
 		ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
+		metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error {
+			if meta.AggregatorEvents != nil {
+				tracingAggCh <- meta.AggregatorEvents
+			}
+			return nil
+		}
+
 		rw := sql.NewRowResultWriter(nil /* rowContainer */)
 		var noTxn *kv.Txn
 		recv := sql.MakeDistSQLReceiver(
 			ctx,
-			rw,
+			sql.NewMetadataCallbackWriter(rw, metaFn),
 			tree.Rows,
 			nil, /* rangeCache */
 			noTxn,
@@ -175,7 +197,7 @@ func startDistIngestion(
 	}
 
 	updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "physical replication running")
-	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner)
+	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop)
 	if errors.Is(err, sql.ErrPlanChanged) {
 		execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
 	}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
index 436b46c3ba98..5e873b652670 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -10,6 +10,7 @@ package streamingest
 
 import (
 	"context"
+	"fmt"
 	"sort"
 	"sync"
 	"time"
@@ -37,6 +38,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -256,6 +258,11 @@ type streamIngestionProcessor struct {
 	metrics *Metrics
 
 	logBufferEvery log.EveryN
+
+	// Aggregator that aggregates StructuredEvents emitted in the
+	// streamIngestionProcessor's trace recording.
+	agg      *bulkutil.TracingAggregator
+	aggTimer *timeutil.Timer
 }
 
 // partitionEvent augments a normal event with the partition it came from.
@@ -323,7 +330,11 @@ func newStreamIngestionDataProcessor(
 			InputsToDrain: []execinfra.RowSource{},
 			TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
 				sip.close()
-				return nil
+				meta := sip.constructTracingAggregatorProducerMeta(ctx)
+				if meta == nil {
+					return nil
+				}
+				return []execinfrapb.ProducerMetadata{*meta}
 			},
 		},
 	); err != nil {
@@ -333,6 +344,36 @@
 	return sip, nil
 }
 
+func (sip *streamIngestionProcessor) constructTracingAggregatorProducerMeta(
+	ctx context.Context,
+) *execinfrapb.ProducerMetadata {
+	if sip.agg == nil {
+		return nil
+	}
+	aggEvents := &execinfrapb.TracingAggregatorEvents{
+		SQLInstanceID: sip.flowCtx.NodeID.SQLInstanceID(),
+		FlowID:        sip.flowCtx.ID,
+		Events:        make(map[string][]byte),
+	}
+	sip.agg.ForEachAggregatedEvent(func(name string, event bulkutil.TracingAggregatorEvent) {
+		msg, ok := event.(protoutil.Message)
+		if !ok {
+			// This should never happen but if it does skip the aggregated event.
+			log.Warningf(ctx, "event is not a protoutil.Message: %T", event)
+			return
+		}
+		data := make([]byte, msg.Size())
+		if _, err := msg.MarshalTo(data); err != nil {
+			// This should never happen but if it does skip the aggregated event.
+ log.Warningf(ctx, "failed to unmarshal aggregated event: %v", err.Error()) + return + } + aggEvents.Events[name] = data + }) + + return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents} +} + // Start launches a set of goroutines that read from the spans // assigned to this processor and ingests them until cutover is // reached. @@ -358,6 +399,13 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { log.Infof(ctx, "starting ingest proc") ctx = sip.StartInternal(ctx, streamIngestionProcessorName) + // Construct an Aggregator to aggregate and render AggregatorEvents emitted in + // sips' trace recording. + ctx, sip.agg = bulkutil.MakeTracingAggregatorWithSpan(ctx, + fmt.Sprintf("%s-aggregator", streamIngestionProcessorName), sip.EvalCtx.Tracer) + sip.aggTimer = timeutil.NewTimer() + sip.aggTimer.Reset(15 * time.Second) + sip.metrics = sip.flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics) evalCtx := sip.FlowCtx.EvalCtx @@ -470,6 +518,10 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr } return row, nil } + case <-sip.aggTimer.C: + sip.aggTimer.Read = true + sip.aggTimer.Reset(15 * time.Second) + return nil, sip.constructTracingAggregatorProducerMeta(sip.Ctx()) case err := <-sip.errCh: sip.MoveToDraining(err) return nil, sip.DrainHelper() @@ -533,6 +585,10 @@ func (sip *streamIngestionProcessor) close() { if sip.maxFlushRateTimer != nil { sip.maxFlushRateTimer.Stop() } + if sip.agg != nil { + sip.agg.Close() + } + sip.aggTimer.Stop() sip.InternalClose() } diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index f8cf66d628f6..97ca41e42d02 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -247,7 +247,16 @@ func MakeStreamSSTBatcher( sendLimiter limit.ConcurrentRequestLimiter, onFlush func(summary kvpb.BulkOpSummary), ) (*SSTBatcher, error) { - b := &SSTBatcher{db: db, rc: rc, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter} + b := &SSTBatcher{ + db: db, + rc: rc, + settings: settings, + ingestAll: true, + mem: mem, + limiter: sendLimiter, + } + b.mu.lastFlush = timeutil.Now() + b.mu.tracingSpan = tracing.SpanFromContext(ctx) b.SetOnFlush(onFlush) err := b.Reset(ctx) return b, err diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index c8f473013afa..e8d78fbd7b7c 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -266,6 +266,7 @@ var ( errNonInnerMergeJoinWithOnExpr = errors.New("can't plan vectorized non-inner merge joins with ON expressions") errWindowFunctionFilterClause = errors.New("window functions with FILTER clause are not supported") errDefaultAggregateWindowFunction = errors.New("default aggregate window functions not supported") + errStreamIngestionWrap = errors.New("core.StreamIngestion{Data,Frontier} is not supported because of #55758") ) func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error { @@ -314,7 +315,9 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCo case core.RestoreData != nil: case core.Filterer != nil: case core.StreamIngestionData != nil: + return errStreamIngestionWrap case core.StreamIngestionFrontier != nil: + return errStreamIngestionWrap case core.HashGroupJoiner != nil: default: return errors.AssertionFailedf("unexpected processor core %q", core) diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel index bcc09d38b2e8..e898d02aae3c 100644 --- 
+++ b/pkg/util/bulk/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
         "//pkg/sql/execinfrapb",
         "//pkg/sql/isql",
         "//pkg/sql/protoreflect",
+        "//pkg/util/log",
         "//pkg/util/protoutil",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
diff --git a/pkg/util/bulk/tracing_aggregator.go b/pkg/util/bulk/tracing_aggregator.go
index ec97279d0a0e..1e2c5b375b52 100644
--- a/pkg/util/bulk/tracing_aggregator.go
+++ b/pkg/util/bulk/tracing_aggregator.go
@@ -13,6 +13,7 @@ package bulk
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -121,6 +122,10 @@ var _ tracing.EventListener = &TracingAggregator{}
 func MakeTracingAggregatorWithSpan(
 	ctx context.Context, aggregatorName string, tracer *tracing.Tracer,
 ) (context.Context, *TracingAggregator) {
+	if tracing.SpanFromContext(ctx) == nil {
+		log.Warningf(ctx, "tracing aggregator %s cannot be created without a tracing span", aggregatorName)
+		return ctx, nil
+	}
 	agg := &TracingAggregator{}
 	aggCtx, aggSpan := tracing.EnsureChildSpan(ctx, tracer, aggregatorName,
 		tracing.WithEventListeners(agg))
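
Reviewer note: the program below is a minimal, self-contained Go sketch of the fan-in
pattern the stream_ingestion_dist.go hunks set up, not CockroachDB code. All names in
it are hypothetical stand-ins: aggregatorEvents plays the role of
execinfrapb.TracingAggregatorEvents, and aggregateLoop plays the role of
bulk.AggregateTracingStats. The shape is the same: processors push per-node events
into a channel via a metadata callback, a single consumer loop merges them, the
channel is closed when the plan finishes, and it is drained on consumer error so
producers never block.

package main

import (
	"context"
	"fmt"
	"sync"
)

// aggregatorEvents is a toy stand-in for execinfrapb.TracingAggregatorEvents.
type aggregatorEvents struct {
	nodeID int
	events map[string]int64 // event name -> counter; the real type carries marshaled protos
}

// aggregateLoop is a toy stand-in for bulk.AggregateTracingStats: it consumes
// events until the channel is closed, merging them per event name.
func aggregateLoop(ctx context.Context, ch <-chan *aggregatorEvents) error {
	totals := map[string]int64{}
	for ev := range ch {
		for name, v := range ev.events {
			totals[name] += v
		}
		// The real loop periodically flushes rendered totals to the job_info
		// table; here we just print them.
		fmt.Printf("node %d flushed; totals so far: %v\n", ev.nodeID, totals)
	}
	return ctx.Err()
}

func main() {
	ctx := context.Background()
	ch := make(chan *aggregatorEvents)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // consumer, analogous to tracingAggLoop in the patch
		defer wg.Done()
		if err := aggregateLoop(ctx, ch); err != nil {
			// On error, drain the channel so producers never block on send,
			// mirroring the "for range tracingAggCh {}" loop in the patch.
			for range ch {
			}
		}
	}()

	// Producers, analogous to processors returning AggregatorEvents metadata
	// that the MetadataCallbackWriter forwards onto the channel.
	for node := 1; node <= 3; node++ {
		ch <- &aggregatorEvents{
			nodeID: node,
			events: map[string]int64{"IngestionPerformanceStats": int64(node * 10)},
		}
	}
	close(ch) // analogous to "defer close(tracingAggCh)" when the plan finishes
	wg.Wait()
}

The drain-on-error step is the load-bearing detail: without it, a processor blocked
on a channel send would stall flow shutdown once the aggregation loop had failed.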