131732: ci: update bazel builder image r=rickystewart a=cockroach-teamcity

Release note: None
Epic: None


131780: changefeedccl: enable verbose logs for a few schema change unit tests r=rharding6373 a=andyyang890

**changefeedccl/kvfeed: add debug logs around schema change boundary types**

Release note: None

----

**changefeedccl: add debug logs around when processors move to draining**

Release note: None

----

**changefeedccl: enable verbose logs for a few schema change unit tests**

We're seeing that some changefeed unit tests involving schema changes
sometimes flake when using a core changefeed. This patch enables verbose
logs that will provide more information about what the kv feed is doing
when it sees the schema change and about when processors move to
draining.

Release note: None

----

Informs #131186
Informs #130404
Informs #129777
Informs #129226
Informs #129045

Co-authored-by: cockroach-teamcity <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
3 people committed Oct 2, 2024
3 parents: 624e6d8 + 22f68a1 + 3c35540
Commit: 91d961f
Showing 5 changed files with 97 additions and 10 deletions.
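
The pattern repeated throughout the Go diffs below is verbosity-gated logging: a cheap level check guards each debug statement so its arguments are only formatted when verbosity has been raised. A minimal, self-contained sketch of the idea, not CockroachDB's actual util/log implementation (the names `verbosity`, `v`, and `expensiveDetail` are illustrative):

```go
package main

import "fmt"

// verbosity stands in for the process-wide log verbosity that log.V consults;
// the tests in this PR raise it to 2 for selected files via log.SetVModule.
var verbosity = 0

// v mimics log.V: it reports whether debug output at the given level is enabled.
func v(level int) bool { return verbosity >= level }

func expensiveDetail() string {
	// In real code this might stringify spans or timestamps; the guard in
	// main ensures this work is skipped entirely when verbosity is low.
	return fmt.Sprintf("spans=%v", []string{"/Table/101", "/Table/102"})
}

func main() {
	if v(2) { // with verbosity = 0, expensiveDetail() is never called
		fmt.Println("change aggregator moving to draining:", expensiveDetail())
	}
	verbosity = 2
	if v(2) { // now the detail is computed and emitted
		fmt.Println("change aggregator moving to draining:", expensiveDetail())
	}
}
```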
2 changes: 1 addition & 1 deletion build/.bazelbuilderversion
@@ -1 +1 @@
-us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel:20240605-060303
+us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel:20241002-062031
4 changes: 2 additions & 2 deletions build/toolchains/BUILD.bazel
@@ -32,7 +32,7 @@ platform(
         "@platforms//cpu:x86_64",
     ],
     exec_properties = {
-        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:1244a0ff14aed92470cfc02c20a9483171da5872b424d94844cbebf9a1e7364a",
+        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:d0c6255981566ee171634bfb2ef28fc9357bf10247720f6c766567f1c4066d13",
         "dockerReuse": "True",
         "Pool": "default",
     },
@@ -137,7 +137,7 @@ platform(
         "@platforms//cpu:arm64",
     ],
     exec_properties = {
-        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:03e760315b966b32c5ca195927fe4a2a1c262f448089a5728b740696e7f881eb",
+        "container-image": "docker://us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel@sha256:e967099c1c80c2bfe39e06d31ef0d6983b09fff8849887747c4e7096fc980128",
         "dockerReuse": "True",
         "Pool": "default",
     },
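
The `container-image` entries pin the builder by immutable digest rather than by the mutable date tag recorded in `.bazelbuilderversion`, so remote executors cannot silently pick up a different image than CI validated. As a hedged sketch of how such a digest can be resolved from a tag (the teamcity job may use a different tool entirely), go-containerregistry's crane package is one option:

```go
package main

import (
	"fmt"
	"log"

	"github.com/google/go-containerregistry/pkg/crane"
)

func main() {
	// The tag from build/.bazelbuilderversion; each platform's BUILD.bazel
	// entry pins a matching architecture-specific digest.
	ref := "us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel:20241002-062031"
	digest, err := crane.Digest(ref) // e.g. "sha256:d0c6..."
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("container-image: docker://%s@%s\n",
		"us-east1-docker.pkg.dev/crl-ci-images/cockroach/bazel", digest)
}
```

Note that the x86_64 and arm64 entries above carry different digests, so a resolution like this would need to target each platform's manifest individually (crane exposes a platform option for that).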
65 changes: 59 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -292,16 +292,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	ctx = ca.StartInternal(ctx, changeAggregatorProcName)
 
 	spans, err := ca.setupSpansAndFrontier()
-
-	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
-
-	opts := feed.Opts
-
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
 	}
+
+	feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
+
+	opts := feed.Opts
+
 	timestampOracle := &changeAggregatorLowerBoundOracle{
 		sf:                         ca.frontier,
 		initialInclusiveLowerBound: feed.ScanTime,
@@ -318,6 +321,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	scope, _ := opts.GetMetricScope()
 	ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -329,6 +335,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	if !ca.isSinkless() {
 		recorder, err = ca.wrapMetricsController(ctx, recorder)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
+			}
 			ca.MoveToDraining(err)
 			ca.cancel()
 			return
@@ -339,6 +348,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ca.spec.User(), ca.spec.JobID, recorder)
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -370,6 +382,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.FlowCtx.Cfg.Settings.SV)
 	ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -379,6 +394,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ctx, ca.FlowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
 		ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
+		}
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -687,6 +705,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 			// NB: we do not invoke ca.cancel here -- just merely moving
 			// to drain state so that the trailing metadata callback
 			// has a chance to produce shutdown checkpoint.
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
+			}
 			ca.MoveToDraining(err)
 			break
 		}
@@ -708,6 +729,9 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 			}
 			// Shut down the poller if it wasn't already.
 			ca.cancel()
+			if log.V(2) {
+				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
+			}
 			ca.MoveToDraining(err)
 			break
 		}
@@ -1233,16 +1257,21 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
 	sli, err := cf.metrics.getSLIMetrics(scope)
 	if err != nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
 	cf.sliMetrics = sli
 
 	cf.sink, err = getResolvedTimestampSink(ctx, cf.FlowCtx.Cfg, cf.spec.Feed, nilOracle,
 		cf.spec.User(), cf.spec.JobID, sli)
-
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
+		}
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1255,6 +1284,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 
 	cf.highWaterAtStart = cf.spec.Feed.StatementTime
 	if cf.evalCtx.ChangefeedState == nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
+		}
 		cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
 		return
 	}
@@ -1265,6 +1297,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	if cf.spec.JobID != 0 {
 		job, err := cf.FlowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
 		if err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
+			}
 			cf.MoveToDraining(err)
 			return
 		}
@@ -1286,6 +1321,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 		cf.frontier.initialHighWater = *ts
 		for _, span := range cf.spec.TrackedSpans {
 			if _, err := cf.frontier.Forward(span, *ts); err != nil {
+				if log.V(2) {
+					log.Infof(cf.Ctx(), "change frontier moving to draining due to error forwarding span during highwater restoration: %v", err)
+				}
 				cf.MoveToDraining(err)
 				return
 			}
@@ -1459,25 +1497,37 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 
 		// TODO(ajwerner): make this more useful by at least informing the client
 		// of which tables changed.
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining after schema change: %v", err)
+		}
 		cf.MoveToDraining(err)
 		break
 	}
 
 	row, meta := cf.input.Next()
 	if meta != nil {
 		if meta.Err != nil {
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
+			}
 			cf.MoveToDraining(nil /* err */)
 		}
 		if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
 			// Seeing changefeed drain info metadata from the aggregator means
 			// that the aggregator exited due to node shutdown. Transition to
 			// draining so that the remaining aggregators will shut down and
 			// transmit their up-to-date frontier.
+			if log.V(2) {
+				log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
+			}
			cf.MoveToDraining(changefeedbase.ErrNodeDraining)
 		}
 		return nil, meta
 	}
 	if row == nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
+		}
 		cf.MoveToDraining(nil /* err */)
 		break
 	}
@@ -1492,6 +1542,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 	}
 
 	if err := cf.noteAggregatorProgress(row[0]); err != nil {
+		if log.V(2) {
+			log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
+		}
 		cf.MoveToDraining(err)
 		break
 	}
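
Each call site above repeats the same three-line guard before `MoveToDraining`. A hypothetical consolidation (not part of this change) could fold the guard into a helper; sketched here against a stand-in interface, since the real processors embed `execinfra.ProcessorBase`, and `drainable`, `verbose`, and `moveToDrainingWithLog` are illustrative names:

```go
package main

import "fmt"

// drainable captures the one method the repeated pattern needs; in the real
// code this would be the processor's embedded ProcessorBase.
type drainable interface {
	MoveToDraining(err error)
}

// verbose stands in for log.V(2).
const verbose = true

// moveToDrainingWithLog logs the reason (when verbose logging is on) and then
// transitions the processor to draining, replacing the repeated inline blocks.
func moveToDrainingWithLog(p drainable, name string, err error, reason string) {
	if verbose {
		fmt.Printf("%s moving to draining due to %s: %v\n", name, reason, err)
	}
	p.MoveToDraining(err)
}

type fakeProcessor struct{ drained error }

func (f *fakeProcessor) MoveToDraining(err error) { f.drained = err }

func main() {
	p := &fakeProcessor{}
	moveToDrainingWithLog(p, "change aggregator", fmt.Errorf("sink unavailable"), "error getting sink")
}
```

The inline form in the actual diff has the advantage that each message stays adjacent to its call site and keeps the ordering relative to `ca.cancel()` explicit.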
30 changes: 30 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -1581,6 +1581,8 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1685,6 +1687,8 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1719,6 +1723,8 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1766,6 +1772,8 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1813,6 +1821,8 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1847,6 +1857,8 @@ func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -1940,6 +1952,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	skip.UnderRace(t, "takes >1 min under race")
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2542,6 +2556,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2736,6 +2752,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -2893,6 +2911,8 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB)
@@ -2955,6 +2975,8 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		sqlDB.Exec(t, `CREATE TABLE after_backfill (a INT PRIMARY KEY)`)
@@ -2985,6 +3007,8 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -3103,6 +3127,8 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	// requireErrorSoon times out after 30 seconds
 	skip.UnderStress(t)
 	skip.UnderRace(t)
@@ -3147,6 +3173,8 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
@@ -8597,6 +8625,8 @@ func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 
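
Every test above sets the same spec, `kv_feed=2,changefeed_processors=2`: comma-separated `file=level` pairs matched against source file names (without the `.go` suffix), so only the two files instrumented in this PR emit their V(2) messages. A toy parser for that format, for illustration only (`parseVModule` is a hypothetical name; the real parsing lives in the log package):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVModule parses a "file=level,file=level" spec such as
// "kv_feed=2,changefeed_processors=2" into a per-file verbosity map.
func parseVModule(spec string) (map[string]int, error) {
	levels := make(map[string]int)
	for _, pair := range strings.Split(spec, ",") {
		file, level, ok := strings.Cut(pair, "=")
		if !ok {
			return nil, fmt.Errorf("malformed pair %q", pair)
		}
		n, err := strconv.Atoi(level)
		if err != nil {
			return nil, fmt.Errorf("bad level in %q: %v", pair, err)
		}
		levels[file] = n
	}
	return levels, nil
}

func main() {
	levels, err := parseVModule("kv_feed=2,changefeed_processors=2")
	if err != nil {
		panic(err)
	}
	// kv_feed.go's V(2) logs fire; changefeed_sink.go's (not listed) do not.
	fmt.Println(levels["kv_feed"] >= 2, levels["changefeed_sink"] >= 2) // true false
}
```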
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -318,6 +318,9 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 	log.Infof(ctx, "kv feed run starting")
 
 	emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error {
+		if log.V(2) {
+			log.Infof(ctx, "emitting resolved spans at time %s with boundary %s for spans: %s", ts, boundary, f.spans)
+		}
 		for _, sp := range f.spans {
 			if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {
 				return err
@@ -412,10 +415,11 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 
 		// Exit if the policy says we should.
 		if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT {
+			log.Infof(ctx, "kv feed run loop exiting due to schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 			return schemaChangeDetectedError{ts: schemaChangeTS}
 		}
 
-		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s", schemaChangeTS)
+		log.Infof(ctx, "kv feed run loop restarting because of schema change at %s and boundary type %s", schemaChangeTS, boundaryType)
 	}
 }
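
The control flow instrumented here is compact but easy to misread: RESTART and EXIT boundaries terminate `run` with a `schemaChangeDetectedError`, while any other boundary type only restarts the loop internally. A self-contained paraphrase of that decision, with types renamed for the sketch (the real ones live in jobspb):

```go
package main

import (
	"errors"
	"fmt"
)

// boundary paraphrases jobspb.ResolvedSpan_BoundaryType for this sketch.
type boundary string

const (
	boundaryBackfill boundary = "BACKFILL"
	boundaryRestart  boundary = "RESTART"
	boundaryExit     boundary = "EXIT"
)

var errSchemaChangeDetected = errors.New("schema change detected")

// onSchemaChange mirrors the decision in the hunk above: exit the run loop
// for RESTART/EXIT boundaries, otherwise log and let the loop restart itself.
func onSchemaChange(bt boundary, ts string) error {
	if bt == boundaryRestart || bt == boundaryExit {
		fmt.Printf("kv feed run loop exiting due to schema change at %s and boundary type %s\n", ts, bt)
		return errSchemaChangeDetected
	}
	fmt.Printf("kv feed run loop restarting because of schema change at %s and boundary type %s\n", ts, bt)
	return nil
}

func main() {
	fmt.Println(onSchemaChange(boundaryBackfill, "t1")) // nil: loop restarts
	fmt.Println(onSchemaChange(boundaryExit, "t2"))     // schema change detected
}
```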
