From 7b1f6a14e2b1c58cd93727b7e4452522c8f751ac Mon Sep 17 00:00:00 2001
From: Andy Yang
Date: Wed, 2 Oct 2024 16:15:01 -0400
Subject: [PATCH 1/4] changefeedccl/kvfeed: add debug logs around schema change boundary types

Release note: None
---
 pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index 5a5a8483d4ea..079777d8e74e 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -318,6 +318,9 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 	log.Infof(ctx, "kv feed run starting")
 
 	emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error {
+		if log.V(2) {
+			log.Infof(ctx, "emitting resolved spans at time %s with boundary %s for spans: %s", ts, boundary, f.spans)
+		}
 		for _, sp := range f.spans {
 			if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {
 				return err
@@ -412,10 +415,11 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 
 		// Exit if the policy says we should.
 		if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT {
+			log.Infof(ctx, "kv feed run loop exiting due to schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 			return schemaChangeDetectedError{ts: schemaChangeTS}
 		}
 
-		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s", schemaChangeTS)
+		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 	}
 }

From 0419f5d60fdd1f12ef4deb41a9bfd947f4893cc8 Mon Sep 17 00:00:00 2001
From: Andy Yang
Date: Wed, 2 Oct 2024 16:25:53 -0400
Subject: [PATCH 2/4] changefeedccl: add debug logs around when processors move to draining

Release note: None
---
 .../changefeedccl/changefeed_processors.go | 65 +++++++++++++++++--
 1 file changed, 59 insertions(+), 6 deletions(-)

diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index eddf31cb3d13..3c9bfd47c254 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -292,16 +292,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	ctx = ca.StartInternal(ctx, changeAggregatorProcName)
 
 	spans, err := ca.setupSpansAndFrontier()
-
-	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
-
-	opts := feed.Opts
-
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
 	}
+
+	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
+
+	opts := feed.Opts
+
 	timestampOracle := &changeAggregatorLowerBoundOracle{
 		sf:                         ca.frontier,
 		initialInclusiveLowerBound: feed.ScanTime,
@@ -318,6 +321,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	scope, _ := opts.GetMetricScope()
 	ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -329,6 +335,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	if !ca.isSinkless() {
 		recorder, err = ca.wrapMetricsController(ctx, recorder)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
+			}
 			ca.MoveToDraining(err)
 			ca.cancel()
 			return
@@ -339,6 +348,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ca.spec.User(), ca.spec.JobID, recorder)
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -370,6 +382,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.FlowCtx.Cfg.Settings.SV)
 	ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -379,6 +394,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ctx, ca.FlowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
 		ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -687,6 +705,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 				// NB: we do not invoke ca.cancel here -- just merely moving
 				// to drain state so that the trailing metadata callback
 				// has a chance to produce shutdown checkpoint.
+				if log.V(2) {
+					log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
+				}
 				ca.MoveToDraining(err)
 				break
 			}
@@ -708,6 +729,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 			}
 			// Shut down the poller if it wasn't already.
 			ca.cancel()
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
+			}
 			ca.MoveToDraining(err)
 			break
 		}
@@ -1233,6 +1257,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
 	sli, err := cf.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1240,9 +1267,11 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 
 	cf.sink, err = getResolvedTimestampSink(ctx, cf.FlowCtx.Cfg, cf.spec.Feed, nilOracle,
 		cf.spec.User(), cf.spec.JobID, sli)
-
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1255,6 +1284,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	cf.highWaterAtStart = cf.spec.Feed.StatementTime
 
 	if cf.evalCtx.ChangefeedState == nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
+		}
 		cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
 		return
 	}
@@ -1265,6 +1297,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	if cf.spec.JobID != 0 {
 		job, err := cf.FlowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
+			}
 			cf.MoveToDraining(err)
 			return
 		}
@@ -1286,6 +1321,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 		cf.frontier.initialHighWater = *ts
 		for _, span := range cf.spec.TrackedSpans {
 			if _, err := cf.frontier.Forward(span, *ts); err != nil {
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining due to error forwarding span during highwater restoration: %v", err)
+				}
 				cf.MoveToDraining(err)
 				return
 			}
@@ -1459,6 +1497,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 
 			// TODO(ajwerner): make this more useful by at least informing the client
 			// of which tables changed.
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after schema change: %v", err)
+			}
 			cf.MoveToDraining(err)
 			break
 		}
@@ -1466,6 +1507,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 		row, meta := cf.input.Next()
 		if meta != nil {
 			if meta.Err != nil {
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
+				}
 				cf.MoveToDraining(nil /* err */)
 			}
 			if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
@@ -1473,11 +1517,17 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 				// that the aggregator exited due to node shutdown. Transition to
 				// draining so that the remaining aggregators will shut down and
 				// transmit their up-to-date frontier.
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
+				}
 				cf.MoveToDraining(changefeedbase.ErrNodeDraining)
 			}
 			return nil, meta
 		}
 		if row == nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
+			}
 			cf.MoveToDraining(nil /* err */)
 			break
 		}
@@ -1492,6 +1542,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 		}
 
 		if err := cf.noteAggregatorProgress(row[0]); err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
+			}
 			cf.MoveToDraining(err)
 			break
 		}

From 3c35540f981e4d5e9308cbdcfb2c5d6d1a0cf914 Mon Sep 17 00:00:00 2001
From: Andy Yang
Date: Wed, 2 Oct 2024 17:04:25 -0400
Subject: [PATCH 3/4] changefeedccl: enable verbose logs for a few schema change unit tests

We're seeing that some changefeed unit tests involving schema changes are
sometimes flaking when using a core changefeed. This patch enables verbose
logs that will provide more information about what the kv feed is doing
when it sees the schema change and when processors are moving to draining.

Release note: None
---
 pkg/ccl/changefeedccl/changefeed_test.go | 30 ++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 6fdd93f3a81c..29368fb7ab0c 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -1581,6 +1581,8 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -1685,6 +1687,8 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -1719,6 +1723,8 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -1766,6 +1772,8 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -1813,6 +1821,8 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -1847,6 +1857,8 @@ func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -1940,6 +1952,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	skip.UnderRace(t, "takes >1 min under race")
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -2542,6 +2556,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -2736,6 +2752,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -2893,6 +2911,8 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB)
@@ -2955,6 +2975,8 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		sqlDB.Exec(t, `CREATE TABLE after_backfill (a INT PRIMARY KEY)`)
@@ -2985,6 +3007,8 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -3103,6 +3127,8 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	// requireErrorSoon times out after 30 seconds
 	skip.UnderStress(t)
 	skip.UnderRace(t)
@@ -3147,6 +3173,8 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -8597,6 +8625,8 @@ func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)

From 22f68a13f66f5c61c75f29f38840f8b791403c76 Mon Sep 17 00:00:00 2001
From: cockroach-teamcity
Date: Wed, 2 Oct 2024 07:17:26 +0000
Subject: [PATCH 4/4] ci: update bazel builder image

Release note: None
Epic: None
---
 build/.bazelbuilderversion   | 2 +-
 build/toolchains/BUILD.bazel | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/build/.bazelbuilderversion b/build/.bazelbuilderversion
index 68fecce875bb..8650015d2b9b 100644
--- a/build/.bazelbuilderversion
+++ b/build/.bazelbuilderversion
@@ -1 +1 @@
-us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel:20240605-060303
\ No newline at end of file
+us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel:20241002-062031
\ No newline at end of file
diff --git a/build/toolchains/BUILD.bazel b/build/toolchains/BUILD.bazel
index 0b3bcaf8d44c..43befebfceee 100644
--- a/build/toolchains/BUILD.bazel
+++ b/build/toolchains/BUILD.bazel
@@ -32,7 +32,7 @@ platform(
         "@platforms//cpu:x86_64",
     ],
     exec_properties = {
-        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:1244a0ff14aed92470cfc02c20a9483171da5872b424d94844cbebf9a1e7364a",
+        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:d0c6255981566ee171634bfb2ef28fc9357bf10247720f6c766567f1c4066d13",
         "dockerReuse": "True",
         "Pool": "default",
     },
@@ -137,7 +137,7 @@ platform(
        "@platforms//cpu:arm64",
    ],
    exec_properties = {
-        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:03e760315b966b32c5ca195927fe4a2a1c262f448089a5728b740696e7f881eb",
+        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:e967099c1c80c2bfe39e06d31ef0d6983b09fff8849887747c4e7096fc980128",
        "dockerReuse": "True",
        "Pool": "default",
    },