diff --git a/pkg/server/drain.go b/pkg/server/drain.go
index fcf49101d45e..2c7649b07280 100644
--- a/pkg/server/drain.go
+++ b/pkg/server/drain.go
@@ -103,6 +103,7 @@ type drainServer struct {
 	grpc         *grpcServer
 	sqlServer    *SQLServer
 	drainSleepFn func(time.Duration)
+	serverCtl    *serverController
 
 	kvServer struct {
 		nodeLiveness *liveness.NodeLiveness
@@ -306,6 +307,23 @@ func (s *drainServer) runDrain(
 func (s *drainServer) drainInner(
 	ctx context.Context, reporter func(int, redact.SafeString), verbose bool,
 ) (err error) {
+	if s.serverCtl != nil {
+		// We are on a KV node, with a server controller.
+		//
+		// So we need to first shut down tenant servers orchestrated from
+		// this node.
+		stillRunning := s.serverCtl.drain(ctx)
+		reporter(stillRunning, "tenant servers")
+		// If we still have tenant servers, we can't make progress on
+		// draining SQL clients (on the system tenant) and the KV node,
+		// because that would block the graceful drain of the tenant
+		// server(s).
+		if stillRunning > 0 {
+			return nil
+		}
+		log.Infof(ctx, "all tenant servers stopped")
+	}
+
 	// Drain the SQL layer.
 	// Drains all SQL connections, distributed SQL execution flows, and SQL table leases.
 	if err = s.drainClients(ctx, reporter); err != nil {
@@ -397,6 +415,7 @@ func (s *drainServer) drainNode(
 		// No KV subsystem. Nothing to do.
 		return nil
 	}
+
 	// Set the node's liveness status to "draining".
 	if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil {
 		return err
diff --git a/pkg/server/server.go b/pkg/server/server.go
index d3ad9fcc9e98..eda7eed9bb20 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -1109,6 +1109,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		systemTenantNameContainer,
 		pgPreServer.SendRoutingError,
 	)
+	drain.serverCtl = sc
 
 	// Create the debug API server.
 	debugServer := debug.NewServer(
diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go
index e556e9c3cdb1..cf7c85b8250a 100644
--- a/pkg/server/server_controller.go
+++ b/pkg/server/server_controller.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/redact"
 )
 
 // onDemandServer represents a server that can be started on demand.
@@ -54,6 +55,9 @@ type onDemandServer interface {
 
 	// shutdownRequested returns the shutdown request channel.
 	shutdownRequested() <-chan ShutdownRequest
+
+	// gracefulDrain drains the server.
+	gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
 }
 
 type serverEntry struct {
@@ -212,6 +216,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest {
 	return s.server.sqlServer.ShutdownRequested()
 }
 
+func (s *tenantServerWrapper) gracefulDrain(
+	ctx context.Context, verbose bool,
+) (uint64, redact.RedactableString, error) {
+	ctx = s.server.AnnotateCtx(ctx)
+	return s.server.Drain(ctx, verbose)
+}
+
 // systemServerWrapper implements the onDemandServer interface for Server.
 //
 // (We can imagine a future where the SQL service for the system
@@ -255,3 +266,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID {
 func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest {
 	return nil
 }
+
+func (s *systemServerWrapper) gracefulDrain(
+	ctx context.Context, verbose bool,
+) (uint64, redact.RedactableString, error) {
+	// The controller is not responsible for draining the system tenant.
+	return 0, "", nil
+}
diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go
index e8f2f619b8d3..5acc328ec9fd 100644
--- a/pkg/server/server_controller_orchestration.go
+++ b/pkg/server/server_controller_orchestration.go
@@ -12,6 +12,7 @@ package server
 
 import (
 	"context"
+	"math"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -212,6 +213,28 @@ func (c *serverController) startControlledServer(
 
 	tenantStopper := stop.NewStopper()
 
+	var shutdownInterface struct {
+		syncutil.Mutex
+		drainableServer interface {
+			gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
+		}
+	}
+	gracefulDrain := func(ctx context.Context) {
+		drainServer := func() interface {
+			gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
+		} {
+			shutdownInterface.Lock()
+			defer shutdownInterface.Unlock()
+			return shutdownInterface.drainableServer
+		}()
+		if drainServer == nil {
+			// Server not started yet. No graceful drain possible.
+			return
+		}
+		log.Infof(ctx, "gracefully draining tenant %q", tenantName)
+		doGracefulDrain(ctx, drainServer.gracefulDrain)
+	}
+
 	// Ensure that if the surrounding server requests shutdown, we
 	// propagate it to the new server.
 	if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) {
@@ -222,21 +245,28 @@ func (c *serverController) startControlledServer(
 		case <-c.stopper.ShouldQuiesce():
 			// Surrounding server is stopping; propagate the stop to the
 			// control goroutine below.
+			// Note: we can't do a graceful drain in that case because
+			// the RPC service in the surrounding server may already
+			// be unavailable.
 			log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName)
 			tenantStopper.Stop(tenantCtx)
 		case <-stopRequestCh:
-			// Someone requested a shutdown.
+			// Someone requested a graceful shutdown.
 			log.Infof(ctx, "received request for tenant %q to terminate", tenantName)
+			gracefulDrain(tenantCtx)
 			tenantStopper.Stop(tenantCtx)
 		case <-topCtx.Done():
-			// Someone requested a shutdown.
+			// Someone requested a shutdown - probably a test.
+			// Note: we can't do a graceful drain in that case because
+			// the RPC service in the surrounding server may already
+			// be unavailable.
 			log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName)
 			tenantStopper.Stop(tenantCtx)
 		}
 	}); err != nil {
 		// The goroutine above is responsible for stopping the
-		// tenantStopper. If it fails to stop, we stop it here
-		// to avoid leaking the stopper.
+		// tenantStopper. If it fails to start, we need to avoid leaking
+		// the stopper.
 		tenantStopper.Stop(ctx)
 		return nil, err
 	}
@@ -314,6 +344,11 @@ func (c *serverController) startControlledServer(
 	}))
 
 	// Indicate the server has started.
+	func() {
+		shutdownInterface.Lock()
+		defer shutdownInterface.Unlock()
+		shutdownInterface.drainableServer = s
+	}()
 	entry.server = s
 	startedOrStoppedChAlreadyClosed = true
 	entry.state.started.Set(true)
@@ -418,6 +453,30 @@ func (c *serverController) startServerInternal(
 
 // Close implements the stop.Closer interface.
 func (c *serverController) Close() {
+	entries := c.requestStopAll()
+
+	// Wait for shutdown for all servers.
+	for _, e := range entries {
+		<-e.state.stopped
+	}
+}
+
+func (c *serverController) drain(ctx context.Context) (stillRunning int) {
+	entries := c.requestStopAll()
+	// How many entries are _not_ stopped yet?
+	notStopped := 0
+	for _, e := range entries {
+		select {
+		case <-e.state.stopped:
+		default:
+			log.Infof(ctx, "server for tenant %q still running", e.nameContainer)
+			notStopped++
+		}
+	}
+	return notStopped
+}
+
+func (c *serverController) requestStopAll() []*serverEntry {
 	entries := func() (res []*serverEntry) {
 		c.mu.Lock()
 		defer c.mu.Unlock()
@@ -432,11 +491,7 @@ func (c *serverController) Close() {
 	for _, e := range entries {
 		e.state.requestStop()
 	}
-
-	// Wait for shutdown for all servers.
-	for _, e := range entries {
-		<-e.state.stopped
-	}
+	return entries
 }
 
 type nodeEventLogger interface {
@@ -483,3 +538,47 @@ func (c *serverController) logStopEvent(
 
 	c.logger.logStructuredEvent(ctx, ev)
 }
+
+// doGracefulDrain performs a graceful drain.
+//
+// FIXME(knz): This is the same code as in cli/start.go,
+// startShutdownAsync. Maybe we could have a single function used by
+// both.
+func doGracefulDrain(
+	ctx context.Context,
+	drainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error),
+) {
+	// Perform a graceful drain.
+	// FIXME(knz): This code currently keeps retrying forever. Is this reasonable?
+	var (
+		remaining     = uint64(math.MaxUint64)
+		prevRemaining = uint64(math.MaxUint64)
+		verbose       = true // false
+	)
+
+	drainCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
+
+	for ; ; prevRemaining = remaining {
+		var err error
+		remaining, _, err = drainFn(drainCtx, verbose)
+		if err != nil {
+			log.Ops.Errorf(ctx, "graceful drain failed: %v", err)
+			break
+		}
+		if remaining == 0 {
+			// No more work to do.
+			break
+		}
+
+		// If range lease transfer stalls or the number of
+		// remaining leases somehow increases, verbosity is set
+		// to help with troubleshooting.
+		if remaining >= prevRemaining {
+			verbose = true
+		}
+
+		// Avoid a busy wait with high CPU usage if the server replies
+		// with an incomplete drain too quickly.
+		time.Sleep(200 * time.Millisecond)
+	}
+}
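
The early return in drainInner above relies on the drain request being issued repeatedly: while tenant servers are still shutting down, the call only reports them via the reporter and returns, and a later drain attempt performs the SQL and KV phases once the count reaches zero. The sketch below illustrates that retry-until-zero contract in isolation; it is not part of the patch, and the tenantDrainer and pollTenantDrain names are invented for the example.

// Standalone illustration only; it does not import CockroachDB code.
// tenantDrainer stands in for the non-blocking behavior added in this
// patch: request a stop of all tenant servers and report how many of
// them have not stopped yet.
package drainsketch

import (
	"context"
	"fmt"
	"time"
)

type tenantDrainer interface {
	drain(ctx context.Context) (stillRunning int)
}

// pollTenantDrain retries the tenant drain step until no tenant server
// remains, mirroring how a drain client re-issues its request while the
// reported amount of remaining work is non-zero.
func pollTenantDrain(ctx context.Context, d tenantDrainer) error {
	for {
		n := d.drain(ctx)
		if n == 0 {
			// All tenant servers have stopped; the caller can now
			// proceed with the SQL and KV drain phases.
			return nil
		}
		fmt.Printf("still waiting on %d tenant server(s)\n", n)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(200 * time.Millisecond):
			// Avoid a busy wait between attempts.
		}
	}
}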