diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index a6cb15b8888f..664720c9b695 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -59,6 +59,10 @@ type changeAggregator struct {
 	kvFeedDoneCh chan struct{}
 	kvFeedMemMon *mon.BytesMonitor
 
+	// drainWatchCh is signaled if the registry on this node is being drained.
+	drainWatchCh <-chan struct{}
+	drainDone    func() // Cleanup function for drain watch.
+
 	// sink is the Sink to write rows to. Resolved timestamps are never written
 	// by changeAggregator.
 	sink EventSink
@@ -351,17 +355,17 @@ func (ca *changeAggregator) startKVFeed(
 		return nil, err
 	}
 
+	ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain()
 	// Arrange for kvFeed to terminate if the job registry is being drained.
 	kvfeedCfg.FeedWatcher = func(ctx context.Context) error {
-		drainCh := ca.flowCtx.Cfg.JobRegistry.OnDrain()
 		if ca.knobs.OnDrain != nil {
-			drainCh = ca.knobs.OnDrain()
+			ca.drainWatchCh = ca.knobs.OnDrain()
 		}
 
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-drainCh:
+		case <-ca.drainWatchCh:
 			return changefeedbase.ErrNodeDraining
 		}
 	}
@@ -503,6 +507,9 @@ func (ca *changeAggregator) close() {
 	if ca.kvFeedDoneCh != nil {
 		<-ca.kvFeedDoneCh
 	}
+	if ca.drainDone != nil {
+		ca.drainDone()
+	}
 	if ca.eventConsumer != nil {
 		_ = ca.eventConsumer.Close() // context cancellation expected here.
 	}
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index 5d6ec8fff403..a94f387d5a1e 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1276,12 +1276,22 @@ func (b *changefeedResumer) resumeWithRetries(
 			return jobs.MarkAsRetryJobError(err)
 		}
 
-		if errors.Is(flowErr, changefeedbase.ErrNodeDraining) {
+		isRegistryDraining := func() bool {
+			drainCh, cleanup := execCfg.JobRegistry.OnDrain()
+			defer cleanup()
 			select {
-			case <-execCfg.JobRegistry.OnDrain():
+			case <-drainCh:
+				return true
+			default:
+				return false
+			}
+		}
+
+		if errors.Is(flowErr, changefeedbase.ErrNodeDraining) {
+			if isRegistryDraining() {
 				// If this node is draining, there is no point in retrying.
 				return jobs.MarkAsRetryJobError(changefeedbase.ErrNodeDraining)
-			default:
+			} else {
 				// We know that some node (other than this one) is draining.
 				// When we retry, the planner ought to take into account
 				// this information. However, there is a bit of a race here
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 3bd89bedeae5..51239ef82d1c 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -147,6 +147,10 @@ type Registry struct {
 		// process starts.
 		draining bool
 
+		// numDrainWait is the number of jobs that are still
+		// processing the drain request.
+		numDrainWait int
+
 		// ingestingJobs is a map of jobs which are actively ingesting on this node
 		// including via a processor.
 		ingestingJobs map[jobspb.JobID]struct{}
@@ -158,6 +162,10 @@ type Registry struct {
 	// in an orderly fashion, prior to resumer context being canceled.
 	// The registry will no longer adopt new jobs once this channel closed.
 	drainRequested chan struct{}
+	// jobDrained is signaled to indicate that a job watching the drainRequested
+	// channel has completed its drain logic.
+	jobDrained chan struct{}
+
 	// drainJobs closed when registry should drain/cancel all active
 	// jobs and should no longer adopt new jobs.
 	drainJobs chan struct{}
@@ -238,6 +246,7 @@ func MakeRegistry(
 		withSessionEvery: log.Every(time.Second),
 		drainJobs:        make(chan struct{}),
 		drainRequested:   make(chan struct{}),
+		jobDrained:       make(chan struct{}, 1),
 	}
 	if knobs != nil {
 		r.knobs = *knobs
@@ -1997,24 +2006,60 @@ func (r *Registry) WaitForRegistryShutdown(ctx context.Context) {
 // by requesting all currently running jobs, as well as various job registry
 // processes terminate.
 // WaitForRegistryShutdown can then be used to wait for those tasks to complete.
-func (r *Registry) DrainRequested() func() {
+func (r *Registry) DrainRequested(ctx context.Context, maxWait time.Duration) {
 	r.mu.Lock()
 	alreadyDraining := r.mu.draining
+	numWait := r.mu.numDrainWait
 	r.mu.draining = true
 	r.mu.Unlock()
 
 	if alreadyDraining {
-		return func() {}
+		return
 	}
 
 	close(r.drainRequested)
-	return func() { close(r.drainJobs) }
+	defer close(r.drainJobs)
+
+	if numWait == 0 {
+		return
+	}
+
+	t := timeutil.NewTimer()
+	defer t.Stop()
+	t.Reset(maxWait)
+
+	for numWait > 0 {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			return
+		case <-r.jobDrained:
+			r.mu.Lock()
+			numWait = r.mu.numDrainWait
+			r.mu.Unlock()
+		}
+	}
+}
 
 // OnDrain returns a channel that can be selected on to detect when the job
 // registry begins draining.
-func (r *Registry) OnDrain() <-chan struct{} {
-	return r.drainRequested
+// The caller must invoke the returned function once its drain work completes.
+func (r *Registry) OnDrain() (<-chan struct{}, func()) {
+	r.mu.Lock()
+	r.mu.numDrainWait++
+	r.mu.Unlock()
+
+	return r.drainRequested, func() {
+		r.mu.Lock()
+		r.mu.numDrainWait--
+		r.mu.Unlock()
+
+		select {
+		case r.jobDrained <- struct{}{}:
+		default:
+		}
+	}
 }
 
 // TestingIsJobIdle returns true if the job is adopted and currently idle.
diff --git a/pkg/server/drain.go b/pkg/server/drain.go
index 0b1a6c9ebffe..6f5ce4cdc2d6 100644
--- a/pkg/server/drain.go
+++ b/pkg/server/drain.go
@@ -401,12 +401,7 @@ func (s *drainServer) drainClients(
 	// issues a BACKUP or some other job-based statement before it
 	// disconnects, and encounters a job error as a result -- that the
 	// registry is now unavailable due to the drain.
-	drainJobRegistry := s.sqlServer.jobRegistry.DrainRequested()
-	if delay := jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV); delay > 0 && shouldDelayDraining {
-		log.Ops.Infof(ctx, "waiting for %s for running jobs to notice that the node is draining", delay)
-		s.drainSleepFn(delay)
-	}
-	drainJobRegistry()
+	s.sqlServer.jobRegistry.DrainRequested(ctx, jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV))
 
 	// Inform the auto-stats tasks that the node is draining.
 	s.sqlServer.statsRefresher.SetDraining()
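Note for reviewers (not part of the patch): OnDrain now returns the drain channel together with a cleanup function, and DrainRequested blocks, up to maxWait or until ctx is done, while any registered watcher has not yet invoked its cleanup. The sketch below illustrates how a job-side component would consume this contract, mirroring what changeAggregator does in this patch; the watchDrain helper, its done parameter, and the example package are hypothetical, for illustration only.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
	"github.com/cockroachdb/cockroach/pkg/jobs"
)

// watchDrain registers interest in the registry drain signal, then waits
// for either the work to finish or a drain to begin. The deferred cleanup
// decrements the registry's numDrainWait counter and signals jobDrained,
// which lets DrainRequested return before maxWait elapses.
func watchDrain(ctx context.Context, r *jobs.Registry, done <-chan struct{}) error {
	drainCh, cleanup := r.OnDrain()
	defer cleanup()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-done:
		// Work finished before any drain started.
		return nil
	case <-drainCh:
		return changefeedbase.ErrNodeDraining
	}
}

Because jobDrained has capacity 1 and the cleanup sends with a default branch, invoking the cleanup never blocks, even when no DrainRequested call is listening.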