diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index e81d4be444b5..9f38602528c7 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -75,7 +75,7 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with
server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged tenant-rw
server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw
-server.shutdown.jobs_wait duration 0s the maximum amount of time a server waits for all currently executing jobs to notice the drain request and to perform an orderly shutdown tenant-rw
+server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice the drain request and to perform an orderly shutdown tenant-rw
server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw
server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 290247e81fb0..1ee712f2a192 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -105,7 +105,7 @@
server.secondary_tenants.redact_trace.enabled | boolean | true | controls if server side traces are redacted for tenant operations | Dedicated/Self-Hosted |
server.shutdown.connection_wait | duration | 0s | the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) | Serverless/Dedicated/Self-Hosted |
server.shutdown.drain_wait | duration | 0s | the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) | Serverless/Dedicated/Self-Hosted |
-server.shutdown.jobs_wait | duration | 0s | the maximum amount of time a server waits for all currently executing jobs to notice the drain request and to perform an orderly shutdown | Serverless/Dedicated/Self-Hosted |
+server.shutdown.jobs_wait | duration | 10s | the maximum amount of time a server waits for all currently executing jobs to notice the drain request and to perform an orderly shutdown | Serverless/Dedicated/Self-Hosted |
server.shutdown.lease_transfer_wait | duration | 5s | the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) | Dedicated/Self-Hosted |
server.shutdown.query_wait | duration | 10s | the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) | Serverless/Dedicated/Self-Hosted |
server.time_until_store_dead | duration | 5m0s | the time after which if there is no new gossiped information about a store, it is considered dead | Serverless/Dedicated/Self-Hosted |
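
Taken together, the two generated-docs hunks record the user-visible effect of this change: a draining server now waits up to 10 seconds by default for running jobs to notice the drain and checkpoint. For operators who prefer the old behavior, the setting remains adjustable at runtime; a minimal sketch, assuming a reachable cluster and a pgwire driver (the connection string below is illustrative, not part of this patch):

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any pgwire-compatible driver works with CockroachDB
)

func main() {
	// Illustrative connection string; adjust host, port, and TLS settings.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Restore the pre-change default of not waiting for jobs during drain.
	if _, err := db.Exec(
		`SET CLUSTER SETTING server.shutdown.jobs_wait = '0s'`); err != nil {
		log.Fatal(err)
	}
}
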
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index a6cb15b8888f..664720c9b695 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -59,6 +59,10 @@ type changeAggregator struct {
kvFeedDoneCh chan struct{}
kvFeedMemMon *mon.BytesMonitor
+ // drainWatchCh is signaled when the job registry on this node begins draining.
+ drainWatchCh <-chan struct{}
+ drainDone func() // Cleanup function for drain watch.
+
// sink is the Sink to write rows to. Resolved timestamps are never written
// by changeAggregator.
sink EventSink
@@ -351,17 +355,17 @@ func (ca *changeAggregator) startKVFeed(
return nil, err
}
+ ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain()
// Arrange for kvFeed to terminate if the job registry is being drained.
kvfeedCfg.FeedWatcher = func(ctx context.Context) error {
- drainCh := ca.flowCtx.Cfg.JobRegistry.OnDrain()
if ca.knobs.OnDrain != nil {
- drainCh = ca.knobs.OnDrain()
+ ca.drainWatchCh = ca.knobs.OnDrain()
}
select {
case <-ctx.Done():
return ctx.Err()
- case <-drainCh:
+ case <-ca.drainWatchCh:
return changefeedbase.ErrNodeDraining
}
}
@@ -503,6 +507,9 @@ func (ca *changeAggregator) close() {
if ca.kvFeedDoneCh != nil {
<-ca.kvFeedDoneCh
}
+ if ca.drainDone != nil {
+ ca.drainDone()
+ }
if ca.eventConsumer != nil {
_ = ca.eventConsumer.Close() // context cancellation expected here.
}
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index 5d6ec8fff403..134ab4d9824d 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1207,6 +1207,12 @@ func (b *changefeedResumer) resumeWithRetries(
}
}
+ // Grab a "reference" to this nodes job registry in order to make sure
+ // this resumer has enough time to persist up to date checkpoint in case
+ // of node drain.
+ drainCh, cleanup := execCfg.JobRegistry.OnDrain()
+ defer cleanup()
+
// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
@@ -1278,7 +1284,7 @@ func (b *changefeedResumer) resumeWithRetries(
if errors.Is(flowErr, changefeedbase.ErrNodeDraining) {
select {
- case <-execCfg.JobRegistry.OnDrain():
+ case <-drainCh:
// If this node is draining, there is no point in retrying.
return jobs.MarkAsRetryJobError(changefeedbase.ErrNodeDraining)
default:
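
Note the ordering here: the resumer registers with OnDrain before entering the retry loop and defers the cleanup, so the registry's drain wait covers the whole checkpoint-and-exit path; the select above then only polls the already-registered channel. A sketch of that non-blocking poll (the function name is a stand-in):

package drainsketch

// shouldStopRetrying distinguishes "the node is draining, surface a
// retryable job error" from "transient failure, retry the flow locally".
func shouldStopRetrying(drainCh <-chan struct{}) bool {
	select {
	case <-drainCh:
		return true
	default:
		return false
	}
}
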
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 3bd89bedeae5..7cbba10bdaf3 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -147,6 +147,10 @@ type Registry struct {
// process starts.
draining bool
+ // numDrainWait is the number of jobs that are still
+ // processing the drain request.
+ numDrainWait int
+
// ingestingJobs is a map of jobs which are actively ingesting on this node
// including via a processor.
ingestingJobs map[jobspb.JobID]struct{}
@@ -158,6 +162,10 @@ type Registry struct {
// in an orderly fashion, prior to resumer context being canceled.
// The registry will no longer adopt new jobs once this channel closed.
drainRequested chan struct{}
+ // jobDrained is signaled to indicate that a job watching the drainRequested
+ // channel has completed its drain logic.
+ jobDrained chan struct{}
+
// drainJobs closed when registry should drain/cancel all active
// jobs and should no longer adopt new jobs.
drainJobs chan struct{}
@@ -238,6 +246,7 @@ func MakeRegistry(
withSessionEvery: log.Every(time.Second),
drainJobs: make(chan struct{}),
drainRequested: make(chan struct{}),
+ jobDrained: make(chan struct{}, 1),
}
if knobs != nil {
r.knobs = *knobs
@@ -1993,28 +2002,66 @@ func (r *Registry) WaitForRegistryShutdown(ctx context.Context) {
}
// DrainRequested informs the job system that this node is being drained.
-// Returns a function that, when invoked, will initiate the drain process
-// by requesting that all currently running jobs, as well as various job
-// registry processes, terminate.
+// It waits up to maxWait for any callers monitoring for registry drain to
+// complete their drain logic.
// WaitForRegistryShutdown can then be used to wait for those tasks to complete.
-func (r *Registry) DrainRequested() func() {
+func (r *Registry) DrainRequested(ctx context.Context, maxWait time.Duration) {
r.mu.Lock()
alreadyDraining := r.mu.draining
+ numWait := r.mu.numDrainWait
r.mu.draining = true
r.mu.Unlock()
if alreadyDraining {
- return func() {}
+ return
}
close(r.drainRequested)
- return func() { close(r.drainJobs) }
+ defer close(r.drainJobs)
+
+ if numWait == 0 {
+ return
+ }
+
+ t := timeutil.NewTimer()
+ defer t.Stop()
+ t.Reset(maxWait)
+
+ for numWait > 0 {
+ select {
+ case <-ctx.Done():
+ return
+ case <-r.stopper.ShouldQuiesce():
+ return
+ case <-t.C:
+ t.Read = true
+ return
+ case <-r.jobDrained:
+ r.mu.Lock()
+ numWait = r.mu.numDrainWait
+ r.mu.Unlock()
+ }
+ }
}
// OnDrain returns a channel that can be selected on to detect when the job
// registry begins draining.
-func (r *Registry) OnDrain() <-chan struct{} {
- return r.drainRequested
+// The caller must invoke the returned function once its drain work completes.
+func (r *Registry) OnDrain() (<-chan struct{}, func()) {
+ r.mu.Lock()
+ r.mu.numDrainWait++
+ r.mu.Unlock()
+
+ return r.drainRequested, func() {
+ r.mu.Lock()
+ r.mu.numDrainWait--
+ r.mu.Unlock()
+
+ select {
+ case r.jobDrained <- struct{}{}:
+ default:
+ }
+ }
}
// TestingIsJobIdle returns true if the job is adopted and currently idle.
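
OnDrain and DrainRequested together form a small reference-counted notification scheme: each watcher bumps a counter, the drainer closes a broadcast channel and then waits, and every release pokes a buffered wake-up channel so the drainer can re-check the count. A condensed sketch of the same scheme under illustrative names (the real code additionally handles stopper quiescence and context cancellation):

package drainsketch

import (
	"sync"
	"time"
)

type notifier struct {
	mu      sync.Mutex
	waiters int
	drainCh chan struct{} // closed once, when drain begins
	wakeCh  chan struct{} // capacity 1; poked when a waiter releases
}

func newNotifier() *notifier {
	return &notifier{
		drainCh: make(chan struct{}),
		wakeCh:  make(chan struct{}, 1),
	}
}

// OnDrain registers a waiter. The caller selects on the returned channel and
// must invoke the release function once its drain work is done.
func (n *notifier) OnDrain() (<-chan struct{}, func()) {
	n.mu.Lock()
	n.waiters++
	n.mu.Unlock()
	return n.drainCh, func() {
		n.mu.Lock()
		n.waiters--
		n.mu.Unlock()
		select {
		case n.wakeCh <- struct{}{}: // never block: the drainer may be gone
		default:
		}
	}
}

// Drain announces the drain, then waits up to maxWait for all currently
// registered waiters to release.
func (n *notifier) Drain(maxWait time.Duration) {
	close(n.drainCh)
	deadline := time.After(maxWait)
	for {
		n.mu.Lock()
		remaining := n.waiters
		n.mu.Unlock()
		if remaining == 0 {
			return
		}
		select {
		case <-deadline:
			return
		case <-n.wakeCh:
			// A waiter released; loop around and re-check the count.
		}
	}
}
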
diff --git a/pkg/server/drain.go b/pkg/server/drain.go
index 0b1a6c9ebffe..3c82dc35740d 100644
--- a/pkg/server/drain.go
+++ b/pkg/server/drain.go
@@ -72,7 +72,7 @@ var (
"server.shutdown.jobs_wait",
"the maximum amount of time a server waits for all currently executing jobs "+
"to notice drain request and to perform orderly shutdown",
- 0*time.Second,
+ 10*time.Second,
settings.NonNegativeDurationWithMaximum(10*time.Hour),
).WithPublic()
)
@@ -401,12 +401,7 @@ func (s *drainServer) drainClients(
// issues a BACKUP or some other job-based statement before it
// disconnects, and encounters a job error as a result -- that the
// registry is now unavailable due to the drain.
- drainJobRegistry := s.sqlServer.jobRegistry.DrainRequested()
- if delay := jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV); delay > 0 && shouldDelayDraining {
- log.Ops.Infof(ctx, "waiting for %s for running jobs to notice that the node is draining", delay)
- s.drainSleepFn(delay)
- }
- drainJobRegistry()
+ s.sqlServer.jobRegistry.DrainRequested(ctx, jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV))
// Inform the auto-stats tasks that the node is draining.
s.sqlServer.statsRefresher.SetDraining()
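
With this change drainClients delegates the waiting to DrainRequested, which returns as soon as every registered job has released its reference instead of always sleeping for the configured delay. A companion usage sketch, building on the notifier from the registry section above (same illustrative package):

package drainsketch

import (
	"fmt"
	"time"
)

// demo shows the early-exit behavior: the drain completes as soon as the
// last waiter releases, well before the 10s cap expires.
func demo() {
	n := newNotifier()
	ch, release := n.OnDrain()

	go func() {
		<-ch                              // drain began
		time.Sleep(50 * time.Millisecond) // simulate persisting a checkpoint
		release()
	}()

	start := time.Now()
	n.Drain(10 * time.Second) // analogous to server.shutdown.jobs_wait
	fmt.Printf("drained in %s\n", time.Since(start))
}
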
diff --git a/pkg/server/drain_test.go b/pkg/server/drain_test.go
index 8232b73f486e..42321243b7de 100644
--- a/pkg/server/drain_test.go
+++ b/pkg/server/drain_test.go
@@ -191,8 +191,6 @@ func newTestDrainContext(t *testing.T, drainSleepCallCount *int) *testDrainConte
}),
}
- serverutils.SetClusterSetting(t, tc.tc, "server.shutdown.jobs_wait", 0)
-
// We'll have the RPC talk to the first node.
var err error
tc.c, tc.connCloser, err = getAdminClientForServer(tc.tc.Server(0))
@@ -329,7 +327,6 @@ func TestServerShutdownReleasesSession(t *testing.T) {
defer tenant.Stopper().Stop(ctx)
tenantSQL := sqlutils.MakeSQLRunner(tenantSQLRaw)
- tenantSQL.Exec(t, "SET CLUSTER SETTING server.shutdown.jobs_wait = '0s'")
queryOwner := func(id base.SQLInstanceID) (owner *string) {
tenantSQL.QueryRow(t, "SELECT session_id FROM system.sql_instances WHERE id = $1", id).Scan(&owner)
return owner