diff --git a/pkg/cli/start.go b/pkg/cli/start.go
index e0873db5693a..032b63630394 100644
--- a/pkg/cli/start.go
+++ b/pkg/cli/start.go
@@ -1121,40 +1121,13 @@ func startShutdownAsync(
     drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil)
 
     if shouldDrain {
-        // Perform a graceful drain. We keep retrying forever, in
-        // case there are many range leases or some unavailability
-        // preventing progress. If the operator wants to expedite
-        // the shutdown, they will need to make it ungraceful
-        // via a 2nd signal.
-        var (
-            remaining     = uint64(math.MaxUint64)
-            prevRemaining = uint64(math.MaxUint64)
-            verbose       = false
-        )
-
-        for ; ; prevRemaining = remaining {
-            var err error
-            remaining, _, err = s.Drain(drainCtx, verbose)
-            if err != nil {
-                log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err)
-                break
-            }
-            if remaining == 0 {
-                // No more work to do.
-                break
-            }
-
-            // If range lease transfer stalls or the number of
-            // remaining leases somehow increases, verbosity is set
-            // to help with troubleshooting.
-            if remaining >= prevRemaining {
-                verbose = true
-            }
-
-            // Avoid a busy wait with high CPU usage if the server replies
-            // with an incomplete drain too quickly.
-            time.Sleep(200 * time.Millisecond)
-        }
+        // Perform a graceful drain. This function keeps retrying and
+        // the call might never complete (e.g. due to some
+        // unavailability preventing progress). This is intentional. If
+        // the operator wants to expedite the shutdown, they will need
+        // to make it ungraceful by sending a second signal to the
+        // process, which will tickle the shortcut in waitForShutdown().
+        server.CallDrainServerSide(drainCtx, s.Drain)
     }
 
     stopper.Stop(drainCtx)
diff --git a/pkg/server/drain.go b/pkg/server/drain.go
index ac0835255ec7..3097b344980d 100644
--- a/pkg/server/drain.go
+++ b/pkg/server/drain.go
@@ -13,6 +13,7 @@ package server
 import (
     "context"
     "io"
+    "math"
     "strings"
     "time"
 
@@ -26,6 +27,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/errors"
+    "github.com/cockroachdb/logtags"
    "github.com/cockroachdb/redact"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
@@ -103,6 +105,7 @@ type drainServer struct {
     grpc         *grpcServer
     sqlServer    *SQLServer
     drainSleepFn func(time.Duration)
+    serverCtl    *serverController
 
     kvServer struct {
         nodeLiveness *liveness.NodeLiveness
@@ -306,6 +309,26 @@ func (s *drainServer) runDrain(
 func (s *drainServer) drainInner(
     ctx context.Context, reporter func(int, redact.SafeString), verbose bool,
 ) (err error) {
+    if s.serverCtl != nil {
+        // We are on a KV node, with a server controller.
+        //
+        // First tell the controller to stop starting new servers.
+        s.serverCtl.draining.Set(true)
+
+        // Then shut down tenant servers orchestrated from
+        // this node.
+        stillRunning := s.serverCtl.drain(ctx)
+        reporter(stillRunning, "tenant servers")
+        // If we still have tenant servers, we can't make progress on
+        // draining SQL clients (on the system tenant) and the KV node,
+        // because that would block the graceful drain of the tenant
+        // server(s).
+        if stillRunning > 0 {
+            return nil
+        }
+        log.Infof(ctx, "all tenant servers stopped")
+    }
+
     // Drain the SQL layer.
     // Drains all SQL connections, distributed SQL execution flows, and SQL table leases.
     if err = s.drainClients(ctx, reporter); err != nil {
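Reviewer note: drainInner now returns early (reporting a non-zero remaining count) while tenant servers are still shutting down, and relies on the caller retrying Drain until the count reaches zero. A minimal, self-contained Go sketch of that retry contract; the names drainOnce and remainingWork are hypothetical, not CockroachDB code:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	remainingWork := 3 // e.g. tenant servers still shutting down

	// drainOnce models one Drain call: it makes some progress and
	// reports how much work is left.
	drainOnce := func() int {
		if remainingWork > 0 {
			remainingWork--
		}
		return remainingWork
	}

	for {
		remaining := drainOnce()
		fmt.Printf("drain iteration: %d item(s) remaining\n", remaining)
		if remaining == 0 {
			break // nothing left: the drain is complete
		}
		// Pause between attempts to avoid a busy loop.
		time.Sleep(200 * time.Millisecond)
	}
}
```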
@@ -407,6 +430,7 @@ func (s *drainServer) drainNode(
         // No KV subsystem. Nothing to do.
         return nil
     }
+
     // Set the node's liveness status to "draining".
     if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil {
         return err
@@ -432,3 +456,65 @@ func (s *drainServer) logOpenConns(ctx context.Context) error {
         }
     })
 }
+
+// CallDrainServerSide is a reference implementation for a server-side
+// function that wishes to shut down a server gracefully via the Drain
+// interface. The Drain interface is responsible for notifying clients
+// and shutting down systems in a particular order that prevents
+// client app disruptions. We generally prefer graceful drains to the
+// disorderly shutdown caused by either a process crash or a direct
+// call to the stopper's Stop() method.
+//
+// By default, this code will wait forever for a graceful drain to
+// complete. The caller can override this behavior by passing a context
+// with a deadline.
+//
+// For an example client-side implementation (drain client over RPC),
+// see the code in pkg/cli/node.go, doDrain().
+func CallDrainServerSide(ctx context.Context, drainFn ServerSideDrainFn) {
+    var (
+        prevRemaining = uint64(math.MaxUint64)
+        verbose       = false
+    )
+
+    ctx = logtags.AddTag(ctx, "call-graceful-drain", nil)
+    for {
+        // Let the caller interrupt the process via context cancellation
+        // if so desired.
+        select {
+        case <-ctx.Done():
+            log.Ops.Errorf(ctx, "drain interrupted by caller: %v", ctx.Err())
+            return
+        default:
+        }
+
+        remaining, _, err := drainFn(ctx, verbose)
+        if err != nil {
+            log.Ops.Errorf(ctx, "graceful drain failed: %v", err)
+            return
+        }
+        if remaining == 0 {
+            // No more work to do.
+            log.Ops.Infof(ctx, "graceful drain complete")
+            return
+        }
+
+        // If range lease transfer stalls or the number of
+        // remaining leases somehow increases, verbosity is set
+        // to help with troubleshooting.
+        if remaining >= prevRemaining {
+            verbose = true
+        }
+
+        // Avoid a busy wait with high CPU usage if the server replies
+        // with an incomplete drain too quickly.
+        time.Sleep(200 * time.Millisecond)
+
+        // Remember the remaining work to set the verbose flag in the next
+        // iteration.
+        prevRemaining = remaining
+    }
+}
+
+// ServerSideDrainFn is the interface of the server-side handler for the Drain logic.
+type ServerSideDrainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
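Reviewer note: the doc comment above says a caller can bound the otherwise-unbounded drain by passing a context with a deadline. A standalone sketch of what such a caller might look like; drainFn here is a stand-in with the ServerSideDrainFn shape (a plain string replaces redact.RedactableString to keep the sketch self-contained), and callDrainWithDeadline is a hypothetical name:

```go
package main

import (
	"context"
	"fmt"
	"math"
	"time"
)

// drainFn mirrors the shape of ServerSideDrainFn.
type drainFn func(ctx context.Context, verbose bool) (uint64, string, error)

func callDrainWithDeadline(parent context.Context, fn drainFn, limit time.Duration) {
	// The deadline makes the retry loop give up via ctx.Done().
	ctx, cancel := context.WithTimeout(parent, limit)
	defer cancel()

	prev := uint64(math.MaxUint64)
	verbose := false
	for {
		select {
		case <-ctx.Done():
			fmt.Println("drain interrupted:", ctx.Err())
			return
		default:
		}
		remaining, _, err := fn(ctx, verbose)
		if err != nil || remaining == 0 {
			return
		}
		if remaining >= prev {
			verbose = true // no progress: ask for more detail
		}
		prev = remaining
		time.Sleep(200 * time.Millisecond)
	}
}

func main() {
	work := uint64(5)
	fakeDrain := func(ctx context.Context, verbose bool) (uint64, string, error) {
		work-- // each call completes some work
		return work, "", nil
	}
	callDrainWithDeadline(context.Background(), fakeDrain, 2*time.Second)
}
```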
+    gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
 }
 
 type serverEntry struct {
@@ -102,6 +106,10 @@ type serverController struct {
     // a tenant routing error to the incoming client.
     sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName)
 
+    // draining is set when the surrounding server starts draining, and
+    // prevents further creation of new tenant servers.
+    draining syncutil.AtomicBool
+
     mu struct {
         syncutil.Mutex
 
@@ -212,6 +220,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest {
     return s.server.sqlServer.ShutdownRequested()
 }
 
+func (s *tenantServerWrapper) gracefulDrain(
+    ctx context.Context, verbose bool,
+) (uint64, redact.RedactableString, error) {
+    ctx = s.server.AnnotateCtx(ctx)
+    return s.server.Drain(ctx, verbose)
+}
+
 // systemServerWrapper implements the onDemandServer interface for Server.
 //
 // (We can imagine a future where the SQL service for the system
@@ -255,3 +270,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID {
 func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest {
     return nil
 }
+
+func (s *systemServerWrapper) gracefulDrain(
+    ctx context.Context, verbose bool,
+) (uint64, redact.RedactableString, error) {
+    // The controller is not responsible for draining the system tenant.
+    return 0, "", nil
+}
diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go
index e8f2f619b8d3..5237b19e1a77 100644
--- a/pkg/server/server_controller_orchestration.go
+++ b/pkg/server/server_controller_orchestration.go
@@ -95,9 +95,14 @@ func (c *serverController) start(ctx context.Context, ie isql.Executor) error {
         select {
         case <-time.After(watchInterval):
         case <-c.stopper.ShouldQuiesce():
+            // Expedited server shutdown of outer server.
+            return
+        }
+        if c.draining.Get() {
+            // The outer server has started a graceful drain: stop
+            // picking up new servers.
             return
         }
-
         if err := c.scanTenantsForRunnableServices(ctx, ie); err != nil {
             log.Warningf(ctx, "cannot update running tenant services: %v", err)
         }
@@ -173,6 +178,9 @@ func (c *serverController) scanTenantsForRunnableServices(
 func (c *serverController) createServerEntryLocked(
     ctx context.Context, tenantName roachpb.TenantName,
 ) (*serverEntry, error) {
+    if c.draining.Get() {
+        return nil, errors.New("server is draining")
+    }
     entry, err := c.startControlledServer(ctx, tenantName)
     if err != nil {
         return nil, err
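Reviewer note: the draining flag gating both hunks above is CockroachDB's internal syncutil.AtomicBool. A standalone sketch of the same gate pattern using the standard library's sync/atomic.Bool (Go 1.19+); the controller and newServer names are hypothetical stand-ins:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type controller struct {
	draining atomic.Bool
}

// newServer refuses to create servers once a drain has begun,
// mirroring the check added to createServerEntryLocked.
func (c *controller) newServer(name string) error {
	if c.draining.Load() {
		return errors.New("server is draining")
	}
	fmt.Println("starting server for", name)
	return nil
}

func main() {
	c := &controller{}
	_ = c.newServer("app") // allowed

	c.draining.Store(true) // drain begins: close the gate
	if err := c.newServer("other"); err != nil {
		fmt.Println("rejected:", err) // rejected: server is draining
	}
}
```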
log.Infof(ctx, "tenant %q terminating", tenantName) + case <-c.stopper.ShouldQuiesce(): // Surrounding server is stopping; propagate the stop to the // control goroutine below. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) tenantStopper.Stop(tenantCtx) + case <-stopRequestCh: - // Someone requested a shutdown. + // Someone requested a graceful shutdown. log.Infof(ctx, "received request for tenant %q to terminate", tenantName) + + // Ensure that the graceful drain for the tenant serer aborts + // early if the Stopper for the surrounding server is + // prematurely shutting down. This is because once the surrounding node + // starts quiescing tasks, it won't be able to process KV requests + // by the tenant server any more. + drainCtx, cancel := c.stopper.WithCancelOnQuiesce(tenantCtx) + defer cancel() + shutdownInterface.maybeCallDrain(drainCtx) + tenantStopper.Stop(tenantCtx) + case <-topCtx.Done(): - // Someone requested a shutdown. + // Someone requested a shutdown - probably a test. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) tenantStopper.Stop(tenantCtx) } }); err != nil { // The goroutine above is responsible for stopping the - // tenantStopper. If it fails to stop, we stop it here - // to avoid leaking the stopper. + // tenantStopper. If it fails to start, we need to avoid leaking + // the stopper. tenantStopper.Stop(ctx) return nil, err } @@ -314,6 +347,7 @@ func (c *serverController) startControlledServer( })) // Indicate the server has started. + shutdownInterface.setServer(s) entry.server = s startedOrStoppedChAlreadyClosed = true entry.state.started.Set(true) @@ -418,6 +452,30 @@ func (c *serverController) startServerInternal( // Close implements the stop.Closer interface. func (c *serverController) Close() { + entries := c.requestStopAll() + + // Wait for shutdown for all servers. + for _, e := range entries { + <-e.state.stopped + } +} + +func (c *serverController) drain(ctx context.Context) (stillRunning int) { + entries := c.requestStopAll() + // How many entries are _not_ stopped yet? + notStopped := 0 + for _, e := range entries { + select { + case <-e.state.stopped: + default: + log.Infof(ctx, "server for tenant %q still running", e.nameContainer) + notStopped++ + } + } + return notStopped +} + +func (c *serverController) requestStopAll() []*serverEntry { entries := func() (res []*serverEntry) { c.mu.Lock() defer c.mu.Unlock() @@ -432,11 +490,7 @@ func (c *serverController) Close() { for _, e := range entries { e.state.requestStop() } - - // Wait for shutdown for all servers. - for _, e := range entries { - <-e.state.stopped - } + return entries } type nodeEventLogger interface { @@ -483,3 +537,45 @@ func (c *serverController) logStopEvent( c.logger.logStructuredEvent(ctx, ev) } + +// tenantServerShutdownInterface coordinates the goroutine that starts +// a tenant server and the goroutine that monitors async requests to +// shut it down. 
@@ -483,3 +537,45 @@ func (c *serverController) logStopEvent(
 
     c.logger.logStructuredEvent(ctx, ev)
 }
+
+// tenantServerShutdownInterface coordinates the goroutine that starts
+// a tenant server and the goroutine that monitors async requests to
+// shut it down.
+type tenantServerShutdownInterface struct {
+    syncutil.Mutex
+    drainableServer drainableTenantServer
+}
+
+func (t *tenantServerShutdownInterface) setServer(s drainableTenantServer) {
+    t.Lock()
+    defer t.Unlock()
+    t.drainableServer = s
+}
+
+func (t *tenantServerShutdownInterface) getServer() drainableTenantServer {
+    t.Lock()
+    defer t.Unlock()
+    return t.drainableServer
+}
+
+// maybeCallDrain performs the graceful drain sequence for the server
+// if it has started already.
+func (t *tenantServerShutdownInterface) maybeCallDrain(ctx context.Context) {
+    s := t.getServer()
+    if s == nil {
+        log.Infof(ctx, "server not started yet; nothing to drain")
+        return
+    }
+    log.Infof(ctx, "starting graceful drain")
+    // Call the drain service on that tenant's server. This may take a
+    // while as it needs to wait for clients to disconnect and SQL
+    // activity to clear up, possibly waiting for various configurable
+    // timeouts.
+    CallDrainServerSide(ctx, s.gracefulDrain)
+}
+
+// drainableTenantServer is the subset of onDemandServer that is able
+// to call the graceful drain service.
+type drainableTenantServer interface {
+    gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
+}
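Reviewer note: tenantServerShutdownInterface is a mutex-guarded handoff. The task that starts the tenant server publishes it via setServer, and the propagate-close task drains it only if it was ever published. A standalone sketch of the pattern with hypothetical stand-in types:

```go
package main

import (
	"fmt"
	"sync"
)

type server struct{ name string }

func (s *server) drain() { fmt.Println("draining", s.name) }

// handoff publishes a server from the starter goroutine to the
// shutdown watcher under a mutex, mirroring setServer/maybeCallDrain.
type handoff struct {
	mu sync.Mutex
	s  *server
}

func (h *handoff) set(s *server) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.s = s
}

// maybeDrain drains the server only if the starter goroutine has
// already published it; otherwise there is nothing to do.
func (h *handoff) maybeDrain() {
	h.mu.Lock()
	s := h.s
	h.mu.Unlock()
	if s == nil {
		fmt.Println("server not started yet; nothing to drain")
		return
	}
	s.drain()
}

func main() {
	var h handoff
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // starter goroutine
		defer wg.Done()
		h.set(&server{name: "tenant-app"})
	}()
	wg.Wait()
	h.maybeDrain() // the shutdown watcher would call this on a stop request
}
```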