From a23ceea2eaa1d26fe0cd9401c0f5701618842802 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 23 Aug 2023 10:37:43 -0400 Subject: [PATCH] changefeedccl: move node drain handling logic out of kvfeed Previously, the kvfeed was responsible for monitoring for node drains using a goroutine. This change moves this logic into the change aggregator and removes the goroutine. Overall, this change makes the code more organized and performant. This change was inspired by work being done for #109167. The work in that PR requires being able to restart the kvfeed. Having drain logic intermingled with the kvfeed makes restarts much more complex, hard to review, prone to bugs, etc. Informs: https://github.com/cockroachdb/cockroach/issues/96953 Release note: None Epic: None --- .../changefeedccl/changefeed_processors.go | 53 +++++++++++-------- pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 11 ++-- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 102a7f855a94..7958d351032c 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -51,7 +51,8 @@ type changeAggregator struct { spec execinfrapb.ChangeAggregatorSpec memAcc mon.BoundAccount - // cancel shuts down the processor, both the `Next()` flow and the kvfeed. + // cancel cancels the context passed to all resources created while starting + // this aggregator. cancel func() // errCh contains the return values of the kvfeed. errCh chan error @@ -59,9 +60,12 @@ type changeAggregator struct { kvFeedDoneCh chan struct{} kvFeedMemMon *mon.BytesMonitor - // drainWatchCh is signaled if the registry on this node is being drained. + // drainWatchCh is signaled if the job registry on this node is being + // drained, which is a proxy for the node being drained. If a drain occurs, + // it will be blocked until we allow it to proceed by calling drainDone(). + // This gives the aggregator time to checkpoint before shutting down. drainWatchCh <-chan struct{} - drainDone func() // Cleanup function for drain watch. + drainDone func() // sink is the Sink to write rows to. Resolved timestamps are never written // by changeAggregator. @@ -287,7 +291,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.spec.User(), ca.spec.JobID, recorder) if err != nil { err = changefeedbase.MarkRetryableError(err) - // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() return @@ -312,7 +315,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed) if err != nil { - // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() return @@ -321,9 +323,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.eventConsumer, ca.sink, err = newEventConsumer( ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier.SpanFrontier(), kvFeedHighWater, ca.sink, ca.metrics, ca.sliMetrics, ca.knobs) - if err != nil { - // Early abort in the case that there is an error setting up the consumption. ca.MoveToDraining(err) ca.cancel() return @@ -334,6 +334,26 @@ func (ca *changeAggregator) Start(ctx context.Context) { // Generate expensive checkpoint only after we ran for a while. ca.lastSpanFlush = timeutil.Now() + + if ca.knobs.OnDrain != nil { + ca.drainWatchCh = ca.knobs.OnDrain() + } else { + ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain() + } +} + +// checkForNodeDrain returns an error if the node is draining. +func (ca *changeAggregator) checkForNodeDrain() error { + if ca.drainWatchCh == nil { + return errors.AssertionFailedf("cannot check for node drain if" + + " watch channel is nil") + } + select { + case <-ca.drainWatchCh: + return changefeedbase.ErrNodeDraining + default: + return nil + } } func (ca *changeAggregator) startKVFeed( @@ -355,21 +375,6 @@ func (ca *changeAggregator) startKVFeed( return nil, err } - ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain() - // Arrange for kvFeed to terminate if the job registry is being drained. - kvfeedCfg.FeedWatcher = func(ctx context.Context) error { - if ca.knobs.OnDrain != nil { - ca.drainWatchCh = ca.knobs.OnDrain() - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-ca.drainWatchCh: - return changefeedbase.ErrNodeDraining - } - } - // Give errCh enough buffer both possible errors from supporting goroutines, // but only the first one is ever used. ca.errCh = make(chan error, 2) @@ -631,6 +636,10 @@ func (ca *changeAggregator) tick() error { return err } + if err := ca.checkForNodeDrain(); err != nil { + return err + } + queuedNanos := timeutil.Since(event.BufferAddTimestamp()).Nanoseconds() ca.metrics.QueueTimeNanos.Inc(queuedNanos) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 18f99e47567b..87f7a75a0dcf 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -53,10 +53,6 @@ type Config struct { SchemaChangePolicy changefeedbase.SchemaChangePolicy SchemaFeed schemafeed.SchemaFeed - // FeedWatcher function is invoked along with the kv/schema feed. - // It may return an error which will cause kv feed to exit. - FeedWatcher func(ctx context.Context) error - // If true, the feed will begin with a dump of data at exactly the // InitialHighWater. This is a peculiar behavior. In general the // InitialHighWater is a point in time at which all data is known to have @@ -115,10 +111,8 @@ func Run(ctx context.Context, cfg Config) error { g := ctxgroup.WithContext(ctx) g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) - if cfg.FeedWatcher != nil { - g.GoCtx(cfg.FeedWatcher) - } err := g.Wait() + log.Errorf(ctx, "kvfeed OVERALL err %v\n", err) // NB: The higher layers of the changefeed should detect the boundary and the // policy and tear everything down. Returning before the higher layers tear down @@ -242,6 +236,9 @@ func newKVFeed( var errChangefeedCompleted = errors.New("changefeed completed") func (f *kvFeed) run(ctx context.Context) (err error) { + defer func() { + log.Errorf(ctx, "kvfeed OVERALL run %v\n", err) + }() emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error { for _, sp := range f.spans { if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {