From d6e7b273727972fae773df151aa11d8197e99efe Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 23 Aug 2023 10:37:43 -0400 Subject: [PATCH] changefeedccl: move node drain handling logic out of kvfeed Previously, the kvfeed was responsible for monitoring for node drains using a goroutine. This change moves this logic into the change aggregator and removes the goroutine. Overall, this change makes the code more organized and performant. This change was inspired by work being done for #109167. The work in that PR requires being able to restart the kvfeed. Having drain logic intermingled with the kvfeed makes restarts much more complex, hard to review, prone to bugs, etc. Informs: https://github.com/cockroachdb/cockroach/issues/96953 Release note: None Epic: None --- .../changefeedccl/changefeed_processors.go | 81 +++++++++++-------- pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 7 -- 2 files changed, 46 insertions(+), 42 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 102a7f855a94..6852cf9f8e52 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -51,7 +51,8 @@ type changeAggregator struct { spec execinfrapb.ChangeAggregatorSpec memAcc mon.BoundAccount - // cancel shuts down the processor, both the `Next()` flow and the kvfeed. + // cancel cancels the context passed to all resources created while starting + // this aggregator. cancel func() // errCh contains the return values of the kvfeed. errCh chan error @@ -59,9 +60,12 @@ type changeAggregator struct { kvFeedDoneCh chan struct{} kvFeedMemMon *mon.BytesMonitor - // drainWatchCh is signaled if the registry on this node is being drained. + // drainWatchCh is signaled if the job registry on this node is being + // drained, which is a proxy for the node being drained. If a drain occurs, + // it will be blocked until we allow it to proceed by calling drainDone(). + // This gives the aggregator time to checkpoint before shutting down. drainWatchCh <-chan struct{} - drainDone func() // Cleanup function for drain watch. + drainDone func() // sink is the Sink to write rows to. Resolved timestamps are never written // by changeAggregator. @@ -287,7 +291,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.spec.User(), ca.spec.JobID, recorder) if err != nil { err = changefeedbase.MarkRetryableError(err) - // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() return @@ -312,7 +315,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed) if err != nil { - // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() return @@ -321,9 +323,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.eventConsumer, ca.sink, err = newEventConsumer( ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier.SpanFrontier(), kvFeedHighWater, ca.sink, ca.metrics, ca.sliMetrics, ca.knobs) - if err != nil { - // Early abort in the case that there is an error setting up the consumption. ca.MoveToDraining(err) ca.cancel() return @@ -334,6 +334,26 @@ func (ca *changeAggregator) Start(ctx context.Context) { // Generate expensive checkpoint only after we ran for a while. ca.lastSpanFlush = timeutil.Now() + + if ca.knobs.OnDrain != nil { + ca.drainWatchCh = ca.knobs.OnDrain() + } else { + ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain() + } +} + +// checkForNodeDrain returns an error if the node is draining. +func (ca *changeAggregator) checkForNodeDrain() error { + if ca.drainWatchCh == nil { + return errors.AssertionFailedf("cannot check for node drain if" + + " watch channel is nil") + } + select { + case <-ca.drainWatchCh: + return changefeedbase.ErrNodeDraining + default: + return nil + } } func (ca *changeAggregator) startKVFeed( @@ -355,21 +375,6 @@ func (ca *changeAggregator) startKVFeed( return nil, err } - ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain() - // Arrange for kvFeed to terminate if the job registry is being drained. - kvfeedCfg.FeedWatcher = func(ctx context.Context) error { - if ca.knobs.OnDrain != nil { - ca.drainWatchCh = ca.knobs.OnDrain() - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-ca.drainWatchCh: - return changefeedbase.ErrNodeDraining - } - } - // Give errCh enough buffer both possible errors from supporting goroutines, // but only the first one is ever used. ca.errCh = make(chan error, 2) @@ -575,21 +580,27 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet } } + // As the last gasp before shutdown, transmit an up-to-date frontier + // information to the coordinator. We expect to get this signal via the + // polling below before the drain actually occurs and starts tearing + // things down. + if err := ca.checkForNodeDrain(); err != nil { + nodeID, _ := ca.FlowCtx.Cfg.NodeID.OptionalNodeID() + meta := &execinfrapb.ChangefeedMeta{ + DrainInfo: &execinfrapb.ChangefeedMeta_DrainInfo{NodeID: nodeID}, + Checkpoint: getFrontierSpans(), + } + + ca.AppendTrailingMeta(execinfrapb.ProducerMetadata{Changefeed: meta}) + ca.shutdownCheckpointEmitted = true + ca.cancel() + ca.MoveToDraining(err) + break + } + if err := ca.tick(); err != nil { var e kvevent.ErrBufferClosed - if errors.Is(err, changefeedbase.ErrNodeDraining) { - err = changefeedbase.ErrNodeDraining - // As the last gasp before shutdown, transmit an up-to-date frontier - // information to the coordinator. - nodeID, _ := ca.FlowCtx.Cfg.NodeID.OptionalNodeID() - meta := &execinfrapb.ChangefeedMeta{ - DrainInfo: &execinfrapb.ChangefeedMeta_DrainInfo{NodeID: nodeID}, - Checkpoint: getFrontierSpans(), - } - - ca.AppendTrailingMeta(execinfrapb.ProducerMetadata{Changefeed: meta}) - ca.shutdownCheckpointEmitted = true - } else if errors.As(err, &e) { + if errors.As(err, &e) { // ErrBufferClosed is a signal that our kvfeed has exited expectedly. err = e.Unwrap() if errors.Is(err, kvevent.ErrNormalRestartReason) { diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 18f99e47567b..5147e9d8c92b 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -53,10 +53,6 @@ type Config struct { SchemaChangePolicy changefeedbase.SchemaChangePolicy SchemaFeed schemafeed.SchemaFeed - // FeedWatcher function is invoked along with the kv/schema feed. - // It may return an error which will cause kv feed to exit. - FeedWatcher func(ctx context.Context) error - // If true, the feed will begin with a dump of data at exactly the // InitialHighWater. This is a peculiar behavior. In general the // InitialHighWater is a point in time at which all data is known to have @@ -115,9 +111,6 @@ func Run(ctx context.Context, cfg Config) error { g := ctxgroup.WithContext(ctx) g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) - if cfg.FeedWatcher != nil { - g.GoCtx(cfg.FeedWatcher) - } err := g.Wait() // NB: The higher layers of the changefeed should detect the boundary and the