diff --git a/build/.bazelbuilderversion b/build/.bazelbuilderversion
index 68fecce875bb..8650015d2b9b 100644
--- a/build/.bazelbuilderversion
+++ b/build/.bazelbuilderversion
@@ -1 +1 @@
-us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel:20240605-060303
\ No newline at end of file
+us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel:20241002-062031
\ No newline at end of file
diff --git a/build/toolchains/BUILD.bazel b/build/toolchains/BUILD.bazel
index 0b3bcaf8d44c..43befebfceee 100644
--- a/build/toolchains/BUILD.bazel
+++ b/build/toolchains/BUILD.bazel
@@ -32,7 +32,7 @@ platform(
         "@platforms//cpu:x86_64",
     ],
     exec_properties = {
-        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:1244a0ff14aed92470cfc02c20a9483171da5872b424d94844cbebf9a1e7364a",
+        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:d0c6255981566ee171634bfb2ef28fc9357bf10247720f6c766567f1c4066d13",
         "dockerReuse": "True",
         "Pool": "default",
     },
@@ -137,7 +137,7 @@ platform(
         "@platforms//cpu:arm64",
     ],
     exec_properties = {
-        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:03e760315b966b32c5ca195927fe4a2a1c262f448089a5728b740696e7f881eb",
+        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:e967099c1c80c2bfe39e06d31ef0d6983b09fff8849887747c4e7096fc980128",
         "dockerReuse": "True",
         "Pool": "default",
     },
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index eddf31cb3d13..3c9bfd47c254 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -292,16 +292,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	ctx = ca.StartInternal(ctx, changeAggregatorProcName)
 
 	spans, err := ca.setupSpansAndFrontier()
-
-	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
-
-	opts := feed.Opts
-
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
 	}
+
+	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
+
+	opts := feed.Opts
+
 	timestampOracle := &changeAggregatorLowerBoundOracle{
 		sf:                         ca.frontier,
 		initialInclusiveLowerBound: feed.ScanTime,
@@ -318,6 +321,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	scope, _ := opts.GetMetricScope()
 	ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -329,6 +335,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	if !ca.isSinkless() {
 		recorder, err = ca.wrapMetricsController(ctx, recorder)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
+			}
 			ca.MoveToDraining(err)
 			ca.cancel()
 			return
@@ -339,6 +348,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ca.spec.User(), ca.spec.JobID, recorder)
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -370,6 +382,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.FlowCtx.Cfg.Settings.SV)
 	ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -379,6 +394,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ctx, ca.FlowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
 		ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -687,6 +705,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 			// NB: we do not invoke ca.cancel here -- just merely moving
 			// to drain state so that the trailing metadata callback
 			// has a chance to produce shutdown checkpoint.
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
+			}
 			ca.MoveToDraining(err)
 			break
 		}
@@ -708,6 +729,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 			}
 			// Shut down the poller if it wasn't already.
 			ca.cancel()
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
+			}
 			ca.MoveToDraining(err)
 			break
 		}
@@ -1233,6 +1257,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
 	sli, err := cf.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1240,9 +1267,11 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 
 	cf.sink, err = getResolvedTimestampSink(ctx, cf.FlowCtx.Cfg, cf.spec.Feed,
 		nilOracle, cf.spec.User(), cf.spec.JobID, sli)
-
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1255,6 +1284,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	cf.highWaterAtStart = cf.spec.Feed.StatementTime
 
 	if cf.evalCtx.ChangefeedState == nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
+		}
 		cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
 		return
 	}
@@ -1265,6 +1297,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	if cf.spec.JobID != 0 {
 		job, err := cf.FlowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
+			}
 			cf.MoveToDraining(err)
 			return
 		}
@@ -1286,6 +1321,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 			cf.frontier.initialHighWater = *ts
 			for _, span := range cf.spec.TrackedSpans {
 				if _, err := cf.frontier.Forward(span, *ts); err != nil {
+					if log.V(2) {
+						log.Infof(cf.Ctx(), "change frontier moving to draining due to error forwarding span during highwater restoration: %v", err)
+					}
 					cf.MoveToDraining(err)
 					return
 				}
@@ -1459,6 +1497,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 
 			// TODO(ajwerner): make this more useful by at least informing the client
 			// of which tables changed.
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after schema change: %v", err)
+			}
 			cf.MoveToDraining(err)
 			break
 		}
@@ -1466,6 +1507,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 		row, meta := cf.input.Next()
 		if meta != nil {
 			if meta.Err != nil {
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
+				}
 				cf.MoveToDraining(nil /* err */)
 			}
 			if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
@@ -1473,11 +1517,17 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 				// that the aggregator exited due to node shutdown. Transition to
 				// draining so that the remaining aggregators will shut down and
 				// transmit their up-to-date frontier.
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
+				}
 				cf.MoveToDraining(changefeedbase.ErrNodeDraining)
 			}
 			return nil, meta
 		}
 		if row == nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
+			}
 			cf.MoveToDraining(nil /* err */)
 			break
 		}
@@ -1492,6 +1542,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 		}
 
 		if err := cf.noteAggregatorProgress(row[0]); err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
+			}
 			cf.MoveToDraining(err)
 			break
 		}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 6fdd93f3a81c..29368fb7ab0c 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -1581,6 +1581,8 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1685,6 +1687,8 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1719,6 +1723,8 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1766,6 +1772,8 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1813,6 +1821,8 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1847,6 +1857,8 @@ func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1940,6 +1952,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	skip.UnderRace(t, "takes >1 min under race")
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2542,6 +2556,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2736,6 +2752,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2893,6 +2911,8 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB)
@@ -2955,6 +2975,8 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		sqlDB.Exec(t, `CREATE TABLE after_backfill (a INT PRIMARY KEY)`)
@@ -2985,6 +3007,8 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -3103,6 +3127,8 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	// requireErrorSoon times out after 30 seconds
 	skip.UnderStress(t)
 	skip.UnderRace(t)
@@ -3147,6 +3173,8 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -8597,6 +8625,8 @@ func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index 5a5a8483d4ea..079777d8e74e 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -318,6 +318,9 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 	log.Infof(ctx, "kv feed run starting")
 
 	emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error {
+		if log.V(2) {
+			log.Infof(ctx, "emitting resolved spans at time %s with boundary %s for spans: %s", ts, boundary, f.spans)
+		}
 		for _, sp := range f.spans {
 			if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {
 				return err
@@ -412,10 +415,11 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 
 		// Exit if the policy says we should.
 		if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT {
+			log.Infof(ctx, "kv feed run loop exiting due to schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 			return schemaChangeDetectedError{ts: schemaChangeTS}
 		}
 
-		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s", schemaChangeTS)
+		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 	}
 }
 
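Note (illustration, not part of the patch): the new draining/shutdown messages added above are guarded by log.V(2), so they stay silent unless vmodule verbosity is raised for the affected files. A minimal Go sketch of enabling them programmatically, mirroring the log.SetVModule calls the test changes use; the standalone main wrapper is hypothetical and assumes CockroachDB's util/log package.

package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
)

func main() {
	ctx := context.Background()

	// Raise vmodule verbosity to 2 for the two changefeed files touched in
	// this patch, the same pattern the updated tests use.
	if err := log.SetVModule("kv_feed=2,changefeed_processors=2"); err != nil {
		log.Fatalf(ctx, "setting vmodule: %v", err)
	}

	// Blocks guarded by log.V(2) in those files are now emitted.
	if log.V(2) {
		log.Infof(ctx, "V(2) logging enabled for kv_feed and changefeed_processors")
	}
}

On a running node the same verbosity can typically be enabled without a rebuild via the --vmodule=kv_feed=2,changefeed_processors=2 flag.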