Skip to content

Commit

Permalink
changefeedccl: move node drain handling logic out of kvfeed
Browse files Browse the repository at this point in the history
Previously, the kvfeed was responsible for monitoring for
node drains using a goroutine. This change moves this logic
into the change aggregator and removes the goroutine.
Overall, this change makes the code more organized and performant.

This change was inspired by work being done for cockroachdb#109167. The
work in that PR requires being able to restart the kvfeed.
Having drain logic intermingled with the kvfeed makes
restarts much more complex, hard to review, prone to bugs, etc.

Informs: cockroachdb#96953
Release note: None
Epic: None
  • Loading branch information
jayshrivastava committed Aug 23, 2023
1 parent 9ad8453 commit a23ceea
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 29 deletions.
53 changes: 31 additions & 22 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,21 @@ type changeAggregator struct {
spec execinfrapb.ChangeAggregatorSpec
memAcc mon.BoundAccount

// cancel shuts down the processor, both the `Next()` flow and the kvfeed.
// cancel cancels the context passed to all resources created while starting
// this aggregator.
cancel func()
// errCh contains the return values of the kvfeed.
errCh chan error
// kvFeedDoneCh is closed when the kvfeed exits.
kvFeedDoneCh chan struct{}
kvFeedMemMon *mon.BytesMonitor

// drainWatchCh is signaled if the registry on this node is being drained.
// drainWatchCh is signaled if the job registry on this node is being
// drained, which is a proxy for the node being drained. If a drain occurs,
// it will be blocked until we allow it to proceed by calling drainDone().
// This gives the aggregator time to checkpoint before shutting down.
drainWatchCh <-chan struct{}
drainDone func() // Cleanup function for drain watch.
drainDone func()

// sink is the Sink to write rows to. Resolved timestamps are never written
// by changeAggregator.
Expand Down Expand Up @@ -287,7 +291,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.spec.User(), ca.spec.JobID, recorder)
if err != nil {
err = changefeedbase.MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
return
Expand All @@ -312,7 +315,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {

ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed)
if err != nil {
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
return
Expand All @@ -321,9 +323,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.eventConsumer, ca.sink, err = newEventConsumer(
ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier.SpanFrontier(), kvFeedHighWater,
ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)

if err != nil {
// Early abort in the case that there is an error setting up the consumption.
ca.MoveToDraining(err)
ca.cancel()
return
Expand All @@ -334,6 +334,26 @@ func (ca *changeAggregator) Start(ctx context.Context) {

// Generate expensive checkpoint only after we ran for a while.
ca.lastSpanFlush = timeutil.Now()

if ca.knobs.OnDrain != nil {
ca.drainWatchCh = ca.knobs.OnDrain()
} else {
ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain()
}
}

// checkForNodeDrain returns an error if the node is draining.
func (ca *changeAggregator) checkForNodeDrain() error {
if ca.drainWatchCh == nil {
return errors.AssertionFailedf("cannot check for node drain if" +
" watch channel is nil")
}
select {
case <-ca.drainWatchCh:
return changefeedbase.ErrNodeDraining
default:
return nil
}
}

func (ca *changeAggregator) startKVFeed(
Expand All @@ -355,21 +375,6 @@ func (ca *changeAggregator) startKVFeed(
return nil, err
}

ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain()
// Arrange for kvFeed to terminate if the job registry is being drained.
kvfeedCfg.FeedWatcher = func(ctx context.Context) error {
if ca.knobs.OnDrain != nil {
ca.drainWatchCh = ca.knobs.OnDrain()
}

select {
case <-ctx.Done():
return ctx.Err()
case <-ca.drainWatchCh:
return changefeedbase.ErrNodeDraining
}
}

// Give errCh enough buffer both possible errors from supporting goroutines,
// but only the first one is ever used.
ca.errCh = make(chan error, 2)
Expand Down Expand Up @@ -631,6 +636,10 @@ func (ca *changeAggregator) tick() error {
return err
}

if err := ca.checkForNodeDrain(); err != nil {
return err
}

queuedNanos := timeutil.Since(event.BufferAddTimestamp()).Nanoseconds()
ca.metrics.QueueTimeNanos.Inc(queuedNanos)

Expand Down
11 changes: 4 additions & 7 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ type Config struct {
SchemaChangePolicy changefeedbase.SchemaChangePolicy
SchemaFeed schemafeed.SchemaFeed

// FeedWatcher function is invoked along with the kv/schema feed.
// It may return an error which will cause kv feed to exit.
FeedWatcher func(ctx context.Context) error

// If true, the feed will begin with a dump of data at exactly the
// InitialHighWater. This is a peculiar behavior. In general the
// InitialHighWater is a point in time at which all data is known to have
Expand Down Expand Up @@ -115,10 +111,8 @@ func Run(ctx context.Context, cfg Config) error {
g := ctxgroup.WithContext(ctx)
g.GoCtx(cfg.SchemaFeed.Run)
g.GoCtx(f.run)
if cfg.FeedWatcher != nil {
g.GoCtx(cfg.FeedWatcher)
}
err := g.Wait()
log.Errorf(ctx, "kvfeed OVERALL err %v\n", err)

// NB: The higher layers of the changefeed should detect the boundary and the
// policy and tear everything down. Returning before the higher layers tear down
Expand Down Expand Up @@ -242,6 +236,9 @@ func newKVFeed(
var errChangefeedCompleted = errors.New("changefeed completed")

func (f *kvFeed) run(ctx context.Context) (err error) {
defer func() {
log.Errorf(ctx, "kvfeed OVERALL run %v\n", err)
}()
emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error {
for _, sp := range f.spans {
if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {
Expand Down

0 comments on commit a23ceea

Please sign in to comment.