diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel
index e6474e47f2cf..6d73ae7e6ca7 100644
--- a/pkg/ccl/serverccl/BUILD.bazel
+++ b/pkg/ccl/serverccl/BUILD.bazel
@@ -55,6 +55,7 @@ go_test(
         "//pkg/ccl/kvccl",
         "//pkg/ccl/utilccl/licenseccl",
         "//pkg/clusterversion",
+        "//pkg/jobs",
         "//pkg/kv/kvserver/liveness",
         "//pkg/kv/kvserver/liveness/livenesspb",
         "//pkg/multitenant/tenantcapabilities",
@@ -89,7 +90,6 @@ go_test(
         "//pkg/util/metric",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
-        "//pkg/util/stop",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_cockroachdb_errors//:errors",
diff --git a/pkg/ccl/serverccl/server_controller_test.go b/pkg/ccl/serverccl/server_controller_test.go
index 4708d9347f57..a1a730698b25 100644
--- a/pkg/ccl/serverccl/server_controller_test.go
+++ b/pkg/ccl/serverccl/server_controller_test.go
@@ -21,6 +21,7 @@ import (
     "time"
 
     "github.com/cockroachdb/cockroach/pkg/base"
+    "github.com/cockroachdb/cockroach/pkg/jobs"
     "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/server"
@@ -387,34 +388,54 @@ func TestServerControllerMultiNodeTenantStartup(t *testing.T) {
 
     ctx := context.Background()
 
+    t.Logf("starting test cluster")
     numNodes := 3
     tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{
         ServerArgs: base.TestServerArgs{
+            Knobs: base.TestingKnobs{
+                JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+            },
            DefaultTestTenant: base.TestTenantDisabled,
        }})
-    defer tc.Stopper().Stop(ctx)
+    defer func() {
+        t.Logf("stopping test cluster")
+        tc.Stopper().Stop(ctx)
+    }()
 
+    t.Logf("starting tenant servers")
     db := tc.ServerConn(0)
     _, err := db.Exec("CREATE TENANT hello; ALTER TENANT hello START SERVICE SHARED")
     require.NoError(t, err)
 
     // Pick a random node, try to run some SQL inside that tenant.
     rng, _ := randutil.NewTestRand()
-    sqlAddr := tc.Server(int(rng.Int31n(int32(numNodes)))).ServingSQLAddr()
+    serverIdx := int(rng.Int31n(int32(numNodes)))
+    sqlAddr := tc.Server(serverIdx).ServingSQLAddr()
+    t.Logf("attempting to use tenant server on node %d (%s)", serverIdx, sqlAddr)
     testutils.SucceedsSoon(t, func() error {
         tenantDB, err := serverutils.OpenDBConnE(sqlAddr, "cluster:hello", false, tc.Stopper())
         if err != nil {
+            t.Logf("error connecting to tenant server (will retry): %v", err)
             return err
         }
         defer tenantDB.Close()
-        if _, err := tenantDB.Exec("CREATE ROLE foo"); err != nil {
+        if err := tenantDB.Ping(); err != nil {
+            t.Logf("connection not ready (will retry): %v", err)
             return err
         }
+        if _, err := tenantDB.Exec("CREATE ROLE foo"); err != nil {
+            // This is not retryable -- if the server accepts the
+            // connection, it better be ready.
+            t.Fatal(err)
+        }
         if _, err := tenantDB.Exec("GRANT ADMIN TO foo"); err != nil {
-            return err
+            // This is not retryable -- if the server accepts the
+            // connection, it better be ready.
+            t.Fatal(err)
         }
         return nil
     })
+    t.Logf("tenant server on node %d (%s) is ready", serverIdx, sqlAddr)
 }
 
 func TestServerStartStop(t *testing.T) {
@@ -545,3 +566,37 @@ func TestServerControllerLoginLogout(t *testing.T) {
     require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames)
     require.ElementsMatch(t, []string{"", ""}, cookieValues)
 }
+
+func TestServiceShutdownUsesGracefulDrain(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+
+    s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
+        DefaultTestTenant: base.TestTenantDisabled,
+    })
+    defer s.Stopper().Stop(ctx)
+
+    drainCh := make(chan struct{})
+
+    // Start a shared process server.
+    _, _, err := s.(*server.TestServer).StartSharedProcessTenant(ctx,
+        base.TestSharedProcessTenantArgs{
+            TenantName: "hello",
+            Knobs: base.TestingKnobs{
+                Server: &server.TestingKnobs{
+                    RequireGracefulDrain: true,
+                    DrainReportCh:        drainCh,
+                },
+            },
+        })
+    require.NoError(t, err)
+
+    _, err = db.Exec("ALTER TENANT hello STOP SERVICE")
+    require.NoError(t, err)
+
+    // Wait for the server to shut down. This also asserts that the
+    // graceful drain has occurred.
+    <-drainCh
+}
diff --git a/pkg/ccl/serverccl/server_startup_guardrails_test.go b/pkg/ccl/serverccl/server_startup_guardrails_test.go
index e306a0a33850..4f84446fe516 100644
--- a/pkg/ccl/serverccl/server_startup_guardrails_test.go
+++ b/pkg/ccl/serverccl/server_startup_guardrails_test.go
@@ -22,7 +22,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/testutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
-    "github.com/cockroachdb/cockroach/pkg/util/stop"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
 )
 
 // TestServerStartupGuardrails ensures that a SQL server will fail to start if
@@ -74,69 +74,59 @@ func TestServerStartupGuardrails(t *testing.T) {
     }
 
     for i, test := range tests {
-        storageSettings := cluster.MakeTestingClusterSettingsWithVersions(
-            test.storageBinaryVersion,
-            test.storageBinaryMinSupportedVersion,
-            false, /* initializeVersion */
-        )
+        t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+            defer log.Scope(t).Close(t)
 
-        s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
-            // Disable the default test tenant, since we create one explicitly
-            // below.
-            DefaultTestTenant: base.TestTenantDisabled,
-            Settings: storageSettings,
-            Knobs: base.TestingKnobs{
-                Server: &server.TestingKnobs{
-                    BinaryVersionOverride: test.storageBinaryVersion,
-                    BootstrapVersionKeyOverride: clusterversion.V22_2,
-                    DisableAutomaticVersionUpgrade: make(chan struct{}),
-                },
-                SQLEvalContext: &eval.TestingKnobs{
-                    TenantLogicalVersionKeyOverride: test.TenantLogicalVersionKey,
-                },
-            },
-        })
+            storageSettings := cluster.MakeTestingClusterSettingsWithVersions(
+                test.storageBinaryVersion,
+                test.storageBinaryMinSupportedVersion,
+                false, /* initializeVersion */
+            )
 
-        tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(
-            test.tenantBinaryVersion,
-            test.tenantBinaryMinSupportedVersion,
-            true, /* initializeVersion */
-        )
-
-        // The tenant will be created with an active version equal to the version
-        // corresponding to TenantLogicalVersionKey. Tenant creation is expected
-        // to succeed for all test cases but server creation is expected to succeed
-        // only if tenantBinaryVersion is at least equal to the version corresponding
-        // to TenantLogicalVersionKey.
-        stopper := stop.NewStopper()
-        tenantServer, err := s.StartTenant(context.Background(),
-            base.TestTenantArgs{
-                Settings: tenantSettings,
-                TenantID: serverutils.TestTenantID(),
-                Stopper: stopper,
-                TestingKnobs: base.TestingKnobs{
+            s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
+                // Disable the default test tenant, since we create one explicitly
+                // below.
+                DefaultTestTenant: base.TestTenantDisabled,
+                Settings: storageSettings,
+                Knobs: base.TestingKnobs{
                     Server: &server.TestingKnobs{
-                        BinaryVersionOverride: test.tenantBinaryVersion,
+                        BinaryVersionOverride: test.storageBinaryVersion,
+                        BootstrapVersionKeyOverride: clusterversion.V22_2,
                         DisableAutomaticVersionUpgrade: make(chan struct{}),
                     },
+                    SQLEvalContext: &eval.TestingKnobs{
+                        TenantLogicalVersionKeyOverride: test.TenantLogicalVersionKey,
+                    },
                 },
             })
+            defer s.Stopper().Stop(context.Background())
 
-        if !testutils.IsError(err, test.expErrMatch) {
-            t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch)
-        }
+            tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(
+                test.tenantBinaryVersion,
+                test.tenantBinaryMinSupportedVersion,
+                true, /* initializeVersion */
+            )
 
-        // Only attempt to stop the tenant if it was started successfully.
-        if err == nil {
-            tenantServer.Stopper().Stop(context.Background())
-        } else {
-            // Test - stop the failed SQL server using a custom stopper
-            // NOTE: This custom stopper should not be required, but is because
-            // currently, if a SQL server fails to start it will not be cleaned
-            // up immediately without invoking the custom stopper. This could
-            // be a problem, and is tracked with #98868.
-            stopper.Stop(context.Background())
-        }
-        s.Stopper().Stop(context.Background())
+            // The tenant will be created with an active version equal to the version
+            // corresponding to TenantLogicalVersionKey. Tenant creation is expected
+            // to succeed for all test cases but server creation is expected to succeed
+            // only if tenantBinaryVersion is at least equal to the version corresponding
+            // to TenantLogicalVersionKey.
+            _, err := s.StartTenant(context.Background(),
+                base.TestTenantArgs{
+                    Settings: tenantSettings,
+                    TenantID: serverutils.TestTenantID(),
+                    TestingKnobs: base.TestingKnobs{
+                        Server: &server.TestingKnobs{
+                            BinaryVersionOverride: test.tenantBinaryVersion,
+                            DisableAutomaticVersionUpgrade: make(chan struct{}),
+                        },
+                    },
+                })
+
+            if !testutils.IsError(err, test.expErrMatch) {
+                t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch)
+            }
+        })
     }
 }
diff --git a/pkg/cli/start.go b/pkg/cli/start.go
index e0873db5693a..032b63630394 100644
--- a/pkg/cli/start.go
+++ b/pkg/cli/start.go
@@ -1121,40 +1121,13 @@ func startShutdownAsync(
         drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil)
 
         if shouldDrain {
-            // Perform a graceful drain. We keep retrying forever, in
-            // case there are many range leases or some unavailability
-            // preventing progress. If the operator wants to expedite
-            // the shutdown, they will need to make it ungraceful
-            // via a 2nd signal.
-            var (
-                remaining     = uint64(math.MaxUint64)
-                prevRemaining = uint64(math.MaxUint64)
-                verbose       = false
-            )
-
-            for ; ; prevRemaining = remaining {
-                var err error
-                remaining, _, err = s.Drain(drainCtx, verbose)
-                if err != nil {
-                    log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err)
-                    break
-                }
-                if remaining == 0 {
-                    // No more work to do.
-                    break
-                }
-
-                // If range lease transfer stalls or the number of
-                // remaining leases somehow increases, verbosity is set
-                // to help with troubleshooting.
-                if remaining >= prevRemaining {
-                    verbose = true
-                }
-
-                // Avoid a busy wait with high CPU usage if the server replies
-                // with an incomplete drain too quickly.
-                time.Sleep(200 * time.Millisecond)
-            }
+            // Perform a graceful drain. This function keeps retrying and
+            // the call might never complete (e.g. due to some
+            // unavailability preventing progress). This is intentional. If
+            // the operator wants to expedite the shutdown, they will need
+            // to make it ungraceful by sending a second signal to the
+            // process, which will tickle the shortcut in waitForShutdown().
+            server.CallDrainServerSide(drainCtx, s.Drain)
         }
 
         stopper.Stop(drainCtx)
diff --git a/pkg/server/drain.go b/pkg/server/drain.go
index bced1d5312a7..ee4f32533fbd 100644
--- a/pkg/server/drain.go
+++ b/pkg/server/drain.go
@@ -13,6 +13,7 @@ package server
 import (
     "context"
     "io"
+    "math"
     "strings"
     "time"
 
@@ -26,6 +27,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/errors"
+    "github.com/cockroachdb/logtags"
     "github.com/cockroachdb/redact"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
@@ -103,6 +105,7 @@ type drainServer struct {
     grpc         *grpcServer
     sqlServer    *SQLServer
     drainSleepFn func(time.Duration)
+    serverCtl    *serverController
 
     kvServer struct {
         nodeLiveness *liveness.NodeLiveness
@@ -306,6 +309,26 @@ func (s *drainServer) runDrain(
 func (s *drainServer) drainInner(
     ctx context.Context, reporter func(int, redact.SafeString), verbose bool,
 ) (err error) {
+    if s.serverCtl != nil {
+        // We are on a KV node, with a server controller.
+        //
+        // First tell the controller to stop starting new servers.
+        s.serverCtl.draining.Set(true)
+
+        // Then shut down tenant servers orchestrated from
+        // this node.
+        stillRunning := s.serverCtl.drain(ctx)
+        reporter(stillRunning, "tenant servers")
+        // If we still have tenant servers, we can't make progress on
+        // draining SQL clients (on the system tenant) and the KV node,
+        // because that would block the graceful drain of the tenant
+        // server(s).
+        if stillRunning > 0 {
+            return nil
+        }
+        log.Infof(ctx, "all tenant servers stopped")
+    }
+
     // Drain the SQL layer.
     // Drains all SQL connections, distributed SQL execution flows, and SQL table leases.
     if err = s.drainClients(ctx, reporter); err != nil {
@@ -399,7 +422,8 @@ func (s *drainServer) drainClients(
     s.sqlServer.jobRegistry.WaitForRegistryShutdown(ctx)
 
     // Drain all SQL table leases. This must be done after the pgServer has
-    // given sessions a chance to finish ongoing work.
+    // given sessions a chance to finish ongoing work and after the background
+    // tasks that may issue SQL statements have shut down.
     s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)
 
     // Mark this phase in the logs to clarify the context of any subsequent
@@ -433,6 +457,7 @@ func (s *drainServer) drainNode(
         // No KV subsystem. Nothing to do.
         return nil
     }
+
     // Set the node's liveness status to "draining".
     if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil {
         return err
@@ -458,3 +483,65 @@ func (s *drainServer) logOpenConns(ctx context.Context) error {
         }
     })
 }
+
+// CallDrainServerSide is a reference implementation for a server-side
+// function that wishes to shut down a server gracefully via the Drain
+// interface. The Drain interface is responsible for notifying clients
+// and shutting down systems in a particular order that prevents
+// client app disruptions. We generally prefer graceful drains to the
+// disorderly shutdown caused by either a process crash or a direct
+// call to the stopper's Stop() method.
+//
+// By default, this code will wait forever for a graceful drain to
+// complete. The caller can override this behavior by passing a context
+// with a deadline.
+//
+// For an example client-side implementation (drain client over RPC),
+// see the code in pkg/cli/node.go, doDrain().
+func CallDrainServerSide(ctx context.Context, drainFn ServerSideDrainFn) {
+    var (
+        prevRemaining = uint64(math.MaxUint64)
+        verbose       = false
+    )
+
+    ctx = logtags.AddTag(ctx, "call-graceful-drain", nil)
+    for {
+        // Let the caller interrupt the process via context cancellation
+        // if so desired.
+        select {
+        case <-ctx.Done():
+            log.Ops.Errorf(ctx, "drain interrupted by caller: %v", ctx.Err())
+            return
+        default:
+        }
+
+        remaining, _, err := drainFn(ctx, verbose)
+        if err != nil {
+            log.Ops.Errorf(ctx, "graceful drain failed: %v", err)
+            return
+        }
+        if remaining == 0 {
+            // No more work to do.
+            log.Ops.Infof(ctx, "graceful drain complete")
+            return
+        }
+
+        // If range lease transfer stalls or the number of
+        // remaining leases somehow increases, verbosity is set
+        // to help with troubleshooting.
+        if remaining >= prevRemaining {
+            verbose = true
+        }
+
+        // Avoid a busy wait with high CPU usage if the server replies
+        // with an incomplete drain too quickly.
+        time.Sleep(200 * time.Millisecond)
+
+        // Remember the remaining work to set the verbose flag in the next
+        // iteration.
+        prevRemaining = remaining
+    }
+}
+
+// ServerSideDrainFn is the interface of the server-side handler for the Drain logic.
+type ServerSideDrainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
diff --git a/pkg/server/server.go b/pkg/server/server.go
index bd640e6d4940..4a3a78d152dd 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -1121,6 +1121,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
         systemTenantNameContainer,
         pgPreServer.SendRoutingError,
     )
+    drain.serverCtl = sc
 
     // Create the debug API server.
     debugServer := debug.NewServer(
diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go
index e556e9c3cdb1..e36aabdffdae 100644
--- a/pkg/server/server_controller.go
+++ b/pkg/server/server_controller.go
@@ -24,12 +24,24 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+    "github.com/cockroachdb/redact"
 )
 
 // onDemandServer represents a server that can be started on demand.
 type onDemandServer interface {
+    // preStart activates background tasks and initializes subsystems
+    // but does not yet accept incoming connections.
+    preStart(context.Context) error
+
+    // acceptClients starts accepting incoming connections.
+    acceptClients(context.Context) error
+
+    // stop stops this server.
     stop(context.Context)
 
+    // annotateCtx annotates the context with server-specific logging tags.
+    annotateCtx(context.Context) context.Context
+
     // getHTTPHandlerFn retrieves the function that can serve HTTP
     // requests for this server.
     getHTTPHandlerFn() http.HandlerFunc
@@ -54,6 +66,9 @@ type onDemandServer interface {
 
     // shutdownRequested returns the shutdown request channel.
     shutdownRequested() <-chan ShutdownRequest
+
+    // gracefulDrain drains the server.
+    gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
 }
 
 type serverEntry struct {
@@ -102,6 +117,10 @@ type serverController struct {
     // a tenant routing error to the incoming client.
     sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName)
 
+    // draining is set when the surrounding server starts draining, and
+    // prevents further creation of new tenant servers.
+    draining syncutil.AtomicBool
+
     mu struct {
         syncutil.Mutex
@@ -183,6 +202,23 @@ type tenantServerWrapper struct {
 
 var _ onDemandServer = (*tenantServerWrapper)(nil)
 
+func (t *tenantServerWrapper) annotateCtx(ctx context.Context) context.Context {
+    return t.server.AnnotateCtx(ctx)
+}
+
+func (t *tenantServerWrapper) preStart(ctx context.Context) error {
+    return t.server.PreStart(ctx)
+}
+
+func (t *tenantServerWrapper) acceptClients(ctx context.Context) error {
+    if err := t.server.AcceptClients(ctx); err != nil {
+        return err
+    }
+    // Show the tenant details in logs.
+    // TODO(knz): Remove this once we can use a single listener.
+    return t.server.reportTenantInfo(ctx)
+}
+
 func (t *tenantServerWrapper) stop(ctx context.Context) {
     ctx = t.server.AnnotateCtx(ctx)
     t.stopper.Stop(ctx)
@@ -212,6 +248,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest {
     return s.server.sqlServer.ShutdownRequested()
 }
 
+func (s *tenantServerWrapper) gracefulDrain(
+    ctx context.Context, verbose bool,
+) (uint64, redact.RedactableString, error) {
+    ctx = s.server.AnnotateCtx(ctx)
+    return s.server.Drain(ctx, verbose)
+}
+
 // systemServerWrapper implements the onDemandServer interface for Server.
 //
 // (We can imagine a future where the SQL service for the system
@@ -227,6 +270,20 @@ type systemServerWrapper struct {
 
 var _ onDemandServer = (*systemServerWrapper)(nil)
 
+func (s *systemServerWrapper) annotateCtx(ctx context.Context) context.Context {
+    return s.server.AnnotateCtx(ctx)
+}
+
+func (s *systemServerWrapper) preStart(ctx context.Context) error {
+    // No-op: the SQL service for the system tenant is started elsewhere.
+    return nil
+}
+
+func (s *systemServerWrapper) acceptClients(ctx context.Context) error {
+    // No-op: the SQL service for the system tenant is started elsewhere.
+    return nil
+}
+
 func (s *systemServerWrapper) stop(ctx context.Context) {
     // No-op: the SQL service for the system tenant never shuts down.
 }
@@ -255,3 +312,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID {
 func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest {
     return nil
 }
+
+func (s *systemServerWrapper) gracefulDrain(
+    ctx context.Context, verbose bool,
+) (uint64, redact.RedactableString, error) {
+    // The controller is not responsible for draining the system tenant.
+    return 0, "", nil
+}
diff --git a/pkg/server/server_controller_new_server.go b/pkg/server/server_controller_new_server.go
index afde3657d45a..d16c169451f0 100644
--- a/pkg/server/server_controller_new_server.go
+++ b/pkg/server/server_controller_new_server.go
@@ -83,7 +83,7 @@ func (s *Server) newTenantServer(
     // Apply the TestTenantArgs, if any.
     baseCfg.TestingKnobs = testArgs.Knobs
 
-    tenantServer, err := s.startTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer)
+    tenantServer, err := newTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer)
     if err != nil {
         return nil, err
     }
@@ -124,12 +124,12 @@ func (s *Server) getTenantID(
     return tenantID, nil
 }
 
-// startTenantServerInternal starts a server for the given target
+// newTenantServerInternal instantiates a server for the given target
 // tenant ID.
 //
-// Note that even if an error is returned, tasks might have been started with
-// the stopper, so the caller needs to Stop() it.
-func (s *Server) startTenantServerInternal(
+// Note that even if an error is returned, closers may have been
+// registered with the stopper, so the caller needs to Stop() it.
+func newTenantServerInternal(
     ctx context.Context,
     baseCfg BaseConfig,
     sqlCfg SQLConfig,
@@ -140,28 +140,13 @@ func (s *Server) startTenantServerInternal(
     stopper.SetTracer(baseCfg.Tracer)
 
     // New context, since we're using a separate tracer.
-    startCtx := ambientCtx.AnnotateCtx(context.Background())
+    newCtx := ambientCtx.AnnotateCtx(context.Background())
 
     // Inform the logs we're starting a new server.
-    log.Infof(startCtx, "starting tenant server")
+    log.Infof(newCtx, "creating tenant server")
 
-    // Now start the tenant proper.
-    tenantServer, err := NewSharedProcessTenantServer(startCtx, stopper, baseCfg, sqlCfg, tenantNameContainer)
-    if err != nil {
-        return nil, err
-    }
-
-    if err := tenantServer.Start(startCtx); err != nil {
-        return tenantServer, err
-    }
-
-    // Show the tenant details in logs.
-    // TODO(knz): Remove this once we can use a single listener.
-    if err := reportTenantInfo(startCtx, baseCfg, sqlCfg); err != nil {
-        return tenantServer, err
-    }
-
-    return tenantServer, nil
+    // Now instantiate the tenant server proper.
+    return newSharedProcessTenantServer(newCtx, stopper, baseCfg, sqlCfg, tenantNameContainer)
 }
 
 func (s *Server) makeSharedProcessTenantConfig(
@@ -411,11 +396,11 @@ func rederivePort(index int, addrToChange string, prevAddr string, portOffset in
     return net.JoinHostPort(h, port), nil
 }
 
-func reportTenantInfo(ctx context.Context, baseCfg BaseConfig, sqlCfg SQLConfig) error {
+func (s *SQLServerWrapper) reportTenantInfo(ctx context.Context) error {
     var buf redact.StringBuilder
     buf.Printf("started tenant SQL server at %s\n", timeutil.Now())
-    buf.Printf("webui:\t%s\n", baseCfg.AdminURL())
-    clientConnOptions, serverParams := MakeServerOptionsForURL(baseCfg.Config)
+    buf.Printf("webui:\t%s\n", s.cfg.AdminURL())
+    clientConnOptions, serverParams := MakeServerOptionsForURL(s.cfg.Config)
     pgURL, err := clientsecopts.MakeURLForServer(clientConnOptions, serverParams, url.User(username.RootUser))
     if err != nil {
         log.Errorf(ctx, "failed computing the URL: %v", err)
@@ -423,15 +408,15 @@ func reportTenantInfo(ctx context.Context, baseCfg BaseConfig, sqlCfg SQLConfig)
         buf.Printf("sql:\t%s\n", pgURL.ToPQ())
         buf.Printf("sql (JDBC):\t%s\n", pgURL.ToJDBC())
     }
-    if baseCfg.SocketFile != "" {
-        buf.Printf("socket:\t%s\n", baseCfg.SocketFile)
+    if s.cfg.SocketFile != "" {
+        buf.Printf("socket:\t%s\n", s.cfg.SocketFile)
     }
-    if tmpDir := sqlCfg.TempStorageConfig.Path; tmpDir != "" {
+    if tmpDir := s.sqlCfg.TempStorageConfig.Path; tmpDir != "" {
         buf.Printf("temp dir:\t%s\n", tmpDir)
     }
-    buf.Printf("clusterID:\t%s\n", baseCfg.ClusterIDContainer.Get())
-    buf.Printf("tenantID:\t%s\n", sqlCfg.TenantID)
-    buf.Printf("instanceID:\t%d\n", baseCfg.IDContainer.Get())
+    buf.Printf("clusterID:\t%s\n", s.cfg.ClusterIDContainer.Get())
+    buf.Printf("tenantID:\t%s\n", s.sqlCfg.TenantID)
+    buf.Printf("instanceID:\t%d\n", s.cfg.IDContainer.Get())
     // Collect the formatted string and show it to the user.
     msg, err := util.ExpandTabsInRedactableBytes(buf.RedactableBytes())
     if err != nil {
diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go
index a5459a853502..69d43ee36c0f 100644
--- a/pkg/server/server_controller_orchestration.go
+++ b/pkg/server/server_controller_orchestration.go
@@ -96,9 +96,14 @@ func (c *serverController) start(ctx context.Context, ie isql.Executor) error {
         select {
         case <-time.After(watchInterval):
         case <-c.stopper.ShouldQuiesce():
+            // Expedited server shutdown of outer server.
+            return
+        }
+        if c.draining.Get() {
+            // The outer server has started a graceful drain: stop
+            // picking up new servers.
             return
         }
-
         if err := c.scanTenantsForRunnableServices(ctx, ie); err != nil {
             log.Warningf(ctx, "cannot update running tenant services: %v", err)
         }
@@ -179,6 +184,9 @@ func (c *serverController) scanTenantsForRunnableServices(
 func (c *serverController) createServerEntryLocked(
     ctx context.Context, tenantName roachpb.TenantName,
 ) (*serverEntry, error) {
+    if c.draining.Get() {
+        return nil, errors.New("server is draining")
+    }
     entry, err := c.startControlledServer(ctx, tenantName)
     if err != nil {
         return nil, err
@@ -210,40 +218,62 @@ func (c *serverController) startControlledServer(
     }
     topCtx := ctx
+
     // Use a different context for the tasks below, because the tenant
     // stopper will have its own tracer which is incompatible with the
     // tracer attached to the incoming context.
+    tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
+    tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil)
+    tenantCtx = logtags.AddTag(tenantCtx, "tenant", tenantName)
 
-    tenantCtx := logtags.AddTag(context.Background(), "tenant-orchestration", nil)
+    // ctlStopper is a stopper uniquely responsible for the control
+    // loop. It is separate from the tenantStopper defined below so
+    // that we can retry the server instantiation if it fails.
+    ctlStopper := stop.NewStopper()
 
-    tenantStopper := stop.NewStopper()
+    // useGracefulDrainDuringTenantShutdown defines whether a graceful
+    // drain is requested on the tenant server by orchestration.
+    useGracefulDrainDuringTenantShutdown := make(chan bool, 1)
 
     // Ensure that if the surrounding server requests shutdown, we
     // propagate it to the new server.
     if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) {
         select {
-        case <-tenantStopper.ShouldQuiesce():
-            // Tenant is terminating on their own; nothing else to do here.
+        case <-stoppedCh:
+            // Server control loop is terminating prematurely before a
+            // request was made to terminate it.
             log.Infof(ctx, "tenant %q terminating", tenantName)
+
         case <-c.stopper.ShouldQuiesce():
             // Surrounding server is stopping; propagate the stop to the
             // control goroutine below.
+            // Note: we can't do a graceful drain in that case because
+            // the RPC service in the surrounding server may already
+            // be unavailable.
             log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName)
-            tenantStopper.Stop(tenantCtx)
+            useGracefulDrainDuringTenantShutdown <- false
+            ctlStopper.Stop(tenantCtx)
+
         case <-stopRequestCh:
-            // Someone requested a shutdown.
+            // Someone requested a graceful shutdown.
             log.Infof(ctx, "received request for tenant %q to terminate", tenantName)
-            tenantStopper.Stop(tenantCtx)
+            useGracefulDrainDuringTenantShutdown <- true
+            ctlStopper.Stop(tenantCtx)
+
         case <-topCtx.Done():
-            // Someone requested a shutdown.
+            // Someone requested a shutdown - probably a test.
+            // Note: we can't do a graceful drain in that case because
+            // the RPC service in the surrounding server may already
+            // be unavailable.
             log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName)
-            tenantStopper.Stop(tenantCtx)
+            useGracefulDrainDuringTenantShutdown <- false
+            ctlStopper.Stop(tenantCtx)
         }
     }); err != nil {
-        // The goroutine above is responsible for stopping the
-        // tenantStopper. If it fails to stop, we stop it here
-        // to avoid leaking the stopper.
-        tenantStopper.Stop(ctx)
+        // The goroutine above is responsible for stopping the ctlStopper.
+        // If it fails to stop, we stop it here to avoid leaking a
+        // stopper.
+        ctlStopper.Stop(ctx)
         return nil, err
     }
@@ -260,7 +290,7 @@ func (c *serverController) startControlledServer(
             entry.state.started.Set(false)
             close(stoppedCh)
             if !startedOrStoppedChAlreadyClosed {
-                entry.state.startErr = errors.New("server stop before start")
+                entry.state.startErr = errors.New("server stop before successful start")
                 close(startedOrStoppedCh)
             }
@@ -274,26 +304,95 @@ func (c *serverController) startControlledServer(
         // RunAsyncTask, because this stopper will be assigned its own
         // different tracer.
         ctx := tenantCtx
-        // We want a context that gets cancelled when the tenant is
+        // We want a context that gets cancelled when the server is
         // shutting down, for the possible few cases in
-        // startServerInternal which are not looking at the
-        // tenant.ShouldQuiesce() channel but are sensitive to context
+        // newServerInternal/preStart/acceptClients which are not looking at the
+        // tenantStopper.ShouldQuiesce() channel but are sensitive to context
         // cancellation.
         var cancel func()
-        ctx, cancel = tenantStopper.WithCancelOnQuiesce(ctx)
+        ctx, cancel = ctlStopper.WithCancelOnQuiesce(ctx)
         defer cancel()
 
         // Stop retrying startup/initialization if we are being shut
         // down early.
         retryOpts := retry.Options{
-            Closer: tenantStopper.ShouldQuiesce(),
+            Closer: ctlStopper.ShouldQuiesce(),
         }
 
-        var s onDemandServer
+        // tenantStopper is the stopper specific to one tenant server
+        // instance. We define a new tenantStopper on every attempt to
+        // instantiate the tenant server below. It is then linked to
+        // ctlStopper below once the instantiation and start have
+        // succeeded.
+        var tenantStopper *stop.Stopper
+
+        var tenantServer onDemandServer
         for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); {
-            var err error
-            s, err = c.startServerInternal(ctx, entry.nameContainer, tenantStopper)
+            tenantStopper = stop.NewStopper()
+
+            // Link the controller stopper to this tenant stopper.
+            if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) {
+                select {
+                case <-tenantStopper.ShouldQuiesce():
+                    // Tenant server shutting down on its own.
+                    return
+                case <-ctlStopper.ShouldQuiesce():
+                    select {
+                    case gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown:
+                        if gracefulDrainRequested {
+                            // Ensure that the graceful drain for the tenant server aborts
+                            // early if the Stopper for the surrounding server is
+                            // prematurely shutting down. This is because once the surrounding node
+                            // starts quiescing tasks, it won't be able to process KV requests
+                            // by the tenant server any more.
+                            //
+                            // Beware: we use tenantCtx here, not ctx, because the
+                            // latter has been linked to ctlStopper.Quiesce already
+                            // -- and in this select branch that context has been
+                            // canceled already.
+                            drainCtx, cancel := c.stopper.WithCancelOnQuiesce(tenantCtx)
+                            defer cancel()
+                            log.Infof(drainCtx, "starting graceful drain")
+                            // Call the drain service on that tenant's server. This may take a
+                            // while as it needs to wait for clients to disconnect and SQL
+                            // activity to clear up, possibly waiting for various configurable
+                            // timeouts.
+                            CallDrainServerSide(drainCtx, tenantServer.gracefulDrain)
+                        }
+                    default:
+                    }
+                    tenantStopper.Stop(ctx)
+                case <-c.stopper.ShouldQuiesce():
+                    // Expedited shutdown of the surrounding KV node.
+                    tenantStopper.Stop(ctx)
+                }
+            }); err != nil {
+                tenantStopper.Stop(ctx)
+                return
+            }
+
+            // Try to create the server.
+            s, err := func() (onDemandServer, error) {
+                s, err := c.newServerInternal(ctx, entry.nameContainer, tenantStopper)
+                if err != nil {
+                    return nil, errors.Wrap(err, "while creating server")
+                }
+
+                // Note: we make preStart() below derive from ctx, which is
+                // cancelled on shutdown of the outer server. This is necessary
+                // to ensure preStart() properly stops prematurely in that case.
+                startCtx := s.annotateCtx(ctx)
+                startCtx = logtags.AddTag(startCtx, "start-server", nil)
+                log.Infof(startCtx, "starting tenant server")
+                if err := s.preStart(startCtx); err != nil {
+                    return nil, errors.Wrap(err, "while starting server")
+                }
+                return s, errors.Wrap(s.acceptClients(startCtx), "while accepting clients")
+            }()
             if err != nil {
+                // Creation failed. We stop the tenant stopper here, which also
+                // takes care of terminating the async task we've just started above.
+                tenantStopper.Stop(ctx)
                 c.logStartEvent(ctx, roachpb.TenantID{}, 0,
                     entry.nameContainer.Get(), false /* success */, err)
                 log.Warningf(ctx,
@@ -302,9 +401,10 @@ func (c *serverController) startControlledServer(
                 entry.state.startErr = err
                 continue
             }
+            tenantServer = s
             break
         }
-        if s == nil {
+        if tenantServer == nil {
             // Retry loop exited before the server could start. This
             // indicates that there was an async request to abandon the
             // server startup. This is OK; just terminate early. The defer
@@ -313,14 +413,14 @@ func (c *serverController) startControlledServer(
         }
 
         // Log the start event and ensure the stop event is logged eventually.
-        tid, iid := s.getTenantID(), s.getInstanceID()
+        tid, iid := tenantServer.getTenantID(), tenantServer.getInstanceID()
         c.logStartEvent(ctx, tid, iid, tenantName, true /* success */, nil)
         tenantStopper.AddCloser(stop.CloserFn(func() {
             c.logStopEvent(ctx, tid, iid, tenantName)
         }))
 
         // Indicate the server has started.
-        entry.server = s
+        entry.server = tenantServer
         startedOrStoppedChAlreadyClosed = true
         entry.state.started.Set(true)
         close(startedOrStoppedCh)
@@ -330,7 +430,7 @@ func (c *serverController) startControlledServer(
         case <-tenantStopper.ShouldQuiesce():
             log.Infof(ctx, "tenant %q finishing their own control loop", tenantName)
 
-        case shutdownRequest := <-s.shutdownRequested():
+        case shutdownRequest := <-tenantServer.shutdownRequested():
             log.Infof(ctx, "tenant %q requesting their own shutdown: %v",
                 tenantName, shutdownRequest.ShutdownCause())
             // Make the async stop goroutine above pick up the task of shutting down.
@@ -406,7 +506,7 @@ func (c *serverController) startAndWaitForRunningServer(
     }
 }
 
-func (c *serverController) startServerInternal(
+func (c *serverController) newServerInternal(
     ctx context.Context, nameContainer *roachpb.TenantNameContainer, tenantStopper *stop.Stopper,
 ) (onDemandServer, error) {
     tenantName := nameContainer.Get()
@@ -424,6 +524,30 @@ func (c *serverController) startServerInternal(
 
 // Close implements the stop.Closer interface.
 func (c *serverController) Close() {
+    entries := c.requestStopAll()
+
+    // Wait for shutdown for all servers.
+    for _, e := range entries {
+        <-e.state.stopped
+    }
+}
+
+func (c *serverController) drain(ctx context.Context) (stillRunning int) {
+    entries := c.requestStopAll()
+    // How many entries are _not_ stopped yet?
+    notStopped := 0
+    for _, e := range entries {
+        select {
+        case <-e.state.stopped:
+        default:
+            log.Infof(ctx, "server for tenant %q still running", e.nameContainer)
+            notStopped++
+        }
+    }
+    return notStopped
+}
+
+func (c *serverController) requestStopAll() []*serverEntry {
     entries := func() (res []*serverEntry) {
         c.mu.Lock()
         defer c.mu.Unlock()
@@ -438,11 +562,7 @@ func (c *serverController) Close() {
     for _, e := range entries {
         e.state.requestStop()
     }
-
-    // Wait for shutdown for all servers.
-    for _, e := range entries {
-        <-e.state.stopped
-    }
+    return entries
 }
 
 type nodeEventLogger interface {
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 23b8e2103dbc..ce63e406b18a 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -1689,15 +1689,24 @@ func (s *SQLServer) preStart(
     // shutdown; but may be a sign of a problem in production or for
     // tests that need to restart a server.
     stopper.AddCloser(stop.CloserFn(func() {
+        var sk *TestingKnobs
+        if knobs.Server != nil {
+            sk, _ = knobs.Server.(*TestingKnobs)
+        }
+
         if !s.gracefulDrainComplete.Get() {
             warnCtx := s.AnnotateCtx(context.Background())
 
-            if knobs.Server != nil && knobs.Server.(*TestingKnobs).RequireGracefulDrain {
+            if sk != nil && sk.RequireGracefulDrain {
                 log.Fatalf(warnCtx, "drain required but not performed")
             }
 
             log.Warningf(warnCtx, "server shutdown without a prior graceful drain")
         }
+
+        if sk != nil && sk.DrainReportCh != nil {
+            sk.DrainReportCh <- struct{}{}
+        }
     }))
 
     return nil
diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go
index cc960869def4..6ea9ccd8f231 100644
--- a/pkg/server/tenant.go
+++ b/pkg/server/tenant.go
@@ -86,6 +86,7 @@ type SQLServerWrapper struct {
     // TODO(knz): Find a way to merge these two togethers so there is just
     // one implementation.
 
+    cfg *BaseConfig
     clock      *hlc.Clock
     rpcContext *rpc.Context
     // The gRPC server on which the different RPC handlers will be registered.
@@ -192,13 +193,13 @@ func NewSeparateProcessTenantServer(
     return newTenantServer(ctx, stopper, baseCfg, sqlCfg, tenantNameContainer, deps)
 }
 
-// NewSharedProcessTenantServer creates a tenant-specific, SQL-only
+// newSharedProcessTenantServer creates a tenant-specific, SQL-only
 // server against a KV backend, with defaults appropriate for a
 // SQLServer that is not located in the same process as a KVServer.
 //
 // The caller is responsible for listening to the server's ShutdownRequested()
 // channel and stopping cfg.stopper when signaled.
-func NewSharedProcessTenantServer(
+func newSharedProcessTenantServer(
     ctx context.Context,
     stopper *stop.Stopper,
     baseCfg BaseConfig,
     sqlCfg SQLConfig,
@@ -418,6 +419,8 @@ func newTenantServer(
     )
 
     return &SQLServerWrapper{
+        cfg: args.BaseConfig,
+
         clock:      args.clock,
         rpcContext: args.rpcContext,
diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go
index 24516c0fca2e..9b5b094e7fb5 100644
--- a/pkg/server/testing_knobs.go
+++ b/pkg/server/testing_knobs.go
@@ -138,6 +138,10 @@ type TestingKnobs struct {
     // RequireGracefulDrain, if set, causes a shutdown to fail with a log.Fatal
     // if the server is not gracefully drained prior to its stopper shutting down.
     RequireGracefulDrain bool
+
+    // DrainReportCh, if set, is a channel that will be notified when
+    // the SQL service shuts down.
+    DrainReportCh chan struct{}
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
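For illustration only: a minimal sketch (not part of the patch) of how a caller might bound the new server-side drain helper with a timeout, relying on the context check documented in CallDrainServerSide's comment above. The wrapper drainWithDeadline and the stub noopDrain are hypothetical names; a real caller would pass (*server.Server).Drain or a tenant server's gracefulDrain method as the ServerSideDrainFn.

package main

import (
    "context"
    "time"

    "github.com/cockroachdb/cockroach/pkg/server"
    "github.com/cockroachdb/redact"
)

// drainWithDeadline runs the server-side graceful drain loop but gives up
// once the timeout elapses, because CallDrainServerSide stops retrying when
// its context is done.
func drainWithDeadline(ctx context.Context, drainFn server.ServerSideDrainFn, timeout time.Duration) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    server.CallDrainServerSide(ctx, drainFn)
}

// noopDrain is a stub with the ServerSideDrainFn signature, standing in for
// (*server.Server).Drain in this sketch. It reports no remaining work.
func noopDrain(context.Context, bool) (uint64, redact.RedactableString, error) {
    return 0, "", nil
}

func main() {
    // Give the graceful drain up to 30 seconds before the caller moves on
    // to a hard stop (not shown).
    drainWithDeadline(context.Background(), noopDrain, 30*time.Second)
}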