backupccl: hook up tracing aggregator events for the C2C job
This change builds on top of #107994 and wires up each stream
ingestion data processor to emit TracingAggregatorEvents to
the frontier and subsequently the job coordinator.
These events are periodically flushed to files in the `job_info`
table and are consumable via the DBConsole Job Details page.

Currently, the only aggregator event that is propagated is the
IngestionPerformanceStats event emitted by the SST batcher.

Fixes: #100126
Release note: None
adityamaru committed Aug 25, 2023
1 parent 155dc00 commit da18c77
Showing 9 changed files with 124 additions and 8 deletions.
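
As an aside for readers, the event flow the commit message describes — per-processor aggregation, periodic emission as producer metadata, and a coordinator-side loop that persists the events — can be sketched as a self-contained Go program. This is a rough sketch only: AggregatorEvents, processor, and coordinator below are simplified stand-ins for execinfrapb.TracingAggregatorEvents, the stream ingestion data processors, and the coordinator-side bulk.AggregateTracingStats loop; none of it is CockroachDB code.

package main

import (
	"fmt"
	"time"
)

// AggregatorEvents stands in for execinfrapb.TracingAggregatorEvents: a
// bundle of named, serialized aggregator events from a single processor.
type AggregatorEvents struct {
	Events map[string][]byte
}

// processor mimics a stream ingestion data processor: on each timer tick
// it snapshots its aggregated stats and emits them toward the coordinator.
func processor(id int, out chan<- AggregatorEvents, done <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Millisecond) // the commit uses a 15s timer
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			out <- AggregatorEvents{Events: map[string][]byte{
				"IngestionPerformanceStats": []byte(fmt.Sprintf("stats-from-proc-%d", id)),
			}}
		case <-done:
			return
		}
	}
}

// coordinator mimics the job coordinator's aggregation loop: it receives
// events until the channel closes; the real code merges them and flushes
// the result to files in the job_info table.
func coordinator(in <-chan AggregatorEvents) {
	for ev := range in {
		for name := range ev.Events {
			fmt.Println("persisting aggregated event:", name)
		}
	}
}

func main() {
	events := make(chan AggregatorEvents)
	done := make(chan struct{})
	go func() {
		// The goroutine driving the flow closes the channel when it is done,
		// just as execInitialPlan does with tracingAggCh in this commit.
		defer close(events)
		processor(1, events, done)
	}()
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(done)
	}()
	coordinator(events)
}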
13 changes: 11 additions & 2 deletions pkg/ccl/backupccl/backup_processor.go
@@ -173,7 +173,11 @@ func newBackupDataProcessor(
 		InputsToDrain: nil,
 		TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
 			bp.close()
-			return []execinfrapb.ProducerMetadata{*bp.constructTracingAggregatorProducerMeta(ctx)}
+			meta := bp.constructTracingAggregatorProducerMeta(ctx)
+			if meta == nil {
+				return nil
+			}
+			return []execinfrapb.ProducerMetadata{*meta}
 		},
 	}); err != nil {
 		return nil, err
@@ -245,6 +249,9 @@ func (bp *backupDataProcessor) constructProgressProducerMeta(
 func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta(
 	ctx context.Context,
 ) *execinfrapb.ProducerMetadata {
+	if bp.agg == nil {
+		return nil
+	}
 	aggEvents := &execinfrapb.TracingAggregatorEvents{
 		SQLInstanceID: bp.flowCtx.NodeID.SQLInstanceID(),
 		FlowID:        bp.flowCtx.ID,
@@ -292,7 +299,9 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 func (bp *backupDataProcessor) close() {
 	bp.cancelAndWaitForWorker()
 	if bp.InternalClose() {
-		bp.agg.Close()
+		if bp.agg != nil {
+			bp.agg.Close()
+		}
 		bp.aggTimer.Stop()
 		bp.memAcc.Close(bp.Ctx())
 	}
14 changes: 12 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -211,7 +211,11 @@ func newRestoreDataProcessor(
 		InputsToDrain: []execinfra.RowSource{input},
 		TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
 			rd.ConsumerClosed()
-			return []execinfrapb.ProducerMetadata{*rd.constructTracingAggregatorProducerMeta(ctx)}
+			meta := rd.constructTracingAggregatorProducerMeta(ctx)
+			if meta == nil {
+				return nil
+			}
+			return []execinfrapb.ProducerMetadata{*meta}
 		},
 	}); err != nil {
 		return nil, err
@@ -686,6 +690,9 @@ func makeProgressUpdate(
 func (rd *restoreDataProcessor) constructTracingAggregatorProducerMeta(
 	ctx context.Context,
 ) *execinfrapb.ProducerMetadata {
+	if rd.agg == nil {
+		return nil
+	}
 	aggEvents := &execinfrapb.TracingAggregatorEvents{
 		SQLInstanceID: rd.flowCtx.NodeID.SQLInstanceID(),
 		FlowID:        rd.flowCtx.ID,
@@ -748,7 +755,10 @@ func (rd *restoreDataProcessor) ConsumerClosed() {
 	rd.cancelWorkersAndWait()
 
 	rd.qp.Close(rd.Ctx())
-	rd.agg.Close()
+	if rd.agg != nil {
+		rd.agg.Close()
+	}
+	rd.aggTimer.Stop()
 	rd.InternalClose()
 }
 
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -61,6 +61,7 @@ go_library(
         "//pkg/sql/types",
         "//pkg/storage",
         "//pkg/storage/enginepb",
+        "//pkg/util/bulk",
         "//pkg/util/ctxgroup",
         "//pkg/util/hlc",
         "//pkg/util/log",
26 changes: 24 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/util/bulk"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -150,16 +151,37 @@ func startDistIngestion(
 		func() time.Duration { return replanFrequency.Get(execCtx.ExecCfg().SV()) },
 	)
 
+	tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents)
+	tracingAggLoop := func(ctx context.Context) error {
+		if err := bulk.AggregateTracingStats(ctx, ingestionJob.ID(),
+			execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil {
+			log.Warningf(ctx, "failed to aggregate tracing stats: %v", err)
+			// Drain the channel if the loop to aggregate tracing stats has returned
+			// an error.
+			for range tracingAggCh {
+			}
+		}
+		return nil
+	}
+
 	execInitialPlan := func(ctx context.Context) error {
 		defer stopReplanner()
+		defer close(tracingAggCh)
 		ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
 
+		metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error {
+			if meta.AggregatorEvents != nil {
+				tracingAggCh <- meta.AggregatorEvents
+			}
+			return nil
+		}
+
 		rw := sql.NewRowResultWriter(nil /* rowContainer */)
 
 		var noTxn *kv.Txn
 		recv := sql.MakeDistSQLReceiver(
 			ctx,
-			rw,
+			sql.NewMetadataCallbackWriter(rw, metaFn),
 			tree.Rows,
 			nil, /* rangeCache */
 			noTxn,
@@ -175,7 +197,7 @@
 	}
 
 	updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "physical replication running")
-	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner)
+	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop)
 	if errors.Is(err, sql.ErrPlanChanged) {
 		execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
 	}
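
One detail worth calling out in tracingAggLoop above: tracingAggCh is unbuffered, so if the aggregation loop fails, the metadata callback in execInitialPlan would block forever on its next send unless someone keeps receiving; hence the drain loop. A minimal standalone illustration of that pattern, with hypothetical names rather than the real APIs:

package main

import (
	"errors"
	"fmt"
)

// process fails partway through, leaving unread items on the channel.
func process(ch <-chan int) error {
	for v := range ch {
		if v == 2 {
			return errors.New("boom")
		}
		fmt.Println("processed", v)
	}
	return nil
}

// consume mimics tracingAggLoop: if processing fails, it keeps draining
// the channel so the producer never blocks, and reports success anyway.
func consume(ch <-chan int) error {
	if err := process(ch); err != nil {
		fmt.Println("processing failed:", err)
		for range ch { // drain until the producer closes the channel
		}
	}
	return nil
}

func main() {
	ch := make(chan int)
	go func() {
		defer close(ch) // the producer closes the channel, as execInitialPlan does
		for i := 1; i <= 4; i++ {
			ch <- i
		}
	}()
	_ = consume(ch)
	fmt.Println("producer and consumer both finished")
}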
58 changes: 57 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -10,6 +10,7 @@ package streamingest
 
 import (
 	"context"
+	"fmt"
 	"sort"
 	"sync"
 	"time"
@@ -37,6 +38,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -256,6 +258,11 @@ type streamIngestionProcessor struct {
 	metrics *Metrics
 
 	logBufferEvery log.EveryN
+
+	// Aggregator that aggregates StructuredEvents emitted in the
+	// streamIngestionProcessor's trace recording.
+	agg      *bulkutil.TracingAggregator
+	aggTimer *timeutil.Timer
 }
 
 // partitionEvent augments a normal event with the partition it came from.
@@ -323,7 +330,11 @@ func newStreamIngestionDataProcessor(
 			InputsToDrain: []execinfra.RowSource{},
 			TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
 				sip.close()
-				return nil
+				meta := sip.constructTracingAggregatorProducerMeta(ctx)
+				if meta == nil {
+					return nil
+				}
+				return []execinfrapb.ProducerMetadata{*meta}
 			},
 		},
 	); err != nil {
@@ -333,6 +344,36 @@
 	return sip, nil
 }
 
+func (sip *streamIngestionProcessor) constructTracingAggregatorProducerMeta(
+	ctx context.Context,
+) *execinfrapb.ProducerMetadata {
+	if sip.agg == nil {
+		return nil
+	}
+	aggEvents := &execinfrapb.TracingAggregatorEvents{
+		SQLInstanceID: sip.flowCtx.NodeID.SQLInstanceID(),
+		FlowID:        sip.flowCtx.ID,
+		Events:        make(map[string][]byte),
+	}
+	sip.agg.ForEachAggregatedEvent(func(name string, event bulkutil.TracingAggregatorEvent) {
+		msg, ok := event.(protoutil.Message)
+		if !ok {
+			// This should never happen but if it does skip the aggregated event.
+			log.Warningf(ctx, "event is not a protoutil.Message: %T", event)
+			return
+		}
+		data := make([]byte, msg.Size())
+		if _, err := msg.MarshalTo(data); err != nil {
+			// This should never happen but if it does skip the aggregated event.
+			log.Warningf(ctx, "failed to marshal aggregated event: %v", err.Error())
+			return
+		}
+		aggEvents.Events[name] = data
+	})
+
+	return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
+}
+
 // Start launches a set of goroutines that read from the spans
 // assigned to this processor and ingests them until cutover is
 // reached.
@@ -358,6 +399,13 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
 	log.Infof(ctx, "starting ingest proc")
 	ctx = sip.StartInternal(ctx, streamIngestionProcessorName)
 
+	// Construct an Aggregator to aggregate and render AggregatorEvents emitted in
+	// sip's trace recording.
+	ctx, sip.agg = bulkutil.MakeTracingAggregatorWithSpan(ctx,
+		fmt.Sprintf("%s-aggregator", streamIngestionProcessorName), sip.EvalCtx.Tracer)
+	sip.aggTimer = timeutil.NewTimer()
+	sip.aggTimer.Reset(15 * time.Second)
+
 	sip.metrics = sip.flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics)
 
 	evalCtx := sip.FlowCtx.EvalCtx
@@ -470,6 +518,10 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 			}
 			return row, nil
 		}
+	case <-sip.aggTimer.C:
+		sip.aggTimer.Read = true
+		sip.aggTimer.Reset(15 * time.Second)
+		return nil, sip.constructTracingAggregatorProducerMeta(sip.Ctx())
 	case err := <-sip.errCh:
 		sip.MoveToDraining(err)
 		return nil, sip.DrainHelper()
@@ -533,6 +585,10 @@ func (sip *streamIngestionProcessor) close() {
 	if sip.maxFlushRateTimer != nil {
 		sip.maxFlushRateTimer.Stop()
 	}
+	if sip.agg != nil {
+		sip.agg.Close()
+	}
+	sip.aggTimer.Stop()
 
 	sip.InternalClose()
 }
11 changes: 10 additions & 1 deletion pkg/kv/bulk/sst_batcher.go
@@ -247,7 +247,16 @@ func MakeStreamSSTBatcher(
 	sendLimiter limit.ConcurrentRequestLimiter,
 	onFlush func(summary kvpb.BulkOpSummary),
 ) (*SSTBatcher, error) {
-	b := &SSTBatcher{db: db, rc: rc, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter}
+	b := &SSTBatcher{
+		db:        db,
+		rc:        rc,
+		settings:  settings,
+		ingestAll: true,
+		mem:       mem,
+		limiter:   sendLimiter,
+	}
+	b.mu.lastFlush = timeutil.Now()
+	b.mu.tracingSpan = tracing.SpanFromContext(ctx)
 	b.SetOnFlush(onFlush)
 	err := b.Reset(ctx)
 	return b, err
3 changes: 3 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -266,6 +266,7 @@ var (
 	errNonInnerMergeJoinWithOnExpr    = errors.New("can't plan vectorized non-inner merge joins with ON expressions")
 	errWindowFunctionFilterClause     = errors.New("window functions with FILTER clause are not supported")
 	errDefaultAggregateWindowFunction = errors.New("default aggregate window functions not supported")
+	errStreamIngestionWrap            = errors.New("core.StreamIngestion{Data,Frontier} is not supported because of #55758")
 )
 
 func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error {
@@ -314,7 +315,9 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error {
 	case core.RestoreData != nil:
 	case core.Filterer != nil:
 	case core.StreamIngestionData != nil:
+		return errStreamIngestionWrap
 	case core.StreamIngestionFrontier != nil:
+		return errStreamIngestionWrap
 	case core.HashGroupJoiner != nil:
 	default:
 		return errors.AssertionFailedf("unexpected processor core %q", core)
1 change: 1 addition & 0 deletions pkg/util/bulk/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
         "//pkg/sql/execinfrapb",
         "//pkg/sql/isql",
         "//pkg/sql/protoreflect",
+        "//pkg/util/log",
         "//pkg/util/protoutil",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
5 changes: 5 additions & 0 deletions pkg/util/bulk/tracing_aggregator.go
@@ -13,6 +13,7 @@ package bulk
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -121,6 +122,10 @@ var _ tracing.EventListener = &TracingAggregator{}
 func MakeTracingAggregatorWithSpan(
 	ctx context.Context, aggregatorName string, tracer *tracing.Tracer,
 ) (context.Context, *TracingAggregator) {
+	if tracing.SpanFromContext(ctx) == nil {
+		log.Warningf(ctx, "tracing aggregator %s cannot be created without a tracing span", aggregatorName)
+		return ctx, nil
+	}
 	agg := &TracingAggregator{}
 	aggCtx, aggSpan := tracing.EnsureChildSpan(ctx, tracer, aggregatorName,
 		tracing.WithEventListeners(agg))
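
The guard added above gives MakeTracingAggregatorWithSpan a new contract: it returns a nil aggregator when the incoming context carries no tracing span, which is why the processors in this commit nil-check the aggregator before emitting metadata or closing it. A minimal sketch of that contract, using hypothetical stand-in types rather than the bulkutil/execinfrapb APIs:

package main

import "fmt"

// Aggregator stands in for bulkutil.TracingAggregator.
type Aggregator struct{}

func (a *Aggregator) Close() { fmt.Println("aggregator closed") }

// makeAggregator mirrors the new contract: it returns nil when there is
// no tracing span to attach the aggregator to.
func makeAggregator(haveSpan bool) *Aggregator {
	if !haveSpan {
		return nil // callers must tolerate a nil aggregator
	}
	return &Aggregator{}
}

// constructMeta mirrors constructTracingAggregatorProducerMeta: bail out
// early instead of dereferencing a nil aggregator.
func constructMeta(agg *Aggregator) *string {
	if agg == nil {
		return nil
	}
	s := "aggregated events"
	return &s
}

func main() {
	for _, haveSpan := range []bool{true, false} {
		agg := makeAggregator(haveSpan)
		if meta := constructMeta(agg); meta != nil {
			fmt.Println("emit:", *meta)
		}
		if agg != nil { // the same guard the commit adds in close()
			agg.Close()
		}
	}
}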
