release-24.1: changefeedccl: enable verbose logs for a few schema change unit tests #131795

Merged
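This backport threads vmodule-gated logging through the changefeed aggregator and frontier: every path that calls `MoveToDraining` now explains itself at verbosity level 2, and the schema change unit tests opt in to those logs. Below is a minimal sketch of the gating pattern applied throughout the diff (assuming CockroachDB's `pkg/util/log`; the helper and file names are hypothetical). Because `log.V(2)` is a plain boolean check, the `Infof` call and its argument formatting are skipped entirely unless level 2 is enabled for the file.

```go
package main

import (
	"context"
	"errors"

	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// moveToDrainingWithLog sketches the pattern the diff applies before each
// MoveToDraining call (hypothetical helper; the real code inlines this).
func moveToDrainingWithLog(ctx context.Context, err error) {
	// log.V(2) is a cheap verbosity check; the Infof call runs only when
	// level 2 is enabled for this file, e.g. via --vmodule=main=2 or
	// log.SetVModule("main=2").
	if log.V(2) {
		log.Infof(ctx, "moving to draining due to error: %v", err)
	}
	// ... the processor would call MoveToDraining(err) here.
}

func main() {
	_ = log.SetVModule("main=2") // enable level-2 logs for this file
	moveToDrainingWithLog(context.Background(), errors.New("example error"))
}
```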
65 changes: 59 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go

```diff
@@ -296,16 +296,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	ctx = ca.StartInternal(ctx, changeAggregatorProcName)
 
 	spans, err := ca.setupSpansAndFrontier()
+
+	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
+
+	opts := feed.Opts
+
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
 	}
-
-	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
-
-	opts := feed.Opts
-
 	timestampOracle := &changeAggregatorLowerBoundOracle{
 		sf:                         ca.frontier,
 		initialInclusiveLowerBound: feed.ScanTime,
@@ -322,6 +325,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	scope, _ := opts.GetMetricScope()
 	ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -333,6 +339,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	if !ca.isSinkless() {
 		recorder, err = ca.wrapMetricsController(ctx, recorder)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
+			}
 			ca.MoveToDraining(err)
 			ca.cancel()
 			return
@@ -343,6 +352,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ca.spec.User(), ca.spec.JobID, recorder)
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -374,6 +386,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
 	ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -383,6 +398,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
 		ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -694,6 +712,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 			// NB: we do not invoke ca.cancel here -- just merely moving
 			// to drain state so that the trailing metadata callback
 			// has a chance to produce shutdown checkpoint.
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
+			}
 			ca.MoveToDraining(err)
 			break
 		}
@@ -715,6 +736,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 		}
 		// Shut down the poller if it wasn't already.
 		ca.cancel()
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
+		}
 		ca.MoveToDraining(err)
 		break
 	}
@@ -1237,15 +1261,20 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
 	sli, err := cf.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
 	cf.sliMetrics = sli
 	cf.sink, err = getResolvedTimestampSink(ctx, cf.flowCtx.Cfg, cf.spec.Feed, nilOracle,
 		cf.spec.User(), cf.spec.JobID, sli)
-
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1258,6 +1287,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 
 	cf.highWaterAtStart = cf.spec.Feed.StatementTime
 	if cf.EvalCtx.ChangefeedState == nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
+		}
 		cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
 		return
 	}
@@ -1268,6 +1300,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	if cf.spec.JobID != 0 {
 		job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
+			}
 			cf.MoveToDraining(err)
 			return
 		}
@@ -1289,6 +1324,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 		cf.frontier.initialHighWater = *ts
 		for _, span := range cf.spec.TrackedSpans {
 			if _, err := cf.frontier.Forward(span, *ts); err != nil {
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining due to error forwarding span during highwater restoration: %v", err)
+				}
 				cf.MoveToDraining(err)
 				return
 			}
@@ -1462,25 +1500,37 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 
 			// TODO(ajwerner): make this more useful by at least informing the client
 			// of which tables changed.
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after schema change: %v", err)
+			}
 			cf.MoveToDraining(err)
 			break
 		}
 
 		row, meta := cf.input.Next()
 		if meta != nil {
 			if meta.Err != nil {
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
+				}
 				cf.MoveToDraining(nil /* err */)
 			}
 			if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
 				// Seeing changefeed drain info metadata from the aggregator means
 				// that the aggregator exited due to node shutdown. Transition to
 				// draining so that the remaining aggregators will shut down and
 				// transmit their up-to-date frontier.
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
+				}
				cf.MoveToDraining(changefeedbase.ErrNodeDraining)
 			}
 			return nil, meta
 		}
 		if row == nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
+			}
 			cf.MoveToDraining(nil /* err */)
 			break
 		}
@@ -1495,6 +1545,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 		}
 
 		if err := cf.noteAggregatorProgress(row[0]); err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
+			}
 			cf.MoveToDraining(err)
 			break
 		}
```
42 changes: 42 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go

```diff
@@ -1552,6 +1552,10 @@ func TestChangefeedUserDefinedTypes(t *testing.T) {
 // targeted by the changefeed, it should not stop.
 func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1650,6 +1654,10 @@ func TestChangefeedCanResumeWhenClusterIDMissing(t *testing.T) {
 // If we drop columns which are not targeted by the changefeed, it should not backfill.
 func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1684,6 +1692,8 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1729,6 +1739,10 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
 
 func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1774,6 +1788,10 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
 
 func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1806,6 +1824,10 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 
 func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1899,6 +1921,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	skip.UnderRace(t, "takes >1 min under race")
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2501,6 +2525,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2695,6 +2721,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2852,6 +2880,8 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB)
@@ -2914,6 +2944,8 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		sqlDB.Exec(t, `CREATE TABLE after_backfill (a INT PRIMARY KEY)`)
@@ -2944,6 +2976,8 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -3062,6 +3096,8 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	// requireErrorSoon times out after 30 seconds
 	skip.UnderStress(t)
 	skip.UnderRace(t)
@@ -3106,6 +3142,8 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -8555,6 +8593,10 @@ func TestChangefeedTestTimesOut(t *testing.T) {
 // Regression for #85008.
 func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
```
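The test-side changes are uniform: tests that lacked a `log.Scope` gain one so their log output is captured per test, and each gets a `log.SetVModule` call equivalent to running with `--vmodule=kv_feed=2,changefeed_processors=2`, which activates the level-2 logs added above. A minimal sketch of the resulting preamble (hypothetical test name; `leaktest`, `log`, and `require` imports as in the surrounding file):

```go
func TestExampleSchemaChangeWithVerboseLogs(t *testing.T) {
	defer leaktest.AfterTest(t)()
	// Scope the test's log output so it is collected per test.
	defer log.Scope(t).Close(t)

	// Enable level-2 vmodule logging for kv_feed.go and
	// changefeed_processors.go, matching the log.V(2) guards above.
	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

	// ... changefeed test body follows.
}
```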
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go

```diff
@@ -321,6 +321,9 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 	log.Infof(ctx, "kv feed run starting")
 
 	emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error {
+		if log.V(2) {
+			log.Infof(ctx, "emitting resolved spans at time %s with boundary %s for spans: %s", ts, boundary, f.spans)
+		}
 		for _, sp := range f.spans {
 			if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {
 				return err
@@ -415,10 +418,11 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 
 		// Exit if the policy says we should.
 		if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT {
+			log.Infof(ctx, "kv feed run loop exiting due to schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 			return schemaChangeDetectedError{ts: schemaChangeTS}
 		}
 
-		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s", schemaChangeTS)
+		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 	}
 }
 
```
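One detail worth noting: the two run-loop messages in kv_feed.go stay as unconditional `log.Infof` calls rather than `log.V(2)`-gated ones. They fire at most once per schema change boundary, so presumably they are cheap enough to keep on by default; adding the boundary type to both makes the exit and restart paths distinguishable in the logs.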