diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
index be429160d16e..41d51c75d091 100644
--- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -61,6 +61,7 @@ go_library(
         "//pkg/sql/types",
         "//pkg/storage",
         "//pkg/storage/enginepb",
+        "//pkg/util/bulk",
         "//pkg/util/ctxgroup",
         "//pkg/util/hlc",
         "//pkg/util/log",
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
index cf2a6d74f81d..e3877746cb91 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -30,6 +30,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/util/bulk"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -153,18 +154,39 @@ func startDistIngestion(
 		func() time.Duration { return replanFrequency.Get(execCtx.ExecCfg().SV()) },
 	)
 
+	tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents)
+	tracingAggLoop := func(ctx context.Context) error {
+		if err := bulk.AggregateTracingStats(ctx, ingestionJob.ID(),
+			execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil {
+			log.Warningf(ctx, "failed to aggregate tracing stats: %v", err)
+			// Drain the channel if the loop to aggregate tracing stats has returned
+			// an error.
+			for range tracingAggCh {
+			}
+		}
+		return nil
+	}
+
 	execInitialPlan := func(ctx context.Context) error {
 		log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d",
 			ingestionJob.ID())
 		defer stopReplanner()
+		defer close(tracingAggCh)
 		ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
 
+		metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error {
+			if meta.AggregatorEvents != nil {
+				tracingAggCh <- meta.AggregatorEvents
+			}
+			return nil
+		}
+
 		rw := sql.NewRowResultWriter(nil /* rowContainer */)
+
 		var noTxn *kv.Txn
 		recv := sql.MakeDistSQLReceiver(
 			ctx,
-			rw,
+			sql.NewMetadataCallbackWriter(rw, metaFn),
 			tree.Rows,
 			nil, /* rangeCache */
 			noTxn,
@@ -181,7 +203,7 @@ func startDistIngestion(
 	updateRunningStatus(ctx, ingestionJob, jobspb.Replicating,
 		"running the SQL flow for the stream ingestion job")
 
-	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner)
+	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop)
 	if errors.Is(err, sql.ErrPlanChanged) {
 		execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
 	}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
index 9c7adddcc1c4..eadf1d559adc 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -10,6 +10,7 @@ package streamingest
 
 import (
 	"context"
+	"fmt"
 	"sort"
 	"sync"
 	"time"
@@ -37,6 +38,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log" @@ -256,6 +258,11 @@ type streamIngestionProcessor struct { metrics *Metrics logBufferEvery log.EveryN + + // Aggregator that aggregates StructuredEvents emitted in the + // backupDataProcessors' trace recording. + agg *bulkutil.TracingAggregator + aggTimer *timeutil.Timer } // partitionEvent augments a normal event with the partition it came from. @@ -323,7 +330,7 @@ func newStreamIngestionDataProcessor( InputsToDrain: []execinfra.RowSource{}, TrailingMetaCallback: func() []execinfrapb.ProducerMetadata { sip.close() - return nil + return []execinfrapb.ProducerMetadata{*sip.constructTracingAggregatorProducerMeta(ctx)} }, }, ); err != nil { @@ -333,6 +340,34 @@ func newStreamIngestionDataProcessor( return sip, nil } +func (sip *streamIngestionProcessor) constructTracingAggregatorProducerMeta( + ctx context.Context, +) *execinfrapb.ProducerMetadata { + aggEvents := &execinfrapb.TracingAggregatorEvents{ + SQLInstanceID: sip.flowCtx.NodeID.SQLInstanceID(), + FlowID: sip.flowCtx.ID, + Events: make(map[string][]byte), + } + sip.agg.ForEachAggregatedEvent(func(name string, event bulkutil.TracingAggregatorEvent) { + msg, ok := event.(protoutil.Message) + if !ok { + // This should never happen but if it does skip the aggregated event. + log.Warningf(ctx, "event is not a protoutil.Message: %T", event) + return + } + data := make([]byte, msg.Size()) + if _, err := msg.MarshalTo(data); err != nil { + // This should never happen but if it does skip the aggregated event. + log.Warningf(ctx, "failed to unmarshal aggregated event: %v", err.Error()) + return + } + aggEvents.Events[name] = data + }) + log.Infof(ctx, "constructing tracing aggregator %+v", aggEvents) + + return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents} +} + // Start launches a set of goroutines that read from the spans // assigned to this processor and ingests them until cutover is // reached. @@ -358,6 +393,13 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { log.Infof(ctx, "starting ingest proc") ctx = sip.StartInternal(ctx, streamIngestionProcessorName) + // Construct an Aggregator to aggregate and render AggregatorEvents emitted in + // sips' trace recording. 
+	ctx, sip.agg = bulkutil.MakeTracingAggregatorWithSpan(ctx,
+		fmt.Sprintf("%s-aggregator", streamIngestionProcessorName), sip.EvalCtx.Tracer)
+	sip.aggTimer = timeutil.NewTimer()
+	sip.aggTimer.Reset(15 * time.Second)
+
 	sip.metrics = sip.flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics)
 
 	evalCtx := sip.FlowCtx.EvalCtx
@@ -469,6 +511,10 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr
 			}
 			return row, nil
 		}
+	case <-sip.aggTimer.C:
+		sip.aggTimer.Read = true
+		sip.aggTimer.Reset(15 * time.Second)
+		return nil, sip.constructTracingAggregatorProducerMeta(sip.Ctx())
 	case err := <-sip.errCh:
 		sip.MoveToDraining(err)
 		return nil, sip.DrainHelper()
diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go
index f8cf66d628f6..97ca41e42d02 100644
--- a/pkg/kv/bulk/sst_batcher.go
+++ b/pkg/kv/bulk/sst_batcher.go
@@ -247,7 +247,16 @@ func MakeStreamSSTBatcher(
 	sendLimiter limit.ConcurrentRequestLimiter,
 	onFlush func(summary kvpb.BulkOpSummary),
 ) (*SSTBatcher, error) {
-	b := &SSTBatcher{db: db, rc: rc, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter}
+	b := &SSTBatcher{
+		db:        db,
+		rc:        rc,
+		settings:  settings,
+		ingestAll: true,
+		mem:       mem,
+		limiter:   sendLimiter,
+	}
+	b.mu.lastFlush = timeutil.Now()
+	b.mu.tracingSpan = tracing.SpanFromContext(ctx)
 	b.SetOnFlush(onFlush)
 	err := b.Reset(ctx)
 	return b, err
diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go
index 7cba0d80670a..77dcbdd30e65 100644
--- a/pkg/sql/colexec/colbuilder/execplan.go
+++ b/pkg/sql/colexec/colbuilder/execplan.go
@@ -265,6 +265,7 @@ var (
 	errNonInnerMergeJoinWithOnExpr    = errors.New("can't plan vectorized non-inner merge joins with ON expressions")
 	errWindowFunctionFilterClause     = errors.New("window functions with FILTER clause are not supported")
 	errDefaultAggregateWindowFunction = errors.New("default aggregate window functions not supported")
+	errStreamIngestionWrap            = errors.New("core.StreamIngestion{Data,Frontier} is not supported because of #55758")
 )
 
 func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error {
@@ -313,7 +314,9 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCo
 	case core.RestoreData != nil:
 	case core.Filterer != nil:
 	case core.StreamIngestionData != nil:
+		return errStreamIngestionWrap
 	case core.StreamIngestionFrontier != nil:
+		return errStreamIngestionWrap
 	case core.HashGroupJoiner != nil:
 	default:
 		return errors.AssertionFailedf("unexpected processor core %q", core)
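
The dist-side change above wires a fan-in pipeline: each processor ships its aggregated trace events to the coordinator as `ProducerMetadata`, `metaFn` forwards every `AggregatorEvents` payload onto `tracingAggCh`, and `tracingAggLoop` (run alongside the flow via `ctxgroup.GoAndWait`) persists them through `bulk.AggregateTracingStats`. Two details keep the pipeline from wedging: `execInitialPlan` closes the channel when the flow exits, and the loop drains the channel if aggregation fails so the metadata callback never blocks on an unbuffered send. Below is a minimal, self-contained sketch of that ownership pattern; `aggEvent`, `runFlow`, and `aggregate` are illustrative stand-ins, not CockroachDB APIs.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// aggEvent stands in for *execinfrapb.TracingAggregatorEvents.
type aggEvent struct {
	node  int
	stats string
}

// runFlow stands in for the DistSQL flow: it emits events and closes the
// channel when the flow finishes, mirroring `defer close(tracingAggCh)`.
func runFlow(ctx context.Context, ch chan<- aggEvent) error {
	defer close(ch)
	for i := 0; i < 3; i++ {
		ch <- aggEvent{node: i, stats: fmt.Sprintf("batch-%d", i)}
	}
	return nil
}

// aggregate stands in for bulk.AggregateTracingStats: it consumes events
// until the channel closes.
func aggregate(ctx context.Context, ch <-chan aggEvent) error {
	for ev := range ch {
		fmt.Printf("aggregated stats from node %d: %s\n", ev.node, ev.stats)
	}
	return nil
}

func main() {
	ctx := context.Background()
	ch := make(chan aggEvent) // unbuffered, like tracingAggCh
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		_ = runFlow(ctx, ch)
	}()
	go func() {
		defer wg.Done()
		if err := aggregate(ctx, ch); err != nil {
			// Mirror tracingAggLoop: if aggregation fails midway, keep
			// draining so the producer's sends never block.
			for range ch {
			}
		}
	}()
	wg.Wait()
}
```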
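
The processor-side change makes `Next()` a dual-purpose pull source: rows flow through as usual, but whenever `aggTimer` fires the processor re-arms it and returns the current aggregate as metadata instead of a row, with a final snapshot emitted from `TrailingMetaCallback`. A rough sketch of that select shape follows, using the standard library's `time.Timer` in place of CockroachDB's `timeutil.Timer` (which additionally requires marking `Read` before `Reset`, as the diff does); `producer`, `row`, and `meta` are hypothetical names, and the intervals are shortened so the demo runs quickly.

```go
package main

import (
	"fmt"
	"time"
)

type row string
type meta string

type producer struct {
	rows     <-chan row
	aggTimer *time.Timer
	interval time.Duration
}

// next returns either a row or, when the aggregation interval elapses, a
// metadata payload, re-arming the timer after each metadata emission just
// as sip.aggTimer is Reset in the diff's timer case.
func (p *producer) next() (row, meta) {
	select {
	case r := <-p.rows:
		return r, ""
	case <-p.aggTimer.C:
		p.aggTimer.Reset(p.interval)
		return "", meta("periodic aggregated stats")
	}
}

func main() {
	rows := make(chan row)
	go func() {
		rows <- "row-1"
		time.Sleep(50 * time.Millisecond) // pause so the timer wins once
		rows <- "row-2"
	}()
	p := &producer{
		rows:     rows,
		aggTimer: time.NewTimer(30 * time.Millisecond),
		interval: 100 * time.Millisecond,
	}
	for i := 0; i < 3; i++ {
		if r, m := p.next(); m != "" {
			fmt.Println("metadata:", m)
		} else {
			fmt.Println("row:", r)
		}
	}
}
```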