server,jobs: Better handle node drain
Rework the job registry drain signal so that the drain terminates
as soon as the last job watching for the drain signal completes
its drain.

Epic: CRDB-26978
Release note: None
Yevgeniy Miretskiy committed May 22, 2023
1 parent b34c137 commit 394c302
Showing 7 changed files with 82 additions and 26 deletions.
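
The commit message above describes a handshake between the job registry and drain-aware jobs: OnDrain now hands out the drain channel together with a completion callback, and DrainRequested waits, up to a budget, until every outstanding callback has fired instead of always sleeping for the full budget. What follows is a minimal, self-contained sketch of that idea only; the miniRegistry type and every name other than OnDrain and DrainRequested are invented for illustration and are not the CockroachDB implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

type miniRegistry struct {
	mu             sync.Mutex
	numDrainWait   int
	drainRequested chan struct{}
	jobDrained     chan struct{}
}

func newMiniRegistry() *miniRegistry {
	return &miniRegistry{
		drainRequested: make(chan struct{}),
		jobDrained:     make(chan struct{}, 1),
	}
}

// OnDrain registers interest in the drain signal and returns the signal
// channel plus a cleanup function the caller must invoke once its own
// drain work is done.
func (r *miniRegistry) OnDrain() (<-chan struct{}, func()) {
	r.mu.Lock()
	r.numDrainWait++
	r.mu.Unlock()
	return r.drainRequested, func() {
		r.mu.Lock()
		r.numDrainWait--
		r.mu.Unlock()
		select {
		case r.jobDrained <- struct{}{}: // wake the drain loop, if it is waiting.
		default:
		}
	}
}

// DrainRequested closes the drain channel and then waits, up to maxWait,
// for every registered watcher to call its cleanup function.
func (r *miniRegistry) DrainRequested(maxWait time.Duration) {
	close(r.drainRequested)
	deadline := time.After(maxWait)
	for {
		r.mu.Lock()
		n := r.numDrainWait
		r.mu.Unlock()
		if n == 0 {
			return // last watcher finished; no need to burn the full budget.
		}
		select {
		case <-deadline:
			return
		case <-r.jobDrained:
		}
	}
}

func main() {
	r := newMiniRegistry()
	drainCh, done := r.OnDrain()
	go func() {
		<-drainCh                         // the "job" notices the drain request...
		time.Sleep(50 * time.Millisecond) // ...winds down its work...
		done()                            // ...and reports that it finished.
	}()
	start := time.Now()
	r.DrainRequested(10 * time.Second)
	fmt.Printf("drain finished after %s, well before the 10s budget\n", time.Since(start))
}

Running the sketch, DrainRequested returns a few tens of milliseconds after the drain starts rather than waiting out the full budget, which is the behavior change this commit is after.
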
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -75,7 +75,7 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with
server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged tenant-rw
server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw
- server.shutdown.jobs_wait duration 0s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
+ server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw
server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -105,7 +105,7 @@
<tr><td><div id="setting-server-secondary-tenants-redact-trace-enabled" class="anchored"><code>server.secondary_tenants.redact_trace.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>controls if server side traces are redacted for tenant operations</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-connection-wait" class="anchored"><code>server.shutdown.connection_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-drain-wait" class="anchored"><code>server.shutdown.drain_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-lease-transfer-wait" class="anchored"><code>server.shutdown.lease_transfer_wait</code></div></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-query-wait" class="anchored"><code>server.shutdown.query_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-time-until-store-dead" class="anchored"><code>server.time_until_store_dead</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the time after which if there is no new gossiped information about a store, it is considered dead</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
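
Both generated settings files above reflect the new 10s default for server.shutdown.jobs_wait. Purely as a hedged illustration (the connection string and driver choice below are assumptions, not part of this commit), an operator could raise the budget from any SQL client, using the same SET CLUSTER SETTING statement the tests below used to pin it to zero:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any Postgres-wire driver works; this choice is illustrative.
)

func main() {
	// Illustrative connection parameters for a local single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Give running jobs up to 30s (instead of the 10s default) to notice the
	// drain request and shut down in an orderly fashion.
	if _, err := db.Exec("SET CLUSTER SETTING server.shutdown.jobs_wait = '30s'"); err != nil {
		log.Fatal(err)
	}
}
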
13 changes: 10 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -59,6 +59,10 @@ type changeAggregator struct {
kvFeedDoneCh chan struct{}
kvFeedMemMon *mon.BytesMonitor

+ // drainWatchCh is signaled if the registry on this node is being drained.
+ drainWatchCh <-chan struct{}
+ drainDone func() // Cleanup function for drain watch.
+
// sink is the Sink to write rows to. Resolved timestamps are never written
// by changeAggregator.
sink EventSink
@@ -351,17 +355,17 @@ func (ca *changeAggregator) startKVFeed(
return nil, err
}

+ ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain()
// Arrange for kvFeed to terminate if the job registry is being drained.
kvfeedCfg.FeedWatcher = func(ctx context.Context) error {
- drainCh := ca.flowCtx.Cfg.JobRegistry.OnDrain()
if ca.knobs.OnDrain != nil {
- drainCh = ca.knobs.OnDrain()
+ ca.drainWatchCh = ca.knobs.OnDrain()
}

select {
case <-ctx.Done():
return ctx.Err()
- case <-drainCh:
+ case <-ca.drainWatchCh:
return changefeedbase.ErrNodeDraining
}
}
@@ -503,6 +507,9 @@ func (ca *changeAggregator) close() {
if ca.kvFeedDoneCh != nil {
<-ca.kvFeedDoneCh
}
+ if ca.drainDone != nil {
+ ca.drainDone()
+ }
if ca.eventConsumer != nil {
_ = ca.eventConsumer.Close() // context cancellation expected here.
}
16 changes: 13 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1276,12 +1276,22 @@ func (b *changefeedResumer) resumeWithRetries(
return jobs.MarkAsRetryJobError(err)
}

- if errors.Is(flowErr, changefeedbase.ErrNodeDraining) {
+ isRegistryDraining := func() bool {
+ drainCh, cleanup := execCfg.JobRegistry.OnDrain()
+ defer cleanup()
select {
- case <-execCfg.JobRegistry.OnDrain():
+ case <-drainCh:
+ return true
+ default:
+ return false
+ }
+ }
+
+ if errors.Is(flowErr, changefeedbase.ErrNodeDraining) {
+ if isRegistryDraining() {
// If this node is draining, there is no point in retrying.
return jobs.MarkAsRetryJobError(changefeedbase.ErrNodeDraining)
- default:
+ } else {
// We know that some node (other than this one) is draining.
// When we retry, the planner ought to take into account
// this information. However, there is a bit of a race here
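
The resumer change above amounts to a non-blocking peek at the drain channel, with the cleanup callback invoked immediately so the registry never waits on this short-lived check. Below is a sketch of that check pulled out into a standalone function; the package name and function signature are adjusted for illustration, while OnDrain and the isRegistryDraining logic come straight from the diff.

package drainutil // hypothetical package, for illustration only.

import "github.com/cockroachdb/cockroach/pkg/jobs"

// isRegistryDraining reports whether this node's job registry has begun
// draining. The cleanup function is deferred and runs right away because
// this caller only peeks at the signal and has no drain work of its own.
func isRegistryDraining(reg *jobs.Registry) bool {
	drainCh, cleanup := reg.OnDrain()
	defer cleanup()
	select {
	case <-drainCh:
		return true
	default:
		return false
	}
}

When the flow error is ErrNodeDraining and the check returns true, a local retry is pointless, so the resumer hands back a retryable job error and lets another node adopt the changefeed; when it returns false, some other node drained and retrying here with a fresh plan is worthwhile.
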
63 changes: 55 additions & 8 deletions pkg/jobs/registry.go
@@ -147,6 +147,10 @@ type Registry struct {
// process starts.
draining bool

+ // numDrainWait is the number of jobs that are still
+ // processing drain request.
+ numDrainWait int
+
// ingestingJobs is a map of jobs which are actively ingesting on this node
// including via a processor.
ingestingJobs map[jobspb.JobID]struct{}
@@ -158,6 +162,10 @@ type Registry struct {
// in an orderly fashion, prior to resumer context being canceled.
// The registry will no longer adopt new jobs once this channel closed.
drainRequested chan struct{}
+ // jobDrained signaled to indicate that the job watching drainRequested channel
+ // completed its drain logic.
+ jobDrained chan struct{}
+
// drainJobs closed when registry should drain/cancel all active
// jobs and should no longer adopt new jobs.
drainJobs chan struct{}
@@ -238,6 +246,7 @@ func MakeRegistry(
withSessionEvery: log.Every(time.Second),
drainJobs: make(chan struct{}),
drainRequested: make(chan struct{}),
+ jobDrained: make(chan struct{}, 1),
}
if knobs != nil {
r.knobs = *knobs
@@ -1993,28 +2002,66 @@ func (r *Registry) WaitForRegistryShutdown(ctx context.Context) {
}

// DrainRequested informs the job system that this node is being drained.
- // Returns a function that, when invoked, will initiate drain process
- // by requesting all currently running jobs, as well as various job registry
- // processes terminate.
+ // Waits up to maxWait time for any callers who are monitoring for registry
+ // drain to complete their drain logic.
// WaitForRegistryShutdown can then be used to wait for those tasks to complete.
- func (r *Registry) DrainRequested() func() {
+ func (r *Registry) DrainRequested(ctx context.Context, maxWait time.Duration) {
r.mu.Lock()
alreadyDraining := r.mu.draining
+ numWait := r.mu.numDrainWait
r.mu.draining = true
r.mu.Unlock()

if alreadyDraining {
- return func() {}
+ return
}

close(r.drainRequested)
- return func() { close(r.drainJobs) }
+ defer close(r.drainJobs)
+
+ if numWait == 0 {
+ return
+ }
+
+ t := timeutil.NewTimer()
+ defer t.Stop()
+ t.Reset(maxWait)
+
+ for numWait > 0 {
+ select {
+ case <-ctx.Done():
+ return
+ case <-r.stopper.ShouldQuiesce():
+ return
+ case <-t.C:
+ t.Read = true
+ return
+ case <-r.jobDrained:
+ r.mu.Lock()
+ numWait = r.mu.numDrainWait
+ r.mu.Unlock()
+ }
+ }
}

// OnDrain returns a channel that can be selected on to detect when the job
// registry begins draining.
- func (r *Registry) OnDrain() <-chan struct{} {
- return r.drainRequested
+ // The caller must invoke returned function when drain completes.
+ func (r *Registry) OnDrain() (<-chan struct{}, func()) {
+ r.mu.Lock()
+ r.mu.numDrainWait++
+ r.mu.Unlock()
+
+ return r.drainRequested, func() {
+ r.mu.Lock()
+ r.mu.numDrainWait--
+ r.mu.Unlock()
+
+ select {
+ case r.jobDrained <- struct{}{}:
+ default:
+ }
+ }
}

// TestingIsJobIdle returns true if the job is adopted and currently idle.
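
From a job's point of view, the reworked registry API above turns drain handling into an acquire/release pair: take the channel and the release callback when you start watching, and call the callback once your own wind-down is finished so DrainRequested can return early. Here is a hedged sketch of that call pattern against the new API; Registry.OnDrain is from this diff, while the package, function, and parameter names are illustrative.

package jobsutil // hypothetical package, for illustration only.

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
)

// watchForDrain blocks until either the surrounding context is canceled or
// the registry starts draining, runs the caller's shutdown logic, and then
// releases the drain-wait token so Registry.DrainRequested can return.
func watchForDrain(ctx context.Context, reg *jobs.Registry, shutdown func()) {
	drainCh, done := reg.OnDrain()
	defer done() // tells the registry this watcher has finished draining.

	select {
	case <-ctx.Done():
		return
	case <-drainCh:
		shutdown() // orderly, job-specific wind-down (flush, checkpoint, etc.).
	}
}

The changefeed aggregator in this commit follows the same shape: it grabs the channel and callback in startKVFeed and invokes the callback from close().
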
9 changes: 2 additions & 7 deletions pkg/server/drain.go
@@ -72,7 +72,7 @@ var (
"server.shutdown.jobs_wait",
"the maximum amount of time a server waits for all currently executing jobs "+
"to notice drain request and to perform orderly shutdown",
- 0*time.Second,
+ 10*time.Second,
settings.NonNegativeDurationWithMaximum(10*time.Hour),
).WithPublic()
)
@@ -401,12 +401,7 @@ func (s *drainServer) drainClients(
// issues a BACKUP or some other job-based statement before it
// disconnects, and encounters a job error as a result -- that the
// registry is now unavailable due to the drain.
- drainJobRegistry := s.sqlServer.jobRegistry.DrainRequested()
- if delay := jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV); delay > 0 && shouldDelayDraining {
- log.Ops.Infof(ctx, "waiting for %s for running jobs to notice that the node is draining", delay)
- s.drainSleepFn(delay)
- }
- drainJobRegistry()
+ s.sqlServer.jobRegistry.DrainRequested(ctx, jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV))

// Inform the auto-stats tasks that the node is draining.
s.sqlServer.statsRefresher.SetDraining()
3 changes: 0 additions & 3 deletions pkg/server/drain_test.go
@@ -191,8 +191,6 @@ func newTestDrainContext(t *testing.T, drainSleepCallCount *int) *testDrainConte
}),
}

- serverutils.SetClusterSetting(t, tc.tc, "server.shutdown.jobs_wait", 0)

// We'll have the RPC talk to the first node.
var err error
tc.c, tc.connCloser, err = getAdminClientForServer(tc.tc.Server(0))
@@ -329,7 +327,6 @@ func TestServerShutdownReleasesSession(t *testing.T) {
defer tenant.Stopper().Stop(ctx)
tenantSQL := sqlutils.MakeSQLRunner(tenantSQLRaw)

- tenantSQL.Exec(t, "SET CLUSTER SETTING server.shutdown.jobs_wait = '0s'")
queryOwner := func(id base.SQLInstanceID) (owner *string) {
tenantSQL.QueryRow(t, "SELECT session_id FROM system.sql_instances WHERE id = $1", id).Scan(&owner)
return owner
