From 202a9ac7aa20f184d1edc69103b4a14a7f18c51b Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Sun, 10 Nov 2024 04:52:59 +0000 Subject: [PATCH] changefeedccl: add log tag for processor type This patch adds a log tag for the processor type so that we can easily tell whether a log came from a changefeed's frontier or one of its aggregators. Release note: None --- pkg/ccl/changefeedccl/changefeed_processors.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 570d82c86423..7c49edbad5d6 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -282,6 +282,11 @@ func (ca *changeAggregator) wrapMetricsController( return recorderWithTelemetry, nil } +const ( + changeAggregatorLogTag = "change-aggregator" + changeFrontierLogTag = "change-frontier" +) + // Start is part of the RowSource interface. func (ca *changeAggregator) Start(ctx context.Context) { // Derive a separate context so that we can shutdown the poller. @@ -290,6 +295,8 @@ func (ca *changeAggregator) Start(ctx context.Context) { if ca.spec.JobID != 0 { ctx = logtags.AddTag(ctx, "job", ca.spec.JobID) } + ctx = logtags.RemoveTag(ctx, changeFrontierLogTag) + ctx = logtags.AddTag(ctx, changeAggregatorLogTag, nil /* value */) ctx = ca.StartInternal(ctx, changeAggregatorProcName) spans, err := ca.setupSpansAndFrontier() @@ -1237,6 +1244,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { if cf.spec.JobID != 0 { ctx = logtags.AddTag(ctx, "job", cf.spec.JobID) } + ctx = logtags.AddTag(ctx, changeFrontierLogTag, nil /* value */) // StartInternal called at the beginning of the function because there are // early returns if errors are detected. ctx = cf.StartInternal(ctx, changeFrontierProcName)