From 394c3023df22e8fe06c207d399bfe09781622b34 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Sun, 21 May 2023 20:39:22 -0400 Subject: [PATCH] server,jobs: Better handle node drain Rework job registry drain signal to terminate the drain as soon as the last job that was watching for drain signal completes its drain Epic: CRDB-26978 Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- .../changefeedccl/changefeed_processors.go | 13 +++- pkg/ccl/changefeedccl/changefeed_stmt.go | 16 ++++- pkg/jobs/registry.go | 63 ++++++++++++++++--- pkg/server/drain.go | 9 +-- pkg/server/drain_test.go | 3 - 7 files changed, 82 insertions(+), 26 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index e81d4be444b5..9f38602528c7 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -75,7 +75,7 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged tenant-rw server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw -server.shutdown.jobs_wait duration 0s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw +server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 290247e81fb0..1ee712f2a192 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -105,7 +105,7 @@
 <tr><td><div id="setting-server-secondary-tenants-redact-trace-enabled" class="anchored"><code>server.secondary_tenants.redact_trace.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>controls if server side traces are redacted for tenant operations</td><td>Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-server-shutdown-connection-wait" class="anchored"><code>server.shutdown.connection_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-server-shutdown-drain-wait" class="anchored"><code>server.shutdown.drain_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
-<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-server-shutdown-lease-transfer-wait" class="anchored"><code>server.shutdown.lease_transfer_wait</code></div></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-server-shutdown-query-wait" class="anchored"><code>server.shutdown.query_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-server-time-until-store-dead" class="anchored"><code>server.time_until_store_dead</code></div></td><td>
duration5m0sthe time after which if there is no new gossiped information about a store, it is considered deadServerless/Dedicated/Self-Hosted diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index a6cb15b8888f..664720c9b695 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -59,6 +59,10 @@ type changeAggregator struct { kvFeedDoneCh chan struct{} kvFeedMemMon *mon.BytesMonitor + // drainWatchCh is signaled if the registry on this node is being drained. + drainWatchCh <-chan struct{} + drainDone func() // Cleanup function for drain watch. + // sink is the Sink to write rows to. Resolved timestamps are never written // by changeAggregator. sink EventSink @@ -351,17 +355,17 @@ func (ca *changeAggregator) startKVFeed( return nil, err } + ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain() // Arrange for kvFeed to terminate if the job registry is being drained. kvfeedCfg.FeedWatcher = func(ctx context.Context) error { - drainCh := ca.flowCtx.Cfg.JobRegistry.OnDrain() if ca.knobs.OnDrain != nil { - drainCh = ca.knobs.OnDrain() + ca.drainWatchCh = ca.knobs.OnDrain() } select { case <-ctx.Done(): return ctx.Err() - case <-drainCh: + case <-ca.drainWatchCh: return changefeedbase.ErrNodeDraining } } @@ -503,6 +507,9 @@ func (ca *changeAggregator) close() { if ca.kvFeedDoneCh != nil { <-ca.kvFeedDoneCh } + if ca.drainDone != nil { + ca.drainDone() + } if ca.eventConsumer != nil { _ = ca.eventConsumer.Close() // context cancellation expected here. } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 5d6ec8fff403..a94f387d5a1e 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -1276,12 +1276,22 @@ func (b *changefeedResumer) resumeWithRetries( return jobs.MarkAsRetryJobError(err) } - if errors.Is(flowErr, changefeedbase.ErrNodeDraining) { + isRegistryDraining := func() bool { + drainCh, cleanup := execCfg.JobRegistry.OnDrain() + defer cleanup() select { - case <-execCfg.JobRegistry.OnDrain(): + case <-drainCh: + return true + default: + return false + } + } + + if errors.Is(flowErr, changefeedbase.ErrNodeDraining) { + if isRegistryDraining() { // If this node is draining, there is no point in retrying. return jobs.MarkAsRetryJobError(changefeedbase.ErrNodeDraining) - default: + } else { // We know that some node (other than this one) is draining. // When we retry, the planner ought to take into account // this information. However, there is a bit of a race here diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 3bd89bedeae5..7cbba10bdaf3 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -147,6 +147,10 @@ type Registry struct { // process starts. draining bool + // numDrainWait is the number of jobs that are still + // processing drain request. + numDrainWait int + // ingestingJobs is a map of jobs which are actively ingesting on this node // including via a processor. ingestingJobs map[jobspb.JobID]struct{} @@ -158,6 +162,10 @@ type Registry struct { // in an orderly fashion, prior to resumer context being canceled. // The registry will no longer adopt new jobs once this channel closed. drainRequested chan struct{} + // jobDrained signaled to indicate that the job watching drainRequested channel + // completed its drain logic. 
+ jobDrained chan struct{} + // drainJobs closed when registry should drain/cancel all active // jobs and should no longer adopt new jobs. drainJobs chan struct{} @@ -238,6 +246,7 @@ func MakeRegistry( withSessionEvery: log.Every(time.Second), drainJobs: make(chan struct{}), drainRequested: make(chan struct{}), + jobDrained: make(chan struct{}, 1), } if knobs != nil { r.knobs = *knobs @@ -1993,28 +2002,66 @@ func (r *Registry) WaitForRegistryShutdown(ctx context.Context) { } // DrainRequested informs the job system that this node is being drained. -// Returns a function that, when invoked, will initiate drain process -// by requesting all currently running jobs, as well as various job registry -// processes terminate. +// Waits up to maxWait time for any callers who are monitoring for registry +// drain to complete their drain logic. // WaitForRegistryShutdown can then be used to wait for those tasks to complete. -func (r *Registry) DrainRequested() func() { +func (r *Registry) DrainRequested(ctx context.Context, maxWait time.Duration) { r.mu.Lock() alreadyDraining := r.mu.draining + numWait := r.mu.numDrainWait r.mu.draining = true r.mu.Unlock() if alreadyDraining { - return func() {} + return } close(r.drainRequested) - return func() { close(r.drainJobs) } + defer close(r.drainJobs) + + if numWait == 0 { + return + } + + t := timeutil.NewTimer() + defer t.Stop() + t.Reset(maxWait) + + for numWait > 0 { + select { + case <-ctx.Done(): + return + case <-r.stopper.ShouldQuiesce(): + return + case <-t.C: + t.Read = true + return + case <-r.jobDrained: + r.mu.Lock() + numWait = r.mu.numDrainWait + r.mu.Unlock() + } + } } // OnDrain returns a channel that can be selected on to detect when the job // registry begins draining. -func (r *Registry) OnDrain() <-chan struct{} { - return r.drainRequested +// The caller must invoke returned function when drain completes. +func (r *Registry) OnDrain() (<-chan struct{}, func()) { + r.mu.Lock() + r.mu.numDrainWait++ + r.mu.Unlock() + + return r.drainRequested, func() { + r.mu.Lock() + r.mu.numDrainWait-- + r.mu.Unlock() + + select { + case r.jobDrained <- struct{}{}: + default: + } + } } // TestingIsJobIdle returns true if the job is adopted and currently idle. diff --git a/pkg/server/drain.go b/pkg/server/drain.go index 0b1a6c9ebffe..3c82dc35740d 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -72,7 +72,7 @@ var ( "server.shutdown.jobs_wait", "the maximum amount of time a server waits for all currently executing jobs "+ "to notice drain request and to perform orderly shutdown", - 0*time.Second, + 10*time.Second, settings.NonNegativeDurationWithMaximum(10*time.Hour), ).WithPublic() ) @@ -401,12 +401,7 @@ func (s *drainServer) drainClients( // issues a BACKUP or some other job-based statement before it // disconnects, and encounters a job error as a result -- that the // registry is now unavailable due to the drain. - drainJobRegistry := s.sqlServer.jobRegistry.DrainRequested() - if delay := jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV); delay > 0 && shouldDelayDraining { - log.Ops.Infof(ctx, "waiting for %s for running jobs to notice that the node is draining", delay) - s.drainSleepFn(delay) - } - drainJobRegistry() + s.sqlServer.jobRegistry.DrainRequested(ctx, jobRegistryWait.Get(&s.sqlServer.execCfg.Settings.SV)) // Inform the auto-stats tasks that the node is draining. 
s.sqlServer.statsRefresher.SetDraining() diff --git a/pkg/server/drain_test.go b/pkg/server/drain_test.go index 8232b73f486e..42321243b7de 100644 --- a/pkg/server/drain_test.go +++ b/pkg/server/drain_test.go @@ -191,8 +191,6 @@ func newTestDrainContext(t *testing.T, drainSleepCallCount *int) *testDrainConte }), } - serverutils.SetClusterSetting(t, tc.tc, "server.shutdown.jobs_wait", 0) - // We'll have the RPC talk to the first node. var err error tc.c, tc.connCloser, err = getAdminClientForServer(tc.tc.Server(0)) @@ -329,7 +327,6 @@ func TestServerShutdownReleasesSession(t *testing.T) { defer tenant.Stopper().Stop(ctx) tenantSQL := sqlutils.MakeSQLRunner(tenantSQLRaw) - tenantSQL.Exec(t, "SET CLUSTER SETTING server.shutdown.jobs_wait = '0s'") queryOwner := func(id base.SQLInstanceID) (owner *string) { tenantSQL.QueryRow(t, "SELECT session_id FROM system.sql_instances WHERE id = $1", id).Scan(&owner) return owner
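
For illustration, a minimal sketch of how a long-running job could consume the reworked Registry.OnDrain contract introduced by this patch. Only Registry.OnDrain and its returned cleanup function come from the change; the run loop, the events/processEvent placeholders, and errNodeDraining are hypothetical names used purely for this example.

// drain_example.go: illustrative sketch only, not part of the patch.
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/errors"
)

// errNodeDraining is a placeholder error for this sketch.
var errNodeDraining = errors.New("node is draining")

// runUntilDrain shows the intended call pattern: register interest with
// OnDrain, and always invoke the returned cleanup function once this
// watcher has finished its own drain work, so that Registry.DrainRequested
// can return before server.shutdown.jobs_wait (default now 10s) elapses.
func runUntilDrain(ctx context.Context, registry *jobs.Registry, events <-chan int) error {
	drainCh, drainDone := registry.OnDrain()
	defer drainDone()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-drainCh:
			// Orderly shutdown: flush or checkpoint state here, then return.
			// The deferred drainDone tells the registry this watcher is done,
			// decrementing its count of jobs still processing the drain.
			return errNodeDraining
		case ev := <-events:
			_ = ev // event processing elided in this sketch
		}
	}
}

On the server side, DrainRequested now blocks for at most the configured jobs_wait while such watchers finish (instead of returning a closure as before), and returns early once the last registered watcher has called its cleanup function.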