diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index c9ca21704743..90648b421663 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -296,16 +296,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	ctx = ca.StartInternal(ctx, changeAggregatorProcName)
 
 	spans, err := ca.setupSpansAndFrontier()
-
-	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
-
-	opts := feed.Opts
-
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
 	}
+
+	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
+
+	opts := feed.Opts
+
 	timestampOracle := &changeAggregatorLowerBoundOracle{
 		sf:                         ca.frontier.SpanFrontier(),
 		initialInclusiveLowerBound: feed.ScanTime,
@@ -322,6 +325,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	scope, _ := opts.GetMetricScope()
 	ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -333,6 +339,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	if !ca.isSinkless() {
 		recorder, err = ca.wrapMetricsController(ctx, recorder)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
+			}
 			ca.MoveToDraining(err)
 			ca.cancel()
 			return
@@ -343,6 +352,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ca.spec.User(), ca.spec.JobID, recorder)
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -374,6 +386,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
 	ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -383,6 +398,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier.SpanFrontier(), kvFeedHighWater,
 		ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -694,6 +712,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 			// NB: we do not invoke ca.cancel here -- just merely moving
 			// to drain state so that the trailing metadata callback
 			// has a chance to produce shutdown checkpoint.
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
+			}
 			ca.MoveToDraining(err)
 			break
 		}
@@ -715,6 +736,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 			}
 			// Shut down the poller if it wasn't already.
 			ca.cancel()
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
+			}
 			ca.MoveToDraining(err)
 			break
 		}
@@ -1234,15 +1258,20 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
 	sli, err := cf.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
 	cf.sliMetrics = sli
 	cf.sink, err = getResolvedTimestampSink(ctx, cf.flowCtx.Cfg, cf.spec.Feed, nilOracle,
 		cf.spec.User(), cf.spec.JobID, sli)
-
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1255,6 +1284,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	cf.highWaterAtStart = cf.spec.Feed.StatementTime
 
 	if cf.EvalCtx.ChangefeedState == nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
+		}
 		cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
 		return
 	}
@@ -1265,6 +1297,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	if cf.spec.JobID != 0 {
 		job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
+			}
 			cf.MoveToDraining(err)
 			return
 		}
@@ -1286,6 +1321,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 		cf.frontier.initialHighWater = *ts
 		for _, span := range cf.spec.TrackedSpans {
 			if _, err := cf.frontier.Forward(span, *ts); err != nil {
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining due to error forwarding span during highwater restoration: %v", err)
+				}
 				cf.MoveToDraining(err)
 				return
 			}
@@ -1459,6 +1497,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 
 			// TODO(ajwerner): make this more useful by at least informing the client
 			// of which tables changed.
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after schema change: %v", err)
+			}
 			cf.MoveToDraining(err)
 			break
 		}
@@ -1466,6 +1507,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 		row, meta := cf.input.Next()
 		if meta != nil {
 			if meta.Err != nil {
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
+				}
 				cf.MoveToDraining(nil /* err */)
 			}
 			if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
@@ -1473,11 +1517,17 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 				// that the aggregator exited due to node shutdown. Transition to
 				// draining so that the remaining aggregators will shut down and
 				// transmit their up-to-date frontier.
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
+				}
 				cf.MoveToDraining(changefeedbase.ErrNodeDraining)
 			}
 			return nil, meta
 		}
 		if row == nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
+			}
 			cf.MoveToDraining(nil /* err */)
 			break
 		}
@@ -1492,6 +1542,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 		}
 
 		if err := cf.noteAggregatorProgress(row[0]); err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
+			}
 			cf.MoveToDraining(err)
 			break
 		}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 52c52ad1b0a7..0eaff9bff74c 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -1536,6 +1536,10 @@ func TestChangefeedUserDefinedTypes(t *testing.T) {
 // targeted by the changefeed, it should not stop.
 func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1634,6 +1638,10 @@ func TestChangefeedCanResumeWhenClusterIDMissing(t *testing.T) {
 // If we drop columns which are not targeted by the changefeed, it should not backfill.
 func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1668,6 +1676,8 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1713,6 +1723,10 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
 
 func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1758,6 +1772,10 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
 
 func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1790,6 +1808,10 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 
 func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1883,6 +1905,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	skip.UnderRace(t, "takes >1 min under race")
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2485,6 +2509,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2679,6 +2705,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2836,6 +2864,8 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB)
@@ -2898,6 +2928,8 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		sqlDB.Exec(t, `CREATE TABLE after_backfill (a INT PRIMARY KEY)`)
@@ -2928,6 +2960,8 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -3046,6 +3080,8 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	// requireErrorSoon times out after 30 seconds
 	skip.UnderStress(t)
 	skip.UnderRace(t)
@@ -3090,6 +3126,8 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -8532,6 +8570,10 @@ func TestChangefeedTestTimesOut(t *testing.T) {
 // Regression for #85008.
 func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index 585d6db76292..205442e4f7c0 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -330,6 +330,9 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 	log.Infof(ctx, "kv feed run starting")
 
 	emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error {
+		if log.V(2) {
+			log.Infof(ctx, "emitting resolved spans at time %s with boundary %s for spans: %s", ts, boundary, f.spans)
+		}
 		for _, sp := range f.spans {
 			if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {
 				return err
@@ -422,10 +425,11 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 
 		// Exit if the policy says we should.
 		if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT {
+			log.Infof(ctx, "kv feed run loop exiting due to schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 			return schemaChangeDetectedError{ts: schemaChangeTS}
 		}
 
-		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s", schemaChangeTS)
+		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 	}
 }
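For context on the logging idiom this patch relies on: every new message is guarded by `log.V(2)`, so it is only emitted when per-file vmodule verbosity reaches 2, and the touched tests raise that verbosity with `log.SetVModule("kv_feed=2,changefeed_processors=2")` so the messages actually show up in test logs. Below is a minimal sketch of that pattern, not part of the patch; the `demo` helper and `main` wrapper are illustrative, and it assumes CockroachDB's `util/log` package, which provides the `log.V`, `log.Infof`, and `log.SetVModule` calls used in the diff.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// demo is a hypothetical helper (not part of the patch) showing the
// verbosity-gated idiom added before the MoveToDraining calls above:
// the message is only formatted and emitted when this file's vmodule
// verbosity is at least 2.
func demo(ctx context.Context, err error) {
	if log.V(2) {
		log.Infof(ctx, "moving to draining due to error: %v", err)
	}
}

func main() {
	ctx := context.Background()
	// The new tests enable these messages by raising verbosity for the
	// relevant files, e.g.:
	//   require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
	// Without that, the guarded log call above is skipped entirely.
	demo(ctx, fmt.Errorf("example error"))
}
```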