From 75a863ce5f572d9e21dc8021693bcdc9fc819bf4 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 29 Mar 2023 19:51:10 +0200 Subject: [PATCH] server: gracefully shut down secondary tenant servers This change ensures that tenant servers managed by the server controller receive a graceful drain request as part of the graceful drain process of the surrounding KV node. This change, in turn, ensures that SQL clients connected to these secondary tenant servers benefit from the same guarantees (and graceful periods) as clients to the system tenant. Release note: None --- pkg/server/drain.go | 19 +++ pkg/server/server.go | 1 + pkg/server/server_controller.go | 18 +++ pkg/server/server_controller_orchestration.go | 117 ++++++++++++++++-- 4 files changed, 146 insertions(+), 9 deletions(-) diff --git a/pkg/server/drain.go b/pkg/server/drain.go index fcf49101d45e..2c7649b07280 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -103,6 +103,7 @@ type drainServer struct { grpc *grpcServer sqlServer *SQLServer drainSleepFn func(time.Duration) + serverCtl *serverController kvServer struct { nodeLiveness *liveness.NodeLiveness @@ -306,6 +307,23 @@ func (s *drainServer) runDrain( func (s *drainServer) drainInner( ctx context.Context, reporter func(int, redact.SafeString), verbose bool, ) (err error) { + if s.serverCtl != nil { + // We are on a KV node, with a server controller. + // + // So we need to first shut down tenant servers orchestrated from + // this node. + stillRunning := s.serverCtl.drain(ctx) + reporter(stillRunning, "tenant servers") + // If we still have tenant servers, we can't make progress on + // draining SQL clients (on the system tenant) and the KV node, + // because that would block the graceful drain of the tenant + // server(s). + if stillRunning > 0 { + return nil + } + log.Infof(ctx, "all tenant servers stopped") + } + // Drain the SQL layer. // Drains all SQL connections, distributed SQL execution flows, and SQL table leases. if err = s.drainClients(ctx, reporter); err != nil { @@ -397,6 +415,7 @@ func (s *drainServer) drainNode( // No KV subsystem. Nothing to do. return nil } + // Set the node's liveness status to "draining". if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil { return err diff --git a/pkg/server/server.go b/pkg/server/server.go index d3ad9fcc9e98..eda7eed9bb20 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1109,6 +1109,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { systemTenantNameContainer, pgPreServer.SendRoutingError, ) + drain.serverCtl = sc // Create the debug API server. debugServer := debug.NewServer( diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index e556e9c3cdb1..cf7c85b8250a 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/redact" ) // onDemandServer represents a server that can be started on demand. @@ -54,6 +55,9 @@ type onDemandServer interface { // shutdownRequested returns the shutdown request channel. shutdownRequested() <-chan ShutdownRequest + + // gracefulDrain drains the server. + gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) } type serverEntry struct { @@ -212,6 +216,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest { return s.server.sqlServer.ShutdownRequested() } +func (s *tenantServerWrapper) gracefulDrain( + ctx context.Context, verbose bool, +) (uint64, redact.RedactableString, error) { + ctx = s.server.AnnotateCtx(ctx) + return s.server.Drain(ctx, verbose) +} + // systemServerWrapper implements the onDemandServer interface for Server. // // (We can imagine a future where the SQL service for the system @@ -255,3 +266,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID { func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest { return nil } + +func (s *systemServerWrapper) gracefulDrain( + ctx context.Context, verbose bool, +) (uint64, redact.RedactableString, error) { + // The controller is not responsible for draining the system tenant. + return 0, "", nil +} diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index e8f2f619b8d3..5acc328ec9fd 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -12,6 +12,7 @@ package server import ( "context" + "math" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -212,6 +213,28 @@ func (c *serverController) startControlledServer( tenantStopper := stop.NewStopper() + var shutdownInterface struct { + syncutil.Mutex + drainableServer interface { + gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) + } + } + gracefulDrain := func(ctx context.Context) { + drainServer := func() interface { + gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) + } { + shutdownInterface.Lock() + defer shutdownInterface.Unlock() + return shutdownInterface.drainableServer + }() + if drainServer == nil { + // Server not started yet. No graceful drain possible. + return + } + log.Infof(ctx, "gracefully draining tenant %q", tenantName) + doGracefulDrain(ctx, drainServer.gracefulDrain) + } + // Ensure that if the surrounding server requests shutdown, we // propagate it to the new server. if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { @@ -222,21 +245,28 @@ func (c *serverController) startControlledServer( case <-c.stopper.ShouldQuiesce(): // Surrounding server is stopping; propagate the stop to the // control goroutine below. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) tenantStopper.Stop(tenantCtx) case <-stopRequestCh: - // Someone requested a shutdown. + // Someone requested a graceful shutdown. log.Infof(ctx, "received request for tenant %q to terminate", tenantName) + gracefulDrain(tenantCtx) tenantStopper.Stop(tenantCtx) case <-topCtx.Done(): - // Someone requested a shutdown. + // Someone requested a shutdown - probably a test. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) tenantStopper.Stop(tenantCtx) } }); err != nil { // The goroutine above is responsible for stopping the - // tenantStopper. If it fails to stop, we stop it here - // to avoid leaking the stopper. + // tenantStopper. If it fails to start, we need to avoid leaking + // the stopper. tenantStopper.Stop(ctx) return nil, err } @@ -314,6 +344,11 @@ func (c *serverController) startControlledServer( })) // Indicate the server has started. + func() { + shutdownInterface.Lock() + defer shutdownInterface.Unlock() + shutdownInterface.drainableServer = s + }() entry.server = s startedOrStoppedChAlreadyClosed = true entry.state.started.Set(true) @@ -418,6 +453,30 @@ func (c *serverController) startServerInternal( // Close implements the stop.Closer interface. func (c *serverController) Close() { + entries := c.requestStopAll() + + // Wait for shutdown for all servers. + for _, e := range entries { + <-e.state.stopped + } +} + +func (c *serverController) drain(ctx context.Context) (stillRunning int) { + entries := c.requestStopAll() + // How many entries are _not_ stopped yet? + notStopped := 0 + for _, e := range entries { + select { + case <-e.state.stopped: + default: + log.Infof(ctx, "server for tenant %q still running", e.nameContainer) + notStopped++ + } + } + return notStopped +} + +func (c *serverController) requestStopAll() []*serverEntry { entries := func() (res []*serverEntry) { c.mu.Lock() defer c.mu.Unlock() @@ -432,11 +491,7 @@ func (c *serverController) Close() { for _, e := range entries { e.state.requestStop() } - - // Wait for shutdown for all servers. - for _, e := range entries { - <-e.state.stopped - } + return entries } type nodeEventLogger interface { @@ -483,3 +538,47 @@ func (c *serverController) logStopEvent( c.logger.logStructuredEvent(ctx, ev) } + +// doGracefulDrain performs a graceful drain. +// +// FIXME(knz): This is the same code as in cli/start.go, +// startShutdownAsync. Maybe we could have a single function used by +// both. +func doGracefulDrain( + ctx context.Context, + drainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error), +) { + // Perform a graceful drain. + // FIXME(knz): This code currently keeps retrying forever. Is this reasonable? + var ( + remaining = uint64(math.MaxUint64) + prevRemaining = uint64(math.MaxUint64) + verbose = true // false + ) + + drainCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) + + for ; ; prevRemaining = remaining { + var err error + remaining, _, err = drainFn(drainCtx, verbose) + if err != nil { + log.Ops.Errorf(ctx, "graceful drain failed: %v", err) + break + } + if remaining == 0 { + // No more work to do. + break + } + + // If range lease transfer stalls or the number of + // remaining leases somehow increases, verbosity is set + // to help with troubleshooting. + if remaining >= prevRemaining { + verbose = true + } + + // Avoid a busy wait with high CPU usage if the server replies + // with an incomplete drain too quickly. + time.Sleep(200 * time.Millisecond) + } +}