backupccl: hook up tracing aggregator events for the C2C job
This change builds on top of cockroachdb#107994 and wires up each stream
ingestion data processor to emit TracingAggregatorEvents to
the frontier and subsequently the job coordinator.
These events are periodically flushed to files in the `job_info`
table and are consumable via the DBConsole Job Details page.

Currently, the only aggregator event that is propagated is the
IngestionPerformanceStats emitted by the sst batcher.

Fixes: cockroachdb#100126
Release note: None
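
For illustration, the coordinator-side loop described above (wired up in stream_ingestion_dist.go below) boils down to draining a channel of per-processor events and periodically persisting the latest snapshot. The following is a minimal, self-contained Go sketch of that pattern; AggregatorEvents and the persist callback are simplified stand-ins, not the bulk.AggregateTracingStats / job_info APIs used by the actual change.

package main

import (
	"context"
	"fmt"
	"time"
)

// AggregatorEvents is a simplified stand-in for the payload each processor
// emits (execinfrapb.TracingAggregatorEvents in the real change): serialized
// events keyed by name, tagged with the emitting node.
type AggregatorEvents struct {
	NodeID int
	Events map[string][]byte
}

// aggregateTracingStats drains per-processor events from ch and periodically
// persists the most recent snapshot per node, mirroring the coordinator loop.
// persist stands in for writing files to the job_info table.
func aggregateTracingStats(
	ctx context.Context,
	ch <-chan *AggregatorEvents,
	flushEvery time.Duration,
	persist func(map[int]map[string][]byte) error,
) error {
	latest := make(map[int]map[string][]byte)
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()
	for {
		select {
		case ev, ok := <-ch:
			if !ok {
				// Producers are done; persist whatever we have and exit.
				return persist(latest)
			}
			latest[ev.NodeID] = ev.Events
		case <-ticker.C:
			if err := persist(latest); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ch := make(chan *AggregatorEvents)
	go func() {
		ch <- &AggregatorEvents{NodeID: 1, Events: map[string][]byte{"IngestionPerformanceStats": nil}}
		close(ch)
	}()
	_ = aggregateTracingStats(context.Background(), ch, time.Second,
		func(snap map[int]map[string][]byte) error {
			fmt.Println("persisting snapshot for", len(snap), "nodes")
			return nil
		})
}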
adityamaru committed Aug 9, 2023
1 parent f49f5b1 commit 6370e46
Showing 5 changed files with 85 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -61,6 +61,7 @@ go_library(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/bulk",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/log",
26 changes: 24 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -153,18 +154,39 @@ func startDistIngestion(
func() time.Duration { return replanFrequency.Get(execCtx.ExecCfg().SV()) },
)

tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents)
tracingAggLoop := func(ctx context.Context) error {
if err := bulk.AggregateTracingStats(ctx, ingestionJob.ID(),
execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil {
log.Warningf(ctx, "failed to aggregate tracing stats: %v", err)
// Drain the channel if the loop to aggregate tracing stats has returned
// an error.
for range tracingAggCh {
}
}
return nil
}

execInitialPlan := func(ctx context.Context) error {
log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d",
ingestionJob.ID())
defer stopReplanner()
defer close(tracingAggCh)
ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)

metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error {
if meta.AggregatorEvents != nil {
tracingAggCh <- meta.AggregatorEvents
}
return nil
}

rw := sql.NewRowResultWriter(nil /* rowContainer */)

var noTxn *kv.Txn
recv := sql.MakeDistSQLReceiver(
ctx,
rw,
sql.NewMetadataCallbackWriter(rw, metaFn),
tree.Rows,
nil, /* rangeCache */
noTxn,
@@ -181,7 +203,7 @@

updateRunningStatus(ctx, ingestionJob, jobspb.Replicating,
"running the SQL flow for the stream ingestion job")
err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner)
err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop)
if errors.Is(err, sql.ErrPlanChanged) {
execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
}
48 changes: 47 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -10,6 +10,7 @@ package streamingest

import (
"context"
"fmt"
"sort"
"sync"
"time"
@@ -37,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -256,6 +258,11 @@ type streamIngestionProcessor struct {
metrics *Metrics

logBufferEvery log.EveryN

// Aggregator that aggregates StructuredEvents emitted in the stream
// ingestion processor's trace recording.
agg *bulkutil.TracingAggregator
aggTimer *timeutil.Timer
}

// partitionEvent augments a normal event with the partition it came from.
@@ -323,7 +330,7 @@ func newStreamIngestionDataProcessor(
InputsToDrain: []execinfra.RowSource{},
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
sip.close()
return nil
return []execinfrapb.ProducerMetadata{*sip.constructTracingAggregatorProducerMeta(ctx)}
},
},
); err != nil {
@@ -333,6 +340,34 @@
return sip, nil
}

func (sip *streamIngestionProcessor) constructTracingAggregatorProducerMeta(
ctx context.Context,
) *execinfrapb.ProducerMetadata {
aggEvents := &execinfrapb.TracingAggregatorEvents{
SQLInstanceID: sip.flowCtx.NodeID.SQLInstanceID(),
FlowID: sip.flowCtx.ID,
Events: make(map[string][]byte),
}
sip.agg.ForEachAggregatedEvent(func(name string, event bulkutil.TracingAggregatorEvent) {
msg, ok := event.(protoutil.Message)
if !ok {
// This should never happen but if it does skip the aggregated event.
log.Warningf(ctx, "event is not a protoutil.Message: %T", event)
return
}
data := make([]byte, msg.Size())
if _, err := msg.MarshalTo(data); err != nil {
// This should never happen but if it does skip the aggregated event.
log.Warningf(ctx, "failed to marshal aggregated event: %v", err.Error())
return
}
aggEvents.Events[name] = data
})
log.Infof(ctx, "constructing tracing aggregator %+v", aggEvents)

return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
}

// Start launches a set of goroutines that read from the spans
// assigned to this processor and ingests them until cutover is
// reached.
@@ -358,6 +393,13 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
log.Infof(ctx, "starting ingest proc")
ctx = sip.StartInternal(ctx, streamIngestionProcessorName)

// Construct an Aggregator to aggregate and render AggregatorEvents emitted in
// sip's trace recording.
ctx, sip.agg = bulkutil.MakeTracingAggregatorWithSpan(ctx,
fmt.Sprintf("%s-aggregator", streamIngestionProcessorName), sip.EvalCtx.Tracer)
sip.aggTimer = timeutil.NewTimer()
sip.aggTimer.Reset(15 * time.Second)

sip.metrics = sip.flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics)

evalCtx := sip.FlowCtx.EvalCtx
@@ -469,6 +511,10 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr
}
return row, nil
}
case <-sip.aggTimer.C:
sip.aggTimer.Read = true
sip.aggTimer.Reset(15 * time.Second)
return nil, sip.constructTracingAggregatorProducerMeta(sip.Ctx())
case err := <-sip.errCh:
sip.MoveToDraining(err)
return nil, sip.DrainHelper()
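
The aggTimer handling added to Next above is a timer-driven variant of the same idea: on each tick the processor snapshots its aggregator and returns the result as out-of-band metadata instead of a row, then re-arms the timer. A compact sketch of that shape with a plain time.Timer and hypothetical stand-in types (not the execinfra RowSource contract):

package main

import (
	"fmt"
	"time"
)

// emit is a stand-in for Next's result: exactly one of row (a normal datum
// row) or meta (an AggregatorEvents-style payload) is set per call.
type emit struct {
	row  string
	meta map[string][]byte
}

// nextWithPeriodicMeta drains rows, but whenever the timer fires it instead
// returns a metadata payload built by snapshot(), re-arming the timer — the
// same shape as the aggTimer case in streamIngestionProcessor.Next.
func nextWithPeriodicMeta(
	rows <-chan string,
	timer *time.Timer,
	interval time.Duration,
	snapshot func() map[string][]byte,
) (emit, bool) {
	select {
	case r, ok := <-rows:
		return emit{row: r}, ok
	case <-timer.C:
		timer.Reset(interval)
		return emit{meta: snapshot()}, true
	}
}

func main() {
	rows := make(chan string, 1)
	rows <- "row-1"
	timer := time.NewTimer(10 * time.Millisecond)
	snapshot := func() map[string][]byte {
		return map[string][]byte{"IngestionPerformanceStats": nil}
	}
	for i := 0; i < 2; i++ {
		e, ok := nextWithPeriodicMeta(rows, timer, 10*time.Millisecond, snapshot)
		fmt.Printf("ok=%v row=%q metaKeys=%d\n", ok, e.row, len(e.meta))
	}
}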
11 changes: 10 additions & 1 deletion pkg/kv/bulk/sst_batcher.go
@@ -247,7 +247,16 @@ func MakeStreamSSTBatcher(
sendLimiter limit.ConcurrentRequestLimiter,
onFlush func(summary kvpb.BulkOpSummary),
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, rc: rc, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter}
b := &SSTBatcher{
db: db,
rc: rc,
settings: settings,
ingestAll: true,
mem: mem,
limiter: sendLimiter,
}
b.mu.lastFlush = timeutil.Now()
b.mu.tracingSpan = tracing.SpanFromContext(ctx)
b.SetOnFlush(onFlush)
err := b.Reset(ctx)
return b, err
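
Capturing the tracing span at construction time (b.mu.tracingSpan above) is what lets the batcher's flush-time IngestionPerformanceStats reach the aggregator: the batcher records the stats as a structured event on that span, and the processor's TracingAggregator observes and merges such events before they are serialized into ProducerMetadata. A rough, self-contained sketch of that listener pattern, using hypothetical types rather than the util/tracing and util/bulk APIs:

package main

import "fmt"

// statsEvent stands in for a structured event such as
// IngestionPerformanceStats: it can merge another event of the same kind.
type statsEvent interface {
	Name() string
	Combine(other statsEvent)
}

// span is a minimal stand-in for a tracing span that forwards structured
// events to registered listeners; the aggregator registers itself as one.
type span struct{ listeners []func(statsEvent) }

func (s *span) RecordStructured(ev statsEvent) {
	for _, l := range s.listeners {
		l(ev)
	}
}

// aggregator merges events of the same name, roughly what the
// TracingAggregator does before the processor serializes the result.
type aggregator struct{ byName map[string]statsEvent }

func (a *aggregator) listen(ev statsEvent) {
	if cur, ok := a.byName[ev.Name()]; ok {
		cur.Combine(ev)
		return
	}
	a.byName[ev.Name()] = ev
}

// flushStats is a toy IngestionPerformanceStats-like event counting flushes.
type flushStats struct{ flushes int }

func (f *flushStats) Name() string             { return "IngestionPerformanceStats" }
func (f *flushStats) Combine(other statsEvent) { f.flushes += other.(*flushStats).flushes }

func main() {
	agg := &aggregator{byName: map[string]statsEvent{}}
	sp := &span{listeners: []func(statsEvent){agg.listen}}

	// The batcher holds on to sp (as b.mu.tracingSpan) and records an event
	// after each flush.
	sp.RecordStructured(&flushStats{flushes: 1})
	sp.RecordStructured(&flushStats{flushes: 1})

	fmt.Println(agg.byName["IngestionPerformanceStats"].(*flushStats).flushes) // prints 2
}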
3 changes: 3 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -265,6 +265,7 @@ var (
errNonInnerMergeJoinWithOnExpr = errors.New("can't plan vectorized non-inner merge joins with ON expressions")
errWindowFunctionFilterClause = errors.New("window functions with FILTER clause are not supported")
errDefaultAggregateWindowFunction = errors.New("default aggregate window functions not supported")
errStreamIngestionWrap = errors.New("core.StreamIngestion{Data,Frontier} is not supported because of #55758")
)

func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error {
@@ -313,7 +314,9 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCo
case core.RestoreData != nil:
case core.Filterer != nil:
case core.StreamIngestionData != nil:
return errStreamIngestionWrap
case core.StreamIngestionFrontier != nil:
return errStreamIngestionWrap
case core.HashGroupJoiner != nil:
default:
return errors.AssertionFailedf("unexpected processor core %q", core)
