diff --git a/pkg/ccl/serverccl/server_controller_test.go b/pkg/ccl/serverccl/server_controller_test.go index 5f76d91eb9b2..a1a730698b25 100644 --- a/pkg/ccl/serverccl/server_controller_test.go +++ b/pkg/ccl/serverccl/server_controller_test.go @@ -566,3 +566,37 @@ func TestServerControllerLoginLogout(t *testing.T) { require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames) require.ElementsMatch(t, []string{"", ""}, cookieValues) } + +func TestServiceShutdownUsesGracefulDrain(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestTenantDisabled, + }) + defer s.Stopper().Stop(ctx) + + drainCh := make(chan struct{}) + + // Start a shared process server. + _, _, err := s.(*server.TestServer).StartSharedProcessTenant(ctx, + base.TestSharedProcessTenantArgs{ + TenantName: "hello", + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + RequireGracefulDrain: true, + DrainReportCh: drainCh, + }, + }, + }) + require.NoError(t, err) + + _, err = db.Exec("ALTER TENANT hello STOP SERVICE") + require.NoError(t, err) + + // Wait for the server to shut down. This also asserts that the + // graceful drain has occurred. + <-drainCh +} diff --git a/pkg/cli/start.go b/pkg/cli/start.go index e0873db5693a..032b63630394 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -1121,40 +1121,13 @@ func startShutdownAsync( drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil) if shouldDrain { - // Perform a graceful drain. We keep retrying forever, in - // case there are many range leases or some unavailability - // preventing progress. If the operator wants to expedite - // the shutdown, they will need to make it ungraceful - // via a 2nd signal. - var ( - remaining = uint64(math.MaxUint64) - prevRemaining = uint64(math.MaxUint64) - verbose = false - ) - - for ; ; prevRemaining = remaining { - var err error - remaining, _, err = s.Drain(drainCtx, verbose) - if err != nil { - log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err) - break - } - if remaining == 0 { - // No more work to do. - break - } - - // If range lease transfer stalls or the number of - // remaining leases somehow increases, verbosity is set - // to help with troubleshooting. - if remaining >= prevRemaining { - verbose = true - } - - // Avoid a busy wait with high CPU usage if the server replies - // with an incomplete drain too quickly. - time.Sleep(200 * time.Millisecond) - } + // Perform a graceful drain. This function keeps retrying and + // the call might never complete (e.g. due to some + // unavailability preventing progress). This is intentional. If + // the operator wants to expedite the shutdown, they will need + // to make it ungraceful by sending a second signal to the + // process, which will tickle the shortcut in waitForShutdown(). + server.CallDrainServerSide(drainCtx, s.Drain) } stopper.Stop(drainCtx) diff --git a/pkg/server/drain.go b/pkg/server/drain.go index bced1d5312a7..ee4f32533fbd 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -13,6 +13,7 @@ package server import ( "context" "io" + "math" "strings" "time" @@ -26,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -103,6 +105,7 @@ type drainServer struct { grpc *grpcServer sqlServer *SQLServer drainSleepFn func(time.Duration) + serverCtl *serverController kvServer struct { nodeLiveness *liveness.NodeLiveness @@ -306,6 +309,26 @@ func (s *drainServer) runDrain( func (s *drainServer) drainInner( ctx context.Context, reporter func(int, redact.SafeString), verbose bool, ) (err error) { + if s.serverCtl != nil { + // We are on a KV node, with a server controller. + // + // First tell the controller to stop starting new servers. + s.serverCtl.draining.Set(true) + + // Then shut down tenant servers orchestrated from + // this node. + stillRunning := s.serverCtl.drain(ctx) + reporter(stillRunning, "tenant servers") + // If we still have tenant servers, we can't make progress on + // draining SQL clients (on the system tenant) and the KV node, + // because that would block the graceful drain of the tenant + // server(s). + if stillRunning > 0 { + return nil + } + log.Infof(ctx, "all tenant servers stopped") + } + // Drain the SQL layer. // Drains all SQL connections, distributed SQL execution flows, and SQL table leases. if err = s.drainClients(ctx, reporter); err != nil { @@ -399,7 +422,8 @@ func (s *drainServer) drainClients( s.sqlServer.jobRegistry.WaitForRegistryShutdown(ctx) // Drain all SQL table leases. This must be done after the pgServer has - // given sessions a chance to finish ongoing work. + // given sessions a chance to finish ongoing work and after the background + // tasks that may issue SQL statements have shut down. s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter) // Mark this phase in the logs to clarify the context of any subsequent @@ -433,6 +457,7 @@ func (s *drainServer) drainNode( // No KV subsystem. Nothing to do. return nil } + // Set the node's liveness status to "draining". if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil { return err @@ -458,3 +483,65 @@ func (s *drainServer) logOpenConns(ctx context.Context) error { } }) } + +// CallDrainServerSide is a reference implementation for a server-side +// function that wishes to shut down a server gracefully via the Drain +// interface. The Drain interface is responsible for notifying clients +// and shutting down systems in a particular order that prevents +// client app disruptions. We generally prefer graceful drains to the +// disorderly shutdown caused by either a process crash or a direct +// call to the stopper's Stop() method. +// +// By default, this code will wait forever for a graceful drain to +// complete. The caller can override this behavior by passing a context +// with a deadline. +// +// For an example client-side implementation (drain client over RPC), +// see the code in pkg/cli/node.go, doDrain(). +func CallDrainServerSide(ctx context.Context, drainFn ServerSideDrainFn) { + var ( + prevRemaining = uint64(math.MaxUint64) + verbose = false + ) + + ctx = logtags.AddTag(ctx, "call-graceful-drain", nil) + for { + // Let the caller interrupt the process via context cancellation + // if so desired. + select { + case <-ctx.Done(): + log.Ops.Errorf(ctx, "drain interrupted by caller: %v", ctx.Err()) + return + default: + } + + remaining, _, err := drainFn(ctx, verbose) + if err != nil { + log.Ops.Errorf(ctx, "graceful drain failed: %v", err) + return + } + if remaining == 0 { + // No more work to do. + log.Ops.Infof(ctx, "graceful drain complete") + return + } + + // If range lease transfer stalls or the number of + // remaining leases somehow increases, verbosity is set + // to help with troubleshooting. + if remaining >= prevRemaining { + verbose = true + } + + // Avoid a busy wait with high CPU usage if the server replies + // with an incomplete drain too quickly. + time.Sleep(200 * time.Millisecond) + + // Remember the remaining work to set the verbose flag in the next + // iteration. + prevRemaining = remaining + } +} + +// ServerSideDrainFn is the interface of the server-side handler for the Drain logic. +type ServerSideDrainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) diff --git a/pkg/server/server.go b/pkg/server/server.go index bd640e6d4940..4a3a78d152dd 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1121,6 +1121,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { systemTenantNameContainer, pgPreServer.SendRoutingError, ) + drain.serverCtl = sc // Create the debug API server. debugServer := debug.NewServer( diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index 0f398dc307b7..e36aabdffdae 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/redact" ) // onDemandServer represents a server that can be started on demand. @@ -65,6 +66,9 @@ type onDemandServer interface { // shutdownRequested returns the shutdown request channel. shutdownRequested() <-chan ShutdownRequest + + // gracefulDrain drains the server. + gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) } type serverEntry struct { @@ -113,6 +117,10 @@ type serverController struct { // a tenant routing error to the incoming client. sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName) + // draining is set when the surrounding server starts draining, and + // prevents further creation of new tenant servers. + draining syncutil.AtomicBool + mu struct { syncutil.Mutex @@ -240,6 +248,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest { return s.server.sqlServer.ShutdownRequested() } +func (s *tenantServerWrapper) gracefulDrain( + ctx context.Context, verbose bool, +) (uint64, redact.RedactableString, error) { + ctx = s.server.AnnotateCtx(ctx) + return s.server.Drain(ctx, verbose) +} + // systemServerWrapper implements the onDemandServer interface for Server. // // (We can imagine a future where the SQL service for the system @@ -297,3 +312,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID { func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest { return nil } + +func (s *systemServerWrapper) gracefulDrain( + ctx context.Context, verbose bool, +) (uint64, redact.RedactableString, error) { + // The controller is not responsible for draining the system tenant. + return 0, "", nil +} diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index b1f3c7ecac8a..69d43ee36c0f 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -96,9 +96,14 @@ func (c *serverController) start(ctx context.Context, ie isql.Executor) error { select { case <-time.After(watchInterval): case <-c.stopper.ShouldQuiesce(): + // Expedited server shutdown of outer server. + return + } + if c.draining.Get() { + // The outer server has started a graceful drain: stop + // picking up new servers. return } - if err := c.scanTenantsForRunnableServices(ctx, ie); err != nil { log.Warningf(ctx, "cannot update running tenant services: %v", err) } @@ -179,6 +184,9 @@ func (c *serverController) scanTenantsForRunnableServices( func (c *serverController) createServerEntryLocked( ctx context.Context, tenantName roachpb.TenantName, ) (*serverEntry, error) { + if c.draining.Get() { + return nil, errors.New("server is draining") + } entry, err := c.startControlledServer(ctx, tenantName) if err != nil { return nil, err @@ -216,12 +224,17 @@ func (c *serverController) startControlledServer( // tracer attached to the incoming context. tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil) + tenantCtx = logtags.AddTag(tenantCtx, "tenant", tenantName) // ctlStopper is a stopper uniquely responsible for the control // loop. It is separate from the tenantStopper defined below so // that we can retry the server instantiation if it fails. ctlStopper := stop.NewStopper() + // useGracefulDrainDuringTenantShutdown defined whether a graceful + // drain is requested on the tenant server by orchestration. + useGracefulDrainDuringTenantShutdown := make(chan bool, 1) + // Ensure that if the surrounding server requests shutdown, we // propagate it to the new server. if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { @@ -230,18 +243,30 @@ func (c *serverController) startControlledServer( // Server control loop is terminating prematurely before a // request was made to terminate it. log.Infof(ctx, "tenant %q terminating", tenantName) + case <-c.stopper.ShouldQuiesce(): // Surrounding server is stopping; propagate the stop to the // control goroutine below. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- false ctlStopper.Stop(tenantCtx) + case <-stopRequestCh: - // Someone requested a shutdown. + // Someone requested a graceful shutdown. log.Infof(ctx, "received request for tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- true ctlStopper.Stop(tenantCtx) + case <-topCtx.Done(): - // Someone requested a shutdown. + // Someone requested a shutdown - probably a test. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- false ctlStopper.Stop(tenantCtx) } }); err != nil { @@ -309,10 +334,36 @@ func (c *serverController) startControlledServer( if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) { select { case <-tenantStopper.ShouldQuiesce(): + // Tenant server shutting down on its own. return case <-ctlStopper.ShouldQuiesce(): + select { + case gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown: + if gracefulDrainRequested { + // Ensure that the graceful drain for the tenant server aborts + // early if the Stopper for the surrounding server is + // prematurely shutting down. This is because once the surrounding node + // starts quiescing tasks, it won't be able to process KV requests + // by the tenant server any more. + // + // Beware: we use tenantCtx here, not ctx, because the + // latter has been linked to ctlStopper.Quiesce already + // -- and in this select branch that context has been + // canceled already. + drainCtx, cancel := c.stopper.WithCancelOnQuiesce(tenantCtx) + defer cancel() + log.Infof(drainCtx, "starting graceful drain") + // Call the drain service on that tenant's server. This may take a + // while as it needs to wait for clients to disconnect and SQL + // activity to clear up, possibly waiting for various configurable + // timeouts. + CallDrainServerSide(drainCtx, tenantServer.gracefulDrain) + } + default: + } tenantStopper.Stop(ctx) case <-c.stopper.ShouldQuiesce(): + // Expedited shutdown of the surrounding KV node. tenantStopper.Stop(ctx) } }); err != nil { @@ -473,6 +524,30 @@ func (c *serverController) newServerInternal( // Close implements the stop.Closer interface. func (c *serverController) Close() { + entries := c.requestStopAll() + + // Wait for shutdown for all servers. + for _, e := range entries { + <-e.state.stopped + } +} + +func (c *serverController) drain(ctx context.Context) (stillRunning int) { + entries := c.requestStopAll() + // How many entries are _not_ stopped yet? + notStopped := 0 + for _, e := range entries { + select { + case <-e.state.stopped: + default: + log.Infof(ctx, "server for tenant %q still running", e.nameContainer) + notStopped++ + } + } + return notStopped +} + +func (c *serverController) requestStopAll() []*serverEntry { entries := func() (res []*serverEntry) { c.mu.Lock() defer c.mu.Unlock() @@ -487,11 +562,7 @@ func (c *serverController) Close() { for _, e := range entries { e.state.requestStop() } - - // Wait for shutdown for all servers. - for _, e := range entries { - <-e.state.stopped - } + return entries } type nodeEventLogger interface { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 23b8e2103dbc..ce63e406b18a 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1689,15 +1689,24 @@ func (s *SQLServer) preStart( // shutdown; but may be a sign of a problem in production or for // tests that need to restart a server. stopper.AddCloser(stop.CloserFn(func() { + var sk *TestingKnobs + if knobs.Server != nil { + sk, _ = knobs.Server.(*TestingKnobs) + } + if !s.gracefulDrainComplete.Get() { warnCtx := s.AnnotateCtx(context.Background()) - if knobs.Server != nil && knobs.Server.(*TestingKnobs).RequireGracefulDrain { + if sk != nil && sk.RequireGracefulDrain { log.Fatalf(warnCtx, "drain required but not performed") } log.Warningf(warnCtx, "server shutdown without a prior graceful drain") } + + if sk != nil && sk.DrainReportCh != nil { + sk.DrainReportCh <- struct{}{} + } })) return nil diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index 24516c0fca2e..9b5b094e7fb5 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -138,6 +138,10 @@ type TestingKnobs struct { // RequireGracefulDrain, if set, causes a shutdown to fail with a log.Fatal // if the server is not gracefully drained prior to its stopper shutting down. RequireGracefulDrain bool + + // DrainReportCh, if set, is a channel that will be notified when + // the SQL service shuts down. + DrainReportCh chan struct{} } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.