Skip to content

Commit

Permalink
server,jobs: Better handle node drain
Browse files Browse the repository at this point in the history
Rework job registry drain signal to terminate the drain
as soon as the last job that was watching for drain signal
completes its drain

Epic: none
Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed May 22, 2023
1 parent b34c137 commit a1be25e
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 17 deletions.
13 changes: 10 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type changeAggregator struct {
kvFeedDoneCh chan struct{}
kvFeedMemMon *mon.BytesMonitor

// drainWatchCh is signaled if the registry on this node is being drained.
drainWatchCh <-chan struct{}
drainDone func() // Cleanup function for drain watch.

// sink is the Sink to write rows to. Resolved timestamps are never written
// by changeAggregator.
sink EventSink
Expand Down Expand Up @@ -351,17 +355,17 @@ func (ca *changeAggregator) startKVFeed(
return nil, err
}

ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain()
// Arrange for kvFeed to terminate if the job registry is being drained.
kvfeedCfg.FeedWatcher = func(ctx context.Context) error {
drainCh := ca.flowCtx.Cfg.JobRegistry.OnDrain()
if ca.knobs.OnDrain != nil {
drainCh = ca.knobs.OnDrain()
ca.drainWatchCh = ca.knobs.OnDrain()
}

select {
case <-ctx.Done():
return ctx.Err()
case <-drainCh:
case <-ca.drainWatchCh:
return changefeedbase.ErrNodeDraining
}
}
Expand Down Expand Up @@ -503,6 +507,9 @@ func (ca *changeAggregator) close() {
if ca.kvFeedDoneCh != nil {
<-ca.kvFeedDoneCh
}
if ca.drainDone != nil {
ca.drainDone()
}
if ca.eventConsumer != nil {
_ = ca.eventConsumer.Close() // context cancellation expected here.
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,12 +1276,22 @@ func (b *changefeedResumer) resumeWithRetries(
return jobs.MarkAsRetryJobError(err)
}

if errors.Is(flowErr, changefeedbase.ErrNodeDraining) {
isRegistryDraining := func() bool {
drainCh, cleanup := execCfg.JobRegistry.OnDrain()
defer cleanup()
select {
case <-execCfg.JobRegistry.OnDrain():
case <-drainCh:
return true
default:
return false
}
}

if errors.Is(flowErr, changefeedbase.ErrNodeDraining) {
if isRegistryDraining() {
// If this node is draining, there is no point in retrying.
return jobs.MarkAsRetryJobError(changefeedbase.ErrNodeDraining)
default:
} else {
// We know that some node (other than this one) is draining.
// When we retry, the planner ought to take into account
// this information. However, there is a bit of a race here
Expand Down
55 changes: 50 additions & 5 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ type Registry struct {
// process starts.
draining bool

// numDrainWait is the number of jobs that are still
// processing drain request.
numDrainWait int

// ingestingJobs is a map of jobs which are actively ingesting on this node
// including via a processor.
ingestingJobs map[jobspb.JobID]struct{}
Expand All @@ -158,6 +162,10 @@ type Registry struct {
// in an orderly fashion, prior to resumer context being canceled.
// The registry will no longer adopt new jobs once this channel closed.
drainRequested chan struct{}
// jobDrained signaled to indicate that the job watching drainRequested channel
// completed its drain logic.
jobDrained chan struct{}

// drainJobs closed when registry should drain/cancel all active
// jobs and should no longer adopt new jobs.
drainJobs chan struct{}
Expand Down Expand Up @@ -238,6 +246,7 @@ func MakeRegistry(
withSessionEvery: log.Every(time.Second),
drainJobs: make(chan struct{}),
drainRequested: make(chan struct{}),
jobDrained: make(chan struct{}, 1),
}
if knobs != nil {
r.knobs = *knobs
Expand Down Expand Up @@ -1997,24 +2006,60 @@ func (r *Registry) WaitForRegistryShutdown(ctx context.Context) {
// by requesting all currently running jobs, as well as various job registry
// processes terminate.
// WaitForRegistryShutdown can then be used to wait for those tasks to complete.
func (r *Registry) DrainRequested() func() {
func (r *Registry) DrainRequested(ctx context.Context, maxWait time.Duration) {
r.mu.Lock()
alreadyDraining := r.mu.draining
numWait := r.mu.numDrainWait
r.mu.draining = true
r.mu.Unlock()

if alreadyDraining {
return func() {}
return
}

close(r.drainRequested)
return func() { close(r.drainJobs) }
defer close(r.drainRequested)

if numWait == 0 {
return
}

t := timeutil.NewTimer()
defer t.Stop()
t.Reset(maxWait)

for numWait > 0 {
select {
case <-ctx.Done():
return
case <-t.C:
return
case <-r.jobDrained:
r.mu.Lock()
numWait = r.mu.numDrainWait
r.mu.Unlock()
}
}
}

// OnDrain returns a channel that can be selected on to detect when the job
// registry begins draining.
func (r *Registry) OnDrain() <-chan struct{} {
return r.drainRequested
// The caller must invoke returned function when drain completes.
func (r *Registry) OnDrain() (<-chan struct{}, func()) {
r.mu.Lock()
r.mu.numDrainWait++
r.mu.Unlock()

return r.drainRequested, func() {
r.mu.Lock()
r.mu.numDrainWait--
r.mu.Unlock()

select {
case r.jobDrained <- struct{}{}:
default:
}
}
}

// TestingIsJobIdle returns true if the job is adopted and currently idle.
Expand Down
7 changes: 1 addition & 6 deletions pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,7 @@ func (s *drainServer) drainClients(
// issues a BACKUP or some other job-based statement before it
// disconnects, and encounters a job error as a result -- that the
// registry is now unavailable due to the drain.
drainJobRegistry := s.sqlServer.jobRegistry.DrainRequested()
if delay := jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV); delay > 0 && shouldDelayDraining {
log.Ops.Infof(ctx, "waiting for %s for running jobs to notice that the node is draining", delay)
s.drainSleepFn(delay)
}
drainJobRegistry()
s.sqlServer.jobRegistry.DrainRequested(ctx, jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV))

// Inform the auto-stats tasks that the node is draining.
s.sqlServer.statsRefresher.SetDraining()
Expand Down

0 comments on commit a1be25e

Please sign in to comment.