diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 570d82c86423..7c49edbad5d6 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -282,6 +282,11 @@ func (ca *changeAggregator) wrapMetricsController( return recorderWithTelemetry, nil } +const ( + changeAggregatorLogTag = "change-aggregator" + changeFrontierLogTag = "change-frontier" +) + // Start is part of the RowSource interface. func (ca *changeAggregator) Start(ctx context.Context) { // Derive a separate context so that we can shutdown the poller. @@ -290,6 +295,8 @@ func (ca *changeAggregator) Start(ctx context.Context) { if ca.spec.JobID != 0 { ctx = logtags.AddTag(ctx, "job", ca.spec.JobID) } + ctx = logtags.RemoveTag(ctx, changeFrontierLogTag) + ctx = logtags.AddTag(ctx, changeAggregatorLogTag, nil /* value */) ctx = ca.StartInternal(ctx, changeAggregatorProcName) spans, err := ca.setupSpansAndFrontier() @@ -1237,6 +1244,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { if cf.spec.JobID != 0 { ctx = logtags.AddTag(ctx, "job", cf.spec.JobID) } + ctx = logtags.AddTag(ctx, changeFrontierLogTag, nil /* value */) // StartInternal called at the beginning of the function because there are // early returns if errors are detected. ctx = cf.StartInternal(ctx, changeFrontierProcName)