streamingccl: hook up tracing aggregator events for the C2C job #108458

Merged
13 changes: 11 additions & 2 deletions pkg/ccl/backupccl/backup_processor.go
@@ -173,7 +173,11 @@ func newBackupDataProcessor(
InputsToDrain: nil,
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
bp.close()
return []execinfrapb.ProducerMetadata{*bp.constructTracingAggregatorProducerMeta(ctx)}
meta := bp.constructTracingAggregatorProducerMeta(ctx)
if meta == nil {
return nil
}
return []execinfrapb.ProducerMetadata{*meta}
},
}); err != nil {
return nil, err
@@ -245,6 +249,9 @@ func (bp *backupDataProcessor) constructProgressProducerMeta(
func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta(
ctx context.Context,
) *execinfrapb.ProducerMetadata {
if bp.agg == nil {
return nil
}
aggEvents := &execinfrapb.TracingAggregatorEvents{
SQLInstanceID: bp.flowCtx.NodeID.SQLInstanceID(),
FlowID: bp.flowCtx.ID,
@@ -292,7 +299,9 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
if bp.InternalClose() {
bp.agg.Close()
if bp.agg != nil {
bp.agg.Close()
}
bp.aggTimer.Stop()
bp.memAcc.Close(bp.Ctx())
}
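
A note on the guard above: TrailingMetaCallback previously dereferenced the constructor's result unconditionally, which panics whenever the aggregator was never created (hence the nil return added to constructTracingAggregatorProducerMeta). A minimal, self-contained sketch of the pattern, using hypothetical stand-in types rather than the real execinfrapb ones:

package main

import "fmt"

// Hypothetical stand-ins for execinfrapb.ProducerMetadata and the tracing
// aggregator; only the nil-guard pattern is the point here.
type producerMetadata struct{ events map[string][]byte }

type aggregator struct{ events map[string][]byte }

type processor struct {
	agg *aggregator // nil if no tracing span was available at Start
}

// constructMeta mirrors the guarded constructor: it returns nil when there
// is no aggregator, so callers must check before dereferencing.
func (p *processor) constructMeta() *producerMetadata {
	if p.agg == nil {
		return nil
	}
	return &producerMetadata{events: p.agg.events}
}

// trailingMeta mirrors the TrailingMetaCallback change. The old form,
// []producerMetadata{*p.constructMeta()}, would panic on a nil result.
func (p *processor) trailingMeta() []producerMetadata {
	meta := p.constructMeta()
	if meta == nil {
		return nil
	}
	return []producerMetadata{*meta}
}

func main() {
	var p processor                      // agg is nil
	fmt.Println(p.trailingMeta() == nil) // true: no panic
}
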
14 changes: 12 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -211,7 +211,11 @@ func newRestoreDataProcessor(
InputsToDrain: []execinfra.RowSource{input},
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
rd.ConsumerClosed()
return []execinfrapb.ProducerMetadata{*rd.constructTracingAggregatorProducerMeta(ctx)}
meta := rd.constructTracingAggregatorProducerMeta(ctx)
if meta == nil {
return nil
}
return []execinfrapb.ProducerMetadata{*meta}
},
}); err != nil {
return nil, err
@@ -686,6 +690,9 @@ func makeProgressUpdate(
func (rd *restoreDataProcessor) constructTracingAggregatorProducerMeta(
ctx context.Context,
) *execinfrapb.ProducerMetadata {
if rd.agg == nil {
return nil
}
aggEvents := &execinfrapb.TracingAggregatorEvents{
SQLInstanceID: rd.flowCtx.NodeID.SQLInstanceID(),
FlowID: rd.flowCtx.ID,
@@ -748,7 +755,10 @@ func (rd *restoreDataProcessor) ConsumerClosed() {
rd.cancelWorkersAndWait()

rd.qp.Close(rd.Ctx())
rd.agg.Close()
if rd.agg != nil {
rd.agg.Close()
}
rd.aggTimer.Stop()
rd.InternalClose()
}

1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -61,6 +61,7 @@ go_library(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/bulk",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/log",
26 changes: 24 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -150,16 +151,37 @@ func startDistIngestion(
func() time.Duration { return replanFrequency.Get(execCtx.ExecCfg().SV()) },
)

tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents)
tracingAggLoop := func(ctx context.Context) error {
if err := bulk.AggregateTracingStats(ctx, ingestionJob.ID(),
execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil {
log.Warningf(ctx, "failed to aggregate tracing stats: %v", err)
// If the loop aggregating tracing stats has returned an error, drain the
// channel so that senders in the metadata callback do not block.
for range tracingAggCh {
}
}
return nil
}

execInitialPlan := func(ctx context.Context) error {
defer stopReplanner()
defer close(tracingAggCh)
ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)

metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error {
if meta.AggregatorEvents != nil {
tracingAggCh <- meta.AggregatorEvents
}
return nil
}

rw := sql.NewRowResultWriter(nil /* rowContainer */)

var noTxn *kv.Txn
recv := sql.MakeDistSQLReceiver(
ctx,
rw,
sql.NewMetadataCallbackWriter(rw, metaFn),
tree.Rows,
nil, /* rangeCache */
noTxn,
@@ -175,7 +197,7 @@
}

updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "physical replication running")
err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner)
err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop)
if errors.Is(err, sql.ErrPlanChanged) {
execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
}
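
The coordinator now runs three tasks under ctxgroup.GoAndWait: the replanner, the flow itself (execInitialPlan, which closes tracingAggCh on exit), and tracingAggLoop, which persists events arriving on the channel. Because the channel is unbuffered, a failed consumer must keep draining it or the metadata callback in the flow would block forever. A sketch of that shape, assuming golang.org/x/sync/errgroup in place of ctxgroup and plain strings in place of *execinfrapb.TracingAggregatorEvents:

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func persist(_ context.Context, ev string) error {
	if ev == "event-1" {
		return errors.New("injected failure")
	}
	return nil
}

func main() {
	ch := make(chan string) // unbuffered, like tracingAggCh

	g, ctx := errgroup.WithContext(context.Background())

	// Aggregation loop: consume until the channel closes. On failure it
	// keeps draining so the producer below can never block.
	g.Go(func() error {
		for ev := range ch {
			if err := persist(ctx, ev); err != nil {
				fmt.Println("failed to aggregate tracing stats:", err)
				for range ch { // drain the remaining events
				}
				return nil // aggregation failures do not fail the job
			}
		}
		return nil
	})

	// Flow execution: the metadata callback forwards events; closing the
	// channel on exit unblocks the consumer.
	g.Go(func() error {
		defer close(ch)
		for i := 0; i < 3; i++ {
			ch <- fmt.Sprintf("event-%d", i)
		}
		return nil
	})

	_ = g.Wait()
}
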
58 changes: 57 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -10,6 +10,7 @@ package streamingest

import (
"context"
"fmt"
"sort"
"sync"
"time"
@@ -37,6 +38,7 @@
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -256,6 +258,11 @@ type streamIngestionProcessor struct {
metrics *Metrics

logBufferEvery log.EveryN

// Aggregator that aggregates StructuredEvents emitted in the stream
// ingestion processor's trace recording.
agg *bulkutil.TracingAggregator
aggTimer *timeutil.Timer
}

// partitionEvent augments a normal event with the partition it came from.
@@ -323,7 +330,11 @@ func newStreamIngestionDataProcessor(
InputsToDrain: []execinfra.RowSource{},
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
sip.close()
return nil
meta := sip.constructTracingAggregatorProducerMeta(ctx)
if meta == nil {
return nil
}
return []execinfrapb.ProducerMetadata{*meta}
},
},
); err != nil {
@@ -333,6 +344,36 @@
return sip, nil
}

func (sip *streamIngestionProcessor) constructTracingAggregatorProducerMeta(
ctx context.Context,
) *execinfrapb.ProducerMetadata {
if sip.agg == nil {
return nil
}
aggEvents := &execinfrapb.TracingAggregatorEvents{
SQLInstanceID: sip.flowCtx.NodeID.SQLInstanceID(),
FlowID: sip.flowCtx.ID,
Events: make(map[string][]byte),
}
sip.agg.ForEachAggregatedEvent(func(name string, event bulkutil.TracingAggregatorEvent) {
msg, ok := event.(protoutil.Message)
if !ok {
// This should never happen, but if it does, skip the aggregated event.
log.Warningf(ctx, "event is not a protoutil.Message: %T", event)
return
}
data := make([]byte, msg.Size())
if _, err := msg.MarshalTo(data); err != nil {
// This should never happen, but if it does, skip the aggregated event.
log.Warningf(ctx, "failed to marshal aggregated event: %v", err)
return
}
aggEvents.Events[name] = data
})

return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
}
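
The loop above snapshots every aggregated event as marshaled proto bytes keyed by event name, so the map can travel inside ProducerMetadata to the coordinator. A rough equivalent using the standard protobuf module (the real code uses gogo-style Size/MarshalTo via protoutil.Message, and the event name here is made up):

package main

import (
	"fmt"
	"time"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/durationpb"
)

func main() {
	// Hypothetical aggregated state: event name -> proto message. The real
	// aggregator yields these via ForEachAggregatedEvent.
	aggregated := map[string]proto.Message{
		"ingest.flush_wait": durationpb.New(3 * time.Second),
	}

	// Snapshot each event as marshaled bytes keyed by name, as the diff
	// does when filling TracingAggregatorEvents.Events.
	events := make(map[string][]byte, len(aggregated))
	for name, msg := range aggregated {
		data, err := proto.Marshal(msg)
		if err != nil {
			// Should never happen; skip the event rather than fail the flow.
			fmt.Println("failed to marshal aggregated event:", err)
			continue
		}
		events[name] = data
	}
	fmt.Printf("serialized %d event(s)\n", len(events))
}
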

// Start launches a set of goroutines that read from the spans
// assigned to this processor and ingests them until cutover is
// reached.
@@ -358,6 +399,13 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
log.Infof(ctx, "starting ingest proc")
ctx = sip.StartInternal(ctx, streamIngestionProcessorName)

// Construct an Aggregator to aggregate and render AggregatorEvents emitted in
// the stream ingestion processor's trace recording.
ctx, sip.agg = bulkutil.MakeTracingAggregatorWithSpan(ctx,
fmt.Sprintf("%s-aggregator", streamIngestionProcessorName), sip.EvalCtx.Tracer)
sip.aggTimer = timeutil.NewTimer()
sip.aggTimer.Reset(15 * time.Second)

sip.metrics = sip.flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics)

evalCtx := sip.FlowCtx.EvalCtx
@@ -470,6 +518,10 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr
}
return row, nil
}
case <-sip.aggTimer.C:
sip.aggTimer.Read = true
sip.aggTimer.Reset(15 * time.Second)
return nil, sip.constructTracingAggregatorProducerMeta(sip.Ctx())
case err := <-sip.errCh:
sip.MoveToDraining(err)
return nil, sip.DrainHelper()
@@ -533,6 +585,10 @@ func (sip *streamIngestionProcessor) close() {
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
}
if sip.agg != nil {
sip.agg.Close()
}
sip.aggTimer.Stop()

sip.InternalClose()
}
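
Besides the trailing metadata, the processor now also emits aggregator metadata periodically from Next(): the timer case in the select fires every 15 seconds, returns a metadata-only result, and re-arms. A stripped-down sketch using the stdlib time.Timer (the real code uses timeutil.Timer, whose Read field must be set after a receive):

package main

import (
	"fmt"
	"time"
)

// next mimics the shape of the processor's Next loop: real rows and the
// aggregator timer share one select, and the timer case re-arms itself.
func next(rows <-chan string, aggTimer *time.Timer, snapshot func() string) string {
	select {
	case row := <-rows:
		return row
	case <-aggTimer.C:
		aggTimer.Reset(15 * time.Second)
		return "meta: " + snapshot()
	}
}

func main() {
	rows := make(chan string, 1)
	aggTimer := time.NewTimer(10 * time.Millisecond)
	defer aggTimer.Stop()

	// Only the timer is ready: a metadata-only result is emitted.
	fmt.Println(next(rows, aggTimer, func() string { return "3 aggregated events" }))

	// The timer is re-armed for 15s, so the buffered row wins the select.
	rows <- "row-1"
	fmt.Println(next(rows, aggTimer, nil))
}
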
11 changes: 10 additions & 1 deletion pkg/kv/bulk/sst_batcher.go
@@ -247,7 +247,16 @@ func MakeStreamSSTBatcher(
sendLimiter limit.ConcurrentRequestLimiter,
onFlush func(summary kvpb.BulkOpSummary),
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, rc: rc, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter}
b := &SSTBatcher{
db: db,
rc: rc,
settings: settings,
ingestAll: true,
mem: mem,
limiter: sendLimiter,
}
b.mu.lastFlush = timeutil.Now()
b.mu.tracingSpan = tracing.SpanFromContext(ctx)
b.SetOnFlush(onFlush)
err := b.Reset(ctx)
return b, err
3 changes: 3 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -266,6 +266,7 @@
errNonInnerMergeJoinWithOnExpr = errors.New("can't plan vectorized non-inner merge joins with ON expressions")
errWindowFunctionFilterClause = errors.New("window functions with FILTER clause are not supported")
errDefaultAggregateWindowFunction = errors.New("default aggregate window functions not supported")
errStreamIngestionWrap = errors.New("core.StreamIngestion{Data,Frontier} is not supported because of #55758")
)

func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error {
@@ -314,7 +315,9 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCo
case core.RestoreData != nil:
case core.Filterer != nil:
case core.StreamIngestionData != nil:
return errStreamIngestionWrap
Collaborator: Interesting. I read the linked issue and understand why we need this. I'm a little surprised we don't need this for the restore case, though.

Contributor (author): I am surprised too; let me dig some more.

Contributor (author): I think this has something to do with the fact that in C2C the stream ingestion processors push the meta downstream to another processor, the frontier processor. In restore, however, the restore data processor is the leaf processor and pushes metas to the row result writer. I'll add some logging to sanity-check that the restore data processors aren't doing any unwanted buffering of progress metas.
case core.StreamIngestionFrontier != nil:
return errStreamIngestionWrap
case core.HashGroupJoiner != nil:
default:
return errors.AssertionFailedf("unexpected processor core %q", core)
1 change: 1 addition & 0 deletions pkg/util/bulk/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/isql",
"//pkg/sql/protoreflect",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
5 changes: 5 additions & 0 deletions pkg/util/bulk/tracing_aggregator.go
@@ -13,6 +13,7 @@ package bulk
import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -121,6 +122,10 @@ var _ tracing.EventListener = &TracingAggregator{}
func MakeTracingAggregatorWithSpan(
ctx context.Context, aggregatorName string, tracer *tracing.Tracer,
) (context.Context, *TracingAggregator) {
if tracing.SpanFromContext(ctx) == nil {
log.Warningf(ctx, "tracing aggregator %s cannot be created without a tracing span", aggregatorName)
return ctx, nil
}
agg := &TracingAggregator{}
aggCtx, aggSpan := tracing.EnsureChildSpan(ctx, tracer, aggregatorName,
tracing.WithEventListeners(agg))
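
The new early return establishes the contract that the processor-side nil checks rely on: no tracing span on the context means no aggregator, and every caller must tolerate a nil result. A toy illustration of that contract, with hypothetical stand-ins for the tracing types:

package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for tracing.Span and bulk.TracingAggregator.
type span struct{}

type spanKey struct{}

type tracingAggregator struct{}

func (a *tracingAggregator) Close() {}

func spanFromContext(ctx context.Context) *span {
	s, _ := ctx.Value(spanKey{}).(*span)
	return s
}

// makeTracingAggregatorWithSpan mirrors the new early return: without a span
// on the context there is nothing to listen to, so the caller gets nil back
// and must guard every use (as the processors' close() methods now do).
func makeTracingAggregatorWithSpan(ctx context.Context, name string) (context.Context, *tracingAggregator) {
	if spanFromContext(ctx) == nil {
		fmt.Printf("tracing aggregator %s cannot be created without a tracing span\n", name)
		return ctx, nil
	}
	return ctx, &tracingAggregator{}
}

func main() {
	ctx, agg := makeTracingAggregatorWithSpan(context.Background(), "demo")
	fmt.Println(agg == nil) // true: no span on the context

	ctx = context.WithValue(ctx, spanKey{}, &span{})
	if _, agg := makeTracingAggregatorWithSpan(ctx, "demo"); agg != nil {
		defer agg.Close() // guarded, exactly like the sip.agg != nil check in close()
	}
}
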
Expand Down