From 4d4c111198f0c0565c4808de7df067e0400563e6 Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Wed, 29 Mar 2023 19:51:10 +0200
Subject: [PATCH] server: gracefully shut down secondary tenant servers

This change ensures that tenant servers managed by the server
controller receive a graceful drain request as part of the graceful
drain process of the surrounding KV node.

This change, in turn, ensures that SQL clients connected to these
secondary tenant servers benefit from the same guarantees (and grace
periods) as clients of the system tenant.

Release note: None
---
 pkg/ccl/serverccl/server_controller_test.go   | 34 +++++++
 pkg/cli/start.go                              | 41 ++-------
 pkg/server/drain.go                           | 89 ++++++++++++++++++-
 pkg/server/server.go                          |  1 +
 pkg/server/server_controller.go               | 22 +++++
 pkg/server/server_controller_orchestration.go | 87 ++++++++++++++++--
 pkg/server/server_sql.go                      | 11 ++-
 pkg/server/testing_knobs.go                   |  4 +
 8 files changed, 245 insertions(+), 44 deletions(-)

diff --git a/pkg/ccl/serverccl/server_controller_test.go b/pkg/ccl/serverccl/server_controller_test.go
index 5f76d91eb9b2..a1a730698b25 100644
--- a/pkg/ccl/serverccl/server_controller_test.go
+++ b/pkg/ccl/serverccl/server_controller_test.go
@@ -566,3 +566,37 @@ func TestServerControllerLoginLogout(t *testing.T) {
 	require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames)
 	require.ElementsMatch(t, []string{"", ""}, cookieValues)
 }
+
+func TestServiceShutdownUsesGracefulDrain(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
+		DefaultTestTenant: base.TestTenantDisabled,
+	})
+	defer s.Stopper().Stop(ctx)
+
+	drainCh := make(chan struct{})
+
+	// Start a shared process server.
+	_, _, err := s.(*server.TestServer).StartSharedProcessTenant(ctx,
+		base.TestSharedProcessTenantArgs{
+			TenantName: "hello",
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					RequireGracefulDrain: true,
+					DrainReportCh:        drainCh,
+				},
+			},
+		})
+	require.NoError(t, err)
+
+	_, err = db.Exec("ALTER TENANT hello STOP SERVICE")
+	require.NoError(t, err)
+
+	// Wait for the server to shut down. This also asserts that the
+	// graceful drain has occurred.
+	<-drainCh
+}
diff --git a/pkg/cli/start.go b/pkg/cli/start.go
index e0873db5693a..032b63630394 100644
--- a/pkg/cli/start.go
+++ b/pkg/cli/start.go
@@ -1121,40 +1121,13 @@ func startShutdownAsync(
 	drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil)
 
 	if shouldDrain {
-		// Perform a graceful drain. We keep retrying forever, in
-		// case there are many range leases or some unavailability
-		// preventing progress. If the operator wants to expedite
-		// the shutdown, they will need to make it ungraceful
-		// via a 2nd signal.
-		var (
-			remaining     = uint64(math.MaxUint64)
-			prevRemaining = uint64(math.MaxUint64)
-			verbose       = false
-		)
-
-		for ; ; prevRemaining = remaining {
-			var err error
-			remaining, _, err = s.Drain(drainCtx, verbose)
-			if err != nil {
-				log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err)
-				break
-			}
-			if remaining == 0 {
-				// No more work to do.
-				break
-			}
-
-			// If range lease transfer stalls or the number of
-			// remaining leases somehow increases, verbosity is set
-			// to help with troubleshooting.
-			if remaining >= prevRemaining {
-				verbose = true
-			}
-
-			// Avoid a busy wait with high CPU usage if the server replies
-			// with an incomplete drain too quickly.
-			time.Sleep(200 * time.Millisecond)
-		}
+		// Perform a graceful drain. This function keeps retrying and
+		// the call might never complete (e.g. due to some
+		// unavailability preventing progress). This is intentional. If
+		// the operator wants to expedite the shutdown, they will need
+		// to make it ungraceful by sending a second signal to the
+		// process, which will tickle the shortcut in waitForShutdown().
+		server.CallDrainServerSide(drainCtx, s.Drain)
 	}
 
 	stopper.Stop(drainCtx)
diff --git a/pkg/server/drain.go b/pkg/server/drain.go
index bced1d5312a7..ee4f32533fbd 100644
--- a/pkg/server/drain.go
+++ b/pkg/server/drain.go
@@ -13,6 +13,7 @@ package server
 import (
 	"context"
 	"io"
+	"math"
 	"strings"
 	"time"
 
@@ -26,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/logtags"
 	"github.com/cockroachdb/redact"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -103,6 +105,7 @@ type drainServer struct {
 	grpc         *grpcServer
 	sqlServer    *SQLServer
 	drainSleepFn func(time.Duration)
+	serverCtl    *serverController
 
 	kvServer struct {
 		nodeLiveness *liveness.NodeLiveness
@@ -306,6 +309,26 @@ func (s *drainServer) runDrain(
 func (s *drainServer) drainInner(
 	ctx context.Context, reporter func(int, redact.SafeString), verbose bool,
 ) (err error) {
+	if s.serverCtl != nil {
+		// We are on a KV node, with a server controller.
+		//
+		// First tell the controller to stop starting new servers.
+		s.serverCtl.draining.Set(true)
+
+		// Then shut down tenant servers orchestrated from
+		// this node.
+		stillRunning := s.serverCtl.drain(ctx)
+		reporter(stillRunning, "tenant servers")
+		// If we still have tenant servers, we can't make progress on
+		// draining SQL clients (on the system tenant) and the KV node,
+		// because that would block the graceful drain of the tenant
+		// server(s).
+		if stillRunning > 0 {
+			return nil
+		}
+		log.Infof(ctx, "all tenant servers stopped")
+	}
+
 	// Drain the SQL layer.
 	// Drains all SQL connections, distributed SQL execution flows, and SQL table leases.
 	if err = s.drainClients(ctx, reporter); err != nil {
@@ -399,7 +422,8 @@ func (s *drainServer) drainClients(
 	s.sqlServer.jobRegistry.WaitForRegistryShutdown(ctx)
 
 	// Drain all SQL table leases. This must be done after the pgServer has
-	// given sessions a chance to finish ongoing work.
+	// given sessions a chance to finish ongoing work and after the background
+	// tasks that may issue SQL statements have shut down.
 	s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)
 
 	// Mark this phase in the logs to clarify the context of any subsequent
@@ -433,6 +457,7 @@ func (s *drainServer) drainNode(
 		// No KV subsystem. Nothing to do.
 		return nil
 	}
+
 	// Set the node's liveness status to "draining".
 	if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil {
 		return err
@@ -458,3 +483,65 @@ func (s *drainServer) logOpenConns(ctx context.Context) error {
 		}
 	})
 }
+
+// CallDrainServerSide is a reference implementation for a server-side
+// function that wishes to shut down a server gracefully via the Drain
+// interface. The Drain interface is responsible for notifying clients
+// and shutting down systems in a particular order that prevents
+// client app disruptions. We generally prefer graceful drains to the
+// disorderly shutdown caused by either a process crash or a direct
+// call to the stopper's Stop() method.
+//
+// By default, this code will wait forever for a graceful drain to
+// complete. The caller can override this behavior by passing a context
+// with a deadline.
+//
+// For an example client-side implementation (drain client over RPC),
+// see the code in pkg/cli/node.go, doDrain().
+func CallDrainServerSide(ctx context.Context, drainFn ServerSideDrainFn) {
+	var (
+		prevRemaining = uint64(math.MaxUint64)
+		verbose       = false
+	)
+
+	ctx = logtags.AddTag(ctx, "call-graceful-drain", nil)
+	for {
+		// Let the caller interrupt the process via context cancellation
+		// if so desired.
+		select {
+		case <-ctx.Done():
+			log.Ops.Errorf(ctx, "drain interrupted by caller: %v", ctx.Err())
+			return
+		default:
+		}
+
+		remaining, _, err := drainFn(ctx, verbose)
+		if err != nil {
+			log.Ops.Errorf(ctx, "graceful drain failed: %v", err)
+			return
+		}
+		if remaining == 0 {
+			// No more work to do.
+			log.Ops.Infof(ctx, "graceful drain complete")
+			return
+		}
+
+		// If range lease transfer stalls or the number of
+		// remaining leases somehow increases, verbosity is set
+		// to help with troubleshooting.
+		if remaining >= prevRemaining {
+			verbose = true
+		}
+
+		// Avoid a busy wait with high CPU usage if the server replies
+		// with an incomplete drain too quickly.
+		time.Sleep(200 * time.Millisecond)
+
+		// Remember the remaining work to set the verbose flag in the next
+		// iteration.
+		prevRemaining = remaining
+	}
+}
+
+// ServerSideDrainFn is the interface of the server-side handler for the Drain logic.
+type ServerSideDrainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
diff --git a/pkg/server/server.go b/pkg/server/server.go
index bd640e6d4940..4a3a78d152dd 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -1121,6 +1121,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		systemTenantNameContainer,
 		pgPreServer.SendRoutingError,
 	)
+	drain.serverCtl = sc
 
 	// Create the debug API server.
 	debugServer := debug.NewServer(
diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go
index 0f398dc307b7..e36aabdffdae 100644
--- a/pkg/server/server_controller.go
+++ b/pkg/server/server_controller.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/redact"
 )
 
 // onDemandServer represents a server that can be started on demand.
@@ -65,6 +66,9 @@ type onDemandServer interface {
 
 	// shutdownRequested returns the shutdown request channel.
 	shutdownRequested() <-chan ShutdownRequest
+
+	// gracefulDrain drains the server.
+	gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
 }
 
 type serverEntry struct {
@@ -113,6 +117,10 @@ type serverController struct {
 	// a tenant routing error to the incoming client.
 	sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName)
 
+	// draining is set when the surrounding server starts draining, and
+	// prevents further creation of new tenant servers.
+	draining syncutil.AtomicBool
+
 	mu struct {
 		syncutil.Mutex
 
@@ -240,6 +248,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest {
 	return s.server.sqlServer.ShutdownRequested()
 }
 
+func (s *tenantServerWrapper) gracefulDrain(
+	ctx context.Context, verbose bool,
+) (uint64, redact.RedactableString, error) {
+	ctx = s.server.AnnotateCtx(ctx)
+	return s.server.Drain(ctx, verbose)
+}
+
 // systemServerWrapper implements the onDemandServer interface for Server.
 //
 // (We can imagine a future where the SQL service for the system
@@ -297,3 +312,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID {
 func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest {
 	return nil
 }
+
+func (s *systemServerWrapper) gracefulDrain(
+	ctx context.Context, verbose bool,
+) (uint64, redact.RedactableString, error) {
+	// The controller is not responsible for draining the system tenant.
+	return 0, "", nil
+}
diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go
index b1f3c7ecac8a..69d43ee36c0f 100644
--- a/pkg/server/server_controller_orchestration.go
+++ b/pkg/server/server_controller_orchestration.go
@@ -96,9 +96,14 @@ func (c *serverController) start(ctx context.Context, ie isql.Executor) error {
 		select {
 		case <-time.After(watchInterval):
 		case <-c.stopper.ShouldQuiesce():
+			// Expedited shutdown of the outer server.
+			return
+		}
+		if c.draining.Get() {
+			// The outer server has started a graceful drain: stop
+			// picking up new servers.
 			return
 		}
-
 		if err := c.scanTenantsForRunnableServices(ctx, ie); err != nil {
 			log.Warningf(ctx, "cannot update running tenant services: %v", err)
 		}
@@ -179,6 +184,9 @@ func (c *serverController) scanTenantsForRunnableServices(
 func (c *serverController) createServerEntryLocked(
 	ctx context.Context, tenantName roachpb.TenantName,
 ) (*serverEntry, error) {
+	if c.draining.Get() {
+		return nil, errors.New("server is draining")
+	}
 	entry, err := c.startControlledServer(ctx, tenantName)
 	if err != nil {
 		return nil, err
@@ -216,12 +224,17 @@ func (c *serverController) startControlledServer(
 	// tracer attached to the incoming context.
 	tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
 	tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil)
+	tenantCtx = logtags.AddTag(tenantCtx, "tenant", tenantName)
 
 	// ctlStopper is a stopper uniquely responsible for the control
 	// loop. It is separate from the tenantStopper defined below so
 	// that we can retry the server instantiation if it fails.
 	ctlStopper := stop.NewStopper()
 
+	// useGracefulDrainDuringTenantShutdown defines whether a graceful
+	// drain is requested on the tenant server by orchestration.
+	useGracefulDrainDuringTenantShutdown := make(chan bool, 1)
+
 	// Ensure that if the surrounding server requests shutdown, we
 	// propagate it to the new server.
 	if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) {
 		select {
 		case <-stoppedCh:
 			// Server control loop is terminating prematurely before a
 			// request was made to terminate it.
 			log.Infof(ctx, "tenant %q terminating", tenantName)
+
 		case <-c.stopper.ShouldQuiesce():
 			// Surrounding server is stopping; propagate the stop to the
 			// control goroutine below.
+			// Note: we can't do a graceful drain in that case because
+			// the RPC service in the surrounding server may already
+			// be unavailable.
 			log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName)
+			useGracefulDrainDuringTenantShutdown <- false
 			ctlStopper.Stop(tenantCtx)
+
 		case <-stopRequestCh:
-			// Someone requested a shutdown.
+			// Someone requested a graceful shutdown.
 			log.Infof(ctx, "received request for tenant %q to terminate", tenantName)
+			useGracefulDrainDuringTenantShutdown <- true
 			ctlStopper.Stop(tenantCtx)
+
 		case <-topCtx.Done():
-			// Someone requested a shutdown.
+			// Someone requested a shutdown - probably a test.
+			// Note: we can't do a graceful drain in that case because
+			// the RPC service in the surrounding server may already
+			// be unavailable.
 			log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName)
+			useGracefulDrainDuringTenantShutdown <- false
 			ctlStopper.Stop(tenantCtx)
 		}
 	}); err != nil {
@@ -309,10 +334,36 @@ func (c *serverController) startControlledServer(
 	if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) {
 		select {
 		case <-tenantStopper.ShouldQuiesce():
+			// Tenant server shutting down on its own.
 			return
 		case <-ctlStopper.ShouldQuiesce():
+			select {
+			case gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown:
+				if gracefulDrainRequested {
+					// Ensure that the graceful drain for the tenant server aborts
+					// early if the Stopper for the surrounding server is
+					// prematurely shutting down. This is because once the
+					// surrounding node starts quiescing tasks, it won't be able
+					// to process KV requests issued by the tenant server anymore.
+					//
+					// Beware: we use tenantCtx here, not ctx, because the
+					// latter has been linked to ctlStopper.Quiesce already
+					// -- and in this select branch that context has been
+					// canceled already.
+					drainCtx, cancel := c.stopper.WithCancelOnQuiesce(tenantCtx)
+					defer cancel()
+					log.Infof(drainCtx, "starting graceful drain")
+					// Call the drain service on that tenant's server. This may take a
+					// while as it needs to wait for clients to disconnect and SQL
+					// activity to clear up, possibly waiting for various configurable
+					// timeouts.
+					CallDrainServerSide(drainCtx, tenantServer.gracefulDrain)
+				}
+			default:
+			}
 			tenantStopper.Stop(ctx)
 		case <-c.stopper.ShouldQuiesce():
+			// Expedited shutdown of the surrounding KV node.
 			tenantStopper.Stop(ctx)
 		}
 	}); err != nil {
@@ -473,6 +524,30 @@ func (c *serverController) newServerInternal(
 // Close implements the stop.Closer interface.
 func (c *serverController) Close() {
+	entries := c.requestStopAll()
+
+	// Wait for shutdown for all servers.
+	for _, e := range entries {
+		<-e.state.stopped
+	}
+}
+
+// drain requests a shutdown of all currently running tenant servers
+// and reports how many of them are still running.
+func (c *serverController) drain(ctx context.Context) (stillRunning int) {
+	entries := c.requestStopAll()
+	// How many entries are _not_ stopped yet?
+	notStopped := 0
+	for _, e := range entries {
+		select {
+		case <-e.state.stopped:
+		default:
+			log.Infof(ctx, "server for tenant %q still running", e.nameContainer)
+			notStopped++
+		}
+	}
+	return notStopped
+}
+
+func (c *serverController) requestStopAll() []*serverEntry {
 	entries := func() (res []*serverEntry) {
 		c.mu.Lock()
 		defer c.mu.Unlock()
@@ -487,11 +562,7 @@ func (c *serverController) Close() {
 	for _, e := range entries {
 		e.state.requestStop()
 	}
-
-	// Wait for shutdown for all servers.
-	for _, e := range entries {
-		<-e.state.stopped
-	}
+	return entries
 }
 
 type nodeEventLogger interface {
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 23b8e2103dbc..ce63e406b18a 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -1689,15 +1689,24 @@ func (s *SQLServer) preStart(
 	// shutdown; but may be a sign of a problem in production or for
 	// tests that need to restart a server.
 	stopper.AddCloser(stop.CloserFn(func() {
+		var sk *TestingKnobs
+		if knobs.Server != nil {
+			sk, _ = knobs.Server.(*TestingKnobs)
+		}
+
 		if !s.gracefulDrainComplete.Get() {
 			warnCtx := s.AnnotateCtx(context.Background())
-			if knobs.Server != nil && knobs.Server.(*TestingKnobs).RequireGracefulDrain {
+			if sk != nil && sk.RequireGracefulDrain {
 				log.Fatalf(warnCtx, "drain required but not performed")
 			}
 			log.Warningf(warnCtx, "server shutdown without a prior graceful drain")
 		}
+
+		if sk != nil && sk.DrainReportCh != nil {
+			sk.DrainReportCh <- struct{}{}
+		}
 	}))
 
 	return nil
diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go
index 24516c0fca2e..9b5b094e7fb5 100644
--- a/pkg/server/testing_knobs.go
+++ b/pkg/server/testing_knobs.go
@@ -138,6 +138,10 @@ type TestingKnobs struct {
 	// RequireGracefulDrain, if set, causes a shutdown to fail with a log.Fatal
 	// if the server is not gracefully drained prior to its stopper shutting down.
 	RequireGracefulDrain bool
+
+	// DrainReportCh, if set, is a channel that will be notified when
+	// the SQL service shuts down.
+	DrainReportCh chan struct{}
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
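
For illustration, the retry pattern that CallDrainServerSide implements (and
that pkg/cli/start.go and the tenant orchestration above now share) can be
exercised in isolation. The standalone sketch below is not part of the patch:
it substitutes a hypothetical drainFunc type for ServerSideDrainFn (dropping
the redact.RedactableString report) and plain fmt logging for log.Ops, and
shows how a caller can bound the default wait-forever behavior with a context
deadline, as the doc comment on CallDrainServerSide suggests.

package main

import (
	"context"
	"fmt"
	"math"
	"time"
)

// drainFunc mirrors ServerSideDrainFn from pkg/server/drain.go, minus the
// redactable status report: it performs one drain attempt and returns how
// much work remains.
type drainFunc func(ctx context.Context, verbose bool) (remaining uint64, err error)

// callDrain mirrors the CallDrainServerSide retry loop: keep calling the
// drain function until no work remains, turn on verbosity if the drain
// stalls, and stop early if the context is canceled.
func callDrain(ctx context.Context, drain drainFunc) {
	prevRemaining := uint64(math.MaxUint64)
	verbose := false
	for {
		// Let the caller interrupt the process via context cancellation.
		select {
		case <-ctx.Done():
			fmt.Println("drain interrupted by caller:", ctx.Err())
			return
		default:
		}

		remaining, err := drain(ctx, verbose)
		if err != nil {
			fmt.Println("graceful drain failed:", err)
			return
		}
		if remaining == 0 {
			fmt.Println("graceful drain complete")
			return
		}
		// If the amount of remaining work stalls or grows, enable
		// verbosity to help with troubleshooting.
		if remaining >= prevRemaining {
			verbose = true
		}
		// Avoid a busy wait if the server replies too quickly.
		time.Sleep(200 * time.Millisecond)
		prevRemaining = remaining
	}
}

func main() {
	// A caller that cannot wait forever bounds the drain with a deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	work := uint64(3) // pretend three leases remain to be transferred
	callDrain(ctx, func(ctx context.Context, verbose bool) (uint64, error) {
		work-- // each attempt retires one unit of work
		return work, nil
	})
}

Each iteration follows the same shape as the server-side loop: check for
cancellation, attempt one drain step, escalate verbosity when the remaining
work stops shrinking, and sleep briefly between attempts.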