Skip to content

Commit

Permalink
Merge pull request #131796 from andyyang890/backport23.2-131780
Browse files Browse the repository at this point in the history
release-23.2: changefeedccl: enable verbose logs for a few schema change unit tests
  • Loading branch information
andyyang890 authored Oct 3, 2024
2 parents 822e2ce + 80bfae5 commit e77551b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 7 deletions.
65 changes: 59 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ctx = ca.StartInternal(ctx, changeAggregatorProcName)

spans, err := ca.setupSpansAndFrontier()

feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)

opts := feed.Opts

if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
}

feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)

opts := feed.Opts

timestampOracle := &changeAggregatorLowerBoundOracle{
sf: ca.frontier.SpanFrontier(),
initialInclusiveLowerBound: feed.ScanTime,
Expand All @@ -322,6 +325,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
scope, _ := opts.GetMetricScope()
ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
Expand All @@ -333,6 +339,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
if !ca.isSinkless() {
recorder, err = ca.wrapMetricsController(ctx, recorder)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
Expand All @@ -343,6 +352,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.spec.User(), ca.spec.JobID, recorder)
if err != nil {
err = changefeedbase.MarkRetryableError(err)
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
Expand Down Expand Up @@ -374,6 +386,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
Expand All @@ -383,6 +398,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier.SpanFrontier(), kvFeedHighWater,
ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
Expand Down Expand Up @@ -694,6 +712,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// NB: we do not invoke ca.cancel here -- just merely moving
// to drain state so that the trailing metadata callback
// has a chance to produce shutdown checkpoint.
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
}
ca.MoveToDraining(err)
break
}
Expand All @@ -715,6 +736,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
}
// Shut down the poller if it wasn't already.
ca.cancel()
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
}
ca.MoveToDraining(err)
break
}
Expand Down Expand Up @@ -1234,15 +1258,20 @@ func (cf *changeFrontier) Start(ctx context.Context) {
scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
sli, err := cf.metrics.getSLIMetrics(scope)
if err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
}
cf.MoveToDraining(err)
return
}
cf.sliMetrics = sli
cf.sink, err = getResolvedTimestampSink(ctx, cf.flowCtx.Cfg, cf.spec.Feed, nilOracle,
cf.spec.User(), cf.spec.JobID, sli)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
}
cf.MoveToDraining(err)
return
}
Expand All @@ -1255,6 +1284,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.EvalCtx.ChangefeedState == nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
}
cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
return
}
Expand All @@ -1265,6 +1297,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
if err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
}
cf.MoveToDraining(err)
return
}
Expand All @@ -1286,6 +1321,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.frontier.initialHighWater = *ts
for _, span := range cf.spec.TrackedSpans {
if _, err := cf.frontier.Forward(span, *ts); err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to error forwarding span during highwater restoration: %v", err)
}
cf.MoveToDraining(err)
return
}
Expand Down Expand Up @@ -1459,25 +1497,37 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad

// TODO(ajwerner): make this more useful by at least informing the client
// of which tables changed.
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining after schema change: %v", err)
}
cf.MoveToDraining(err)
break
}

row, meta := cf.input.Next()
if meta != nil {
if meta.Err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
}
cf.MoveToDraining(nil /* err */)
}
if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
// Seeing changefeed drain info metadata from the aggregator means
// that the aggregator exited due to node shutdown. Transition to
// draining so that the remaining aggregators will shut down and
// transmit their up-to-date frontier.
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
}
cf.MoveToDraining(changefeedbase.ErrNodeDraining)
}
return nil, meta
}
if row == nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
}
cf.MoveToDraining(nil /* err */)
break
}
Expand All @@ -1492,6 +1542,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
}

if err := cf.noteAggregatorProgress(row[0]); err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
}
cf.MoveToDraining(err)
break
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,10 @@ func TestChangefeedUserDefinedTypes(t *testing.T) {
// targeted by the changefeed, it should not stop.
func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -1634,6 +1638,10 @@ func TestChangefeedCanResumeWhenClusterIDMissing(t *testing.T) {
// If we drop columns which are not targeted by the changefeed, it should not backfill.
func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -1668,6 +1676,8 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -1713,6 +1723,10 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {

func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -1758,6 +1772,10 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {

func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -1790,6 +1808,10 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {

func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -1883,6 +1905,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {
defer log.Scope(t).Close(t)
skip.UnderRace(t, "takes >1 min under race")

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -2485,6 +2509,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -2679,6 +2705,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -2836,6 +2864,8 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB)
Expand Down Expand Up @@ -2898,6 +2928,8 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE after_backfill (a INT PRIMARY KEY)`)
Expand Down Expand Up @@ -2928,6 +2960,8 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {

sqlDB := sqlutils.MakeSQLRunner(s.DB)
Expand Down Expand Up @@ -3046,6 +3080,8 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

// requireErrorSoon times out after 30 seconds
skip.UnderStress(t)
skip.UnderRace(t)
Expand Down Expand Up @@ -3090,6 +3126,8 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down Expand Up @@ -8532,6 +8570,10 @@ func TestChangefeedTestTimesOut(t *testing.T) {
// Regression for #85008.
func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
log.Infof(ctx, "kv feed run starting")

emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error {
if log.V(2) {
log.Infof(ctx, "emitting resolved spans at time %s with boundary %s for spans: %s", ts, boundary, f.spans)
}
for _, sp := range f.spans {
if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {
return err
Expand Down Expand Up @@ -422,10 +425,11 @@ func (f *kvFeed) run(ctx context.Context) (err error) {

// Exit if the policy says we should.
if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT {
log.Infof(ctx, "kv feed run loop exiting due to schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
return schemaChangeDetectedError{ts: schemaChangeTS}
}

log.Infof(ctx, "kv feed run loop restarting because of schema change at %s", schemaChangeTS)
log.Infof(ctx, "kv feed run loop restarting because of schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
}
}

Expand Down

0 comments on commit e77551b

Please sign in to comment.