From 1bbb7429d6e3da557e94e1d22008c46ca73154b0 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Tue, 4 Apr 2023 12:54:49 +0200 Subject: [PATCH 01/22] serverccl: clarify the progress inside TestServerControllerMultiNodeTenantStartup Release note: None --- pkg/ccl/serverccl/server_controller_test.go | 25 +++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/ccl/serverccl/server_controller_test.go b/pkg/ccl/serverccl/server_controller_test.go index 4b5dea4c57dd..f568541db5c0 100644 --- a/pkg/ccl/serverccl/server_controller_test.go +++ b/pkg/ccl/serverccl/server_controller_test.go @@ -414,34 +414,51 @@ func TestServerControllerMultiNodeTenantStartup(t *testing.T) { ctx := context.Background() + t.Logf("starting test cluster") numNodes := 3 tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ DisableDefaultTestTenant: true, }}) - defer tc.Stopper().Stop(ctx) + defer func() { + t.Logf("stopping test cluster") + tc.Stopper().Stop(ctx) + }() + t.Logf("starting tenant servers") db := tc.ServerConn(0) _, err := db.Exec("CREATE TENANT hello; ALTER TENANT hello START SERVICE SHARED") require.NoError(t, err) // Pick a random node, try to run some SQL inside that tenant. rng, _ := randutil.NewTestRand() - sqlAddr := tc.Server(int(rng.Int31n(int32(numNodes)))).ServingSQLAddr() + serverIdx := int(rng.Int31n(int32(numNodes))) + sqlAddr := tc.Server(serverIdx).ServingSQLAddr() + t.Logf("attempting to use tenant server on node %d (%s)", serverIdx, sqlAddr) testutils.SucceedsSoon(t, func() error { tenantDB, err := serverutils.OpenDBConnE(sqlAddr, "cluster:hello", false, tc.Stopper()) if err != nil { + t.Logf("error connecting to tenant server (will retry): %v", err) return err } defer tenantDB.Close() - if _, err := tenantDB.Exec("CREATE ROLE foo"); err != nil { + if err := tenantDB.Ping(); err != nil { + t.Logf("connection not ready (will retry): %v", err) return err } + if _, err := tenantDB.Exec("CREATE ROLE foo"); err != nil { + // This is not retryable -- if the server accepts the + // connection, it better be ready. + t.Fatal(err) + } if _, err := tenantDB.Exec("GRANT ADMIN TO foo"); err != nil { - return err + // This is not retryable -- if the server accepts the + // connection, it better be ready. + t.Fatal(err) } return nil }) + t.Logf("tenant server on node %d (%s) is ready", serverIdx, sqlAddr) } func TestServerStartStop(t *testing.T) { From 57c5d31aae71203086e46ee56ccd639f6704823a Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Tue, 4 Apr 2023 12:10:31 +0200 Subject: [PATCH 02/22] serverccl: make TestServerControllerMultiNodeTenantStartup faster Release note: None --- pkg/ccl/serverccl/BUILD.bazel | 1 + pkg/ccl/serverccl/server_controller_test.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel index 306ab6896773..453438028aea 100644 --- a/pkg/ccl/serverccl/BUILD.bazel +++ b/pkg/ccl/serverccl/BUILD.bazel @@ -54,6 +54,7 @@ go_test( "//pkg/ccl/kvccl", "//pkg/ccl/utilccl/licenseccl", "//pkg/clusterversion", + "//pkg/jobs", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/multitenant/tenantcapabilities", diff --git a/pkg/ccl/serverccl/server_controller_test.go b/pkg/ccl/serverccl/server_controller_test.go index f568541db5c0..fa10daea35b5 100644 --- a/pkg/ccl/serverccl/server_controller_test.go +++ b/pkg/ccl/serverccl/server_controller_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" @@ -418,6 +419,9 @@ func TestServerControllerMultiNodeTenantStartup(t *testing.T) { numNodes := 3 tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, DisableDefaultTestTenant: true, }}) defer func() { From c24b999c4d9be2dddb41bc9ad42981e637d02177 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 2 Apr 2023 18:29:05 +0200 Subject: [PATCH 03/22] server: unexport some functions Release note: None --- pkg/server/server_controller_new_server.go | 6 +++--- pkg/server/tenant.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/server/server_controller_new_server.go b/pkg/server/server_controller_new_server.go index bbf5c10e0478..4660eac5e9e7 100644 --- a/pkg/server/server_controller_new_server.go +++ b/pkg/server/server_controller_new_server.go @@ -83,7 +83,7 @@ func (s *Server) newTenantServer( // Apply the TestTenantArgs, if any. baseCfg.TestingKnobs = testArgs.Knobs - tenantServer, err := s.startTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer) + tenantServer, err := startTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer) if err != nil { return nil, err } @@ -129,7 +129,7 @@ func (s *Server) getTenantID( // // Note that even if an error is returned, tasks might have been started with // the stopper, so the caller needs to Stop() it. -func (s *Server) startTenantServerInternal( +func startTenantServerInternal( ctx context.Context, baseCfg BaseConfig, sqlCfg SQLConfig, @@ -146,7 +146,7 @@ func (s *Server) startTenantServerInternal( log.Infof(startCtx, "starting tenant server") // Now start the tenant proper. - tenantServer, err := NewSharedProcessTenantServer(startCtx, stopper, baseCfg, sqlCfg, tenantNameContainer) + tenantServer, err := newSharedProcessTenantServer(startCtx, stopper, baseCfg, sqlCfg, tenantNameContainer) if err != nil { return nil, err } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index cc960869def4..7e1314d14ae1 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -192,13 +192,13 @@ func NewSeparateProcessTenantServer( return newTenantServer(ctx, stopper, baseCfg, sqlCfg, tenantNameContainer, deps) } -// NewSharedProcessTenantServer creates a tenant-specific, SQL-only +// newSharedProcessTenantServer creates a tenant-specific, SQL-only // server against a KV backend, with defaults appropriate for a // SQLServer that is not located in the same process as a KVServer. // // The caller is responsible for listening to the server's ShutdownRequested() // channel and stopping cfg.stopper when signaled. -func NewSharedProcessTenantServer( +func newSharedProcessTenantServer( ctx context.Context, stopper *stop.Stopper, baseCfg BaseConfig, From 98f2aa36f73ee47e2ead6456d53e17405969430b Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 2 Apr 2023 18:59:08 +0200 Subject: [PATCH 04/22] server: save a reference to BaseConfig in SQLServerWrapper Release note: None --- pkg/server/tenant.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 7e1314d14ae1..6ea9ccd8f231 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -86,6 +86,7 @@ type SQLServerWrapper struct { // TODO(knz): Find a way to merge these two togethers so there is just // one implementation. + cfg *BaseConfig clock *hlc.Clock rpcContext *rpc.Context // The gRPC server on which the different RPC handlers will be registered. @@ -418,6 +419,8 @@ func newTenantServer( ) return &SQLServerWrapper{ + cfg: args.BaseConfig, + clock: args.clock, rpcContext: args.rpcContext, From 87c0b5cdbf6c3eac3382f92ab15dcb3e90282624 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 2 Apr 2023 18:59:51 +0200 Subject: [PATCH 05/22] server: extend the `onDemandServer` interface ahead of splitting "new" vs "start" during construction. Release note: None --- pkg/server/server_controller.go | 37 +++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index e556e9c3cdb1..c5a958298284 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -28,8 +28,19 @@ import ( // onDemandServer represents a server that can be started on demand. type onDemandServer interface { + // preStart activates background tasks and initializes subsystems + // but does not yet accept incoming connections. + preStart(context.Context) error + + // acceptClients starts accepting incoming connections. + acceptClients(context.Context) error + + // stop stops this server. stop(context.Context) + // annotateCtx annotates the context with server-specific logging tags. + annotateCtx(context.Context) context.Context + // getHTTPHandlerFn retrieves the function that can serve HTTP // requests for this server. getHTTPHandlerFn() http.HandlerFunc @@ -183,6 +194,18 @@ type tenantServerWrapper struct { var _ onDemandServer = (*tenantServerWrapper)(nil) +func (t *tenantServerWrapper) annotateCtx(ctx context.Context) context.Context { + return t.server.AnnotateCtx(ctx) +} + +func (t *tenantServerWrapper) preStart(ctx context.Context) error { + return t.server.PreStart(ctx) +} + +func (t *tenantServerWrapper) acceptClients(ctx context.Context) error { + return t.server.AcceptClients(ctx) +} + func (t *tenantServerWrapper) stop(ctx context.Context) { ctx = t.server.AnnotateCtx(ctx) t.stopper.Stop(ctx) @@ -227,6 +250,20 @@ type systemServerWrapper struct { var _ onDemandServer = (*systemServerWrapper)(nil) +func (s *systemServerWrapper) annotateCtx(ctx context.Context) context.Context { + return s.server.AnnotateCtx(ctx) +} + +func (s *systemServerWrapper) preStart(ctx context.Context) error { + // No-op: the SQL service for the system tenant is started elsewhere. + return nil +} + +func (s *systemServerWrapper) acceptClients(ctx context.Context) error { + // No-op: the SQL service for the system tenant is started elsewhere. + return nil +} + func (s *systemServerWrapper) stop(ctx context.Context) { // No-op: the SQL service for the system tenant never shuts down. } From 6c4726a7e9d622949017bd86de84a19f677503bb Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 2 Apr 2023 19:05:32 +0200 Subject: [PATCH 06/22] server: lift `reportTenantInfo` into `*SQLServerWrapper` Release note: None --- pkg/server/server_controller_new_server.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/server/server_controller_new_server.go b/pkg/server/server_controller_new_server.go index 4660eac5e9e7..0ae456862240 100644 --- a/pkg/server/server_controller_new_server.go +++ b/pkg/server/server_controller_new_server.go @@ -157,7 +157,7 @@ func startTenantServerInternal( // Show the tenant details in logs. // TODO(knz): Remove this once we can use a single listener. - if err := reportTenantInfo(startCtx, baseCfg, sqlCfg); err != nil { + if err := tenantServer.reportTenantInfo(startCtx); err != nil { return tenantServer, err } @@ -409,11 +409,11 @@ func rederivePort(index int, addrToChange string, prevAddr string, portOffset in return net.JoinHostPort(h, port), nil } -func reportTenantInfo(ctx context.Context, baseCfg BaseConfig, sqlCfg SQLConfig) error { +func (s *SQLServerWrapper) reportTenantInfo(ctx context.Context) error { var buf redact.StringBuilder buf.Printf("started tenant SQL server at %s\n", timeutil.Now()) - buf.Printf("webui:\t%s\n", baseCfg.AdminURL()) - clientConnOptions, serverParams := MakeServerOptionsForURL(baseCfg.Config) + buf.Printf("webui:\t%s\n", s.cfg.AdminURL()) + clientConnOptions, serverParams := MakeServerOptionsForURL(s.cfg.Config) pgURL, err := clientsecopts.MakeURLForServer(clientConnOptions, serverParams, url.User(username.RootUser)) if err != nil { log.Errorf(ctx, "failed computing the URL: %v", err) @@ -421,15 +421,15 @@ func reportTenantInfo(ctx context.Context, baseCfg BaseConfig, sqlCfg SQLConfig) buf.Printf("sql:\t%s\n", pgURL.ToPQ()) buf.Printf("sql (JDBC):\t%s\n", pgURL.ToJDBC()) } - if baseCfg.SocketFile != "" { - buf.Printf("socket:\t%s\n", baseCfg.SocketFile) + if s.cfg.SocketFile != "" { + buf.Printf("socket:\t%s\n", s.cfg.SocketFile) } - if tmpDir := sqlCfg.TempStorageConfig.Path; tmpDir != "" { + if tmpDir := s.sqlCfg.TempStorageConfig.Path; tmpDir != "" { buf.Printf("temp dir:\t%s\n", tmpDir) } - buf.Printf("clusterID:\t%s\n", baseCfg.ClusterIDContainer.Get()) - buf.Printf("tenantID:\t%s\n", sqlCfg.TenantID) - buf.Printf("instanceID:\t%d\n", baseCfg.IDContainer.Get()) + buf.Printf("clusterID:\t%s\n", s.cfg.ClusterIDContainer.Get()) + buf.Printf("tenantID:\t%s\n", s.sqlCfg.TenantID) + buf.Printf("instanceID:\t%d\n", s.cfg.IDContainer.Get()) // Collect the formatted string and show it to the user. msg, err := util.ExpandTabsInRedactableBytes(buf.RedactableBytes()) if err != nil { From 09fde50a3cac1e148627abd8334d9609c50c079b Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 2 Apr 2023 19:16:48 +0200 Subject: [PATCH 07/22] server: separate "new" from "start" for tenant servers This peels the call to "start" from the `newTenantServer` interface and pulls it into the orchestration retry loop. This change also incidentally reveals an earlier misdesign: we are calling `newTenantServer` _then_ `start` in the same retry loop. If `new` succeeds but `start` fails, the next retry will call `newTenantServer` again *with the same stopper*, which will leak closers from the previous call to `new`. Release note: None --- pkg/server/server_controller.go | 7 +++- pkg/server/server_controller_new_server.go | 33 +++++-------------- pkg/server/server_controller_orchestration.go | 18 +++++++--- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index c5a958298284..0f398dc307b7 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -203,7 +203,12 @@ func (t *tenantServerWrapper) preStart(ctx context.Context) error { } func (t *tenantServerWrapper) acceptClients(ctx context.Context) error { - return t.server.AcceptClients(ctx) + if err := t.server.AcceptClients(ctx); err != nil { + return err + } + // Show the tenant details in logs. + // TODO(knz): Remove this once we can use a single listener. + return t.server.reportTenantInfo(ctx) } func (t *tenantServerWrapper) stop(ctx context.Context) { diff --git a/pkg/server/server_controller_new_server.go b/pkg/server/server_controller_new_server.go index 0ae456862240..a3bb5903f2f0 100644 --- a/pkg/server/server_controller_new_server.go +++ b/pkg/server/server_controller_new_server.go @@ -83,7 +83,7 @@ func (s *Server) newTenantServer( // Apply the TestTenantArgs, if any. baseCfg.TestingKnobs = testArgs.Knobs - tenantServer, err := startTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer) + tenantServer, err := newTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer) if err != nil { return nil, err } @@ -124,12 +124,12 @@ func (s *Server) getTenantID( return tenantID, nil } -// startTenantServerInternal starts a server for the given target +// newTenantServerInternal instantiates a server for the given target // tenant ID. // -// Note that even if an error is returned, tasks might have been started with -// the stopper, so the caller needs to Stop() it. -func startTenantServerInternal( +// Note that even if an error is returned, closers may have been +// registered with the stopper, so the caller needs to Stop() it. +func newTenantServerInternal( ctx context.Context, baseCfg BaseConfig, sqlCfg SQLConfig, @@ -140,28 +140,13 @@ func startTenantServerInternal( stopper.SetTracer(baseCfg.Tracer) // New context, since we're using a separate tracer. - startCtx := ambientCtx.AnnotateCtx(context.Background()) + newCtx := ambientCtx.AnnotateCtx(context.Background()) // Inform the logs we're starting a new server. - log.Infof(startCtx, "starting tenant server") + log.Infof(newCtx, "creating tenant server") - // Now start the tenant proper. - tenantServer, err := newSharedProcessTenantServer(startCtx, stopper, baseCfg, sqlCfg, tenantNameContainer) - if err != nil { - return nil, err - } - - if err := tenantServer.Start(startCtx); err != nil { - return tenantServer, err - } - - // Show the tenant details in logs. - // TODO(knz): Remove this once we can use a single listener. - if err := tenantServer.reportTenantInfo(startCtx); err != nil { - return tenantServer, err - } - - return tenantServer, nil + // Now instantiate the tenant server proper. + return newSharedProcessTenantServer(newCtx, stopper, baseCfg, sqlCfg, tenantNameContainer) } func (s *Server) makeSharedProcessTenantConfig( diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index a5459a853502..33ddce3f2905 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -276,7 +276,7 @@ func (c *serverController) startControlledServer( ctx := tenantCtx // We want a context that gets cancelled when the tenant is // shutting down, for the possible few cases in - // startServerInternal which are not looking at the + // newServerInternal/preStart/acceptClients which are not looking at the // tenant.ShouldQuiesce() channel but are sensitive to context // cancellation. var cancel func() @@ -291,8 +291,18 @@ func (c *serverController) startControlledServer( var s onDemandServer for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { - var err error - s, err = c.startServerInternal(ctx, entry.nameContainer, tenantStopper) + err := func() error { + var err error + s, err = c.newServerInternal(ctx, entry.nameContainer, tenantStopper) + if err != nil { + return err + } + startCtx := s.annotateCtx(context.Background()) + if err := s.preStart(startCtx); err != nil { + return err + } + return s.acceptClients(startCtx) + }() if err != nil { c.logStartEvent(ctx, roachpb.TenantID{}, 0, entry.nameContainer.Get(), false /* success */, err) @@ -406,7 +416,7 @@ func (c *serverController) startAndWaitForRunningServer( } } -func (c *serverController) startServerInternal( +func (c *serverController) newServerInternal( ctx context.Context, nameContainer *roachpb.TenantNameContainer, tenantStopper *stop.Stopper, ) (onDemandServer, error) { tenantName := nameContainer.Get() From 7d55c39786cf09457ae8c59ffb024f311542f10b Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 2 Apr 2023 20:05:00 +0200 Subject: [PATCH 08/22] server: fix the tenant server error handling Prior to this patch, if an error occurred during the initialization or startup of a secondary tenant server, the initialization would leak state into the stopper defined for that tenant. Generally, reusing a stopper across server startup failures is not safe (and API violation). This patch fixes it by decoupling the intermediate stopper used for orchestration from the one used per tenant server. Release note: None --- pkg/server/server_controller_orchestration.go | 97 +++++++++++++------ 1 file changed, 68 insertions(+), 29 deletions(-) diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index 33ddce3f2905..b1f3c7ecac8a 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -210,40 +210,45 @@ func (c *serverController) startControlledServer( } topCtx := ctx + // Use a different context for the tasks below, because the tenant // stopper will have its own tracer which is incompatible with the // tracer attached to the incoming context. + tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) + tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil) - tenantCtx := logtags.AddTag(context.Background(), "tenant-orchestration", nil) - - tenantStopper := stop.NewStopper() + // ctlStopper is a stopper uniquely responsible for the control + // loop. It is separate from the tenantStopper defined below so + // that we can retry the server instantiation if it fails. + ctlStopper := stop.NewStopper() // Ensure that if the surrounding server requests shutdown, we // propagate it to the new server. if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { select { - case <-tenantStopper.ShouldQuiesce(): - // Tenant is terminating on their own; nothing else to do here. + case <-stoppedCh: + // Server control loop is terminating prematurely before a + // request was made to terminate it. log.Infof(ctx, "tenant %q terminating", tenantName) case <-c.stopper.ShouldQuiesce(): // Surrounding server is stopping; propagate the stop to the // control goroutine below. log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) - tenantStopper.Stop(tenantCtx) + ctlStopper.Stop(tenantCtx) case <-stopRequestCh: // Someone requested a shutdown. log.Infof(ctx, "received request for tenant %q to terminate", tenantName) - tenantStopper.Stop(tenantCtx) + ctlStopper.Stop(tenantCtx) case <-topCtx.Done(): // Someone requested a shutdown. log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) - tenantStopper.Stop(tenantCtx) + ctlStopper.Stop(tenantCtx) } }); err != nil { - // The goroutine above is responsible for stopping the - // tenantStopper. If it fails to stop, we stop it here - // to avoid leaking the stopper. - tenantStopper.Stop(ctx) + // The goroutine above is responsible for stopping the ctlStopper. + // If it fails to stop, we stop it here to avoid leaking a + // stopper. + ctlStopper.Stop(ctx) return nil, err } @@ -260,7 +265,7 @@ func (c *serverController) startControlledServer( entry.state.started.Set(false) close(stoppedCh) if !startedOrStoppedChAlreadyClosed { - entry.state.startErr = errors.New("server stop before start") + entry.state.startErr = errors.New("server stop before successful start") close(startedOrStoppedCh) } @@ -274,36 +279,69 @@ func (c *serverController) startControlledServer( // RunAsyncTask, because this stopper will be assigned its own // different tracer. ctx := tenantCtx - // We want a context that gets cancelled when the tenant is + // We want a context that gets cancelled when the server is // shutting down, for the possible few cases in // newServerInternal/preStart/acceptClients which are not looking at the - // tenant.ShouldQuiesce() channel but are sensitive to context + // tenantStopper.ShouldQuiesce() channel but are sensitive to context // cancellation. var cancel func() - ctx, cancel = tenantStopper.WithCancelOnQuiesce(ctx) + ctx, cancel = ctlStopper.WithCancelOnQuiesce(ctx) defer cancel() // Stop retrying startup/initialization if we are being shut // down early. retryOpts := retry.Options{ - Closer: tenantStopper.ShouldQuiesce(), + Closer: ctlStopper.ShouldQuiesce(), } - var s onDemandServer + // tenantStopper is the stopper specific to one tenant server + // instance. We define a new tenantStopper on every attempt to + // instantiate the tenant server below. It is then linked to + // ctlStopper below once the instantiation and start have + // succeeded. + var tenantStopper *stop.Stopper + + var tenantServer onDemandServer for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { - err := func() error { - var err error - s, err = c.newServerInternal(ctx, entry.nameContainer, tenantStopper) + tenantStopper = stop.NewStopper() + + // Link the controller stopper to this tenant stopper. + if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) { + select { + case <-tenantStopper.ShouldQuiesce(): + return + case <-ctlStopper.ShouldQuiesce(): + tenantStopper.Stop(ctx) + case <-c.stopper.ShouldQuiesce(): + tenantStopper.Stop(ctx) + } + }); err != nil { + tenantStopper.Stop(ctx) + return + } + + // Try to create the server. + s, err := func() (onDemandServer, error) { + s, err := c.newServerInternal(ctx, entry.nameContainer, tenantStopper) if err != nil { - return err + return nil, errors.Wrap(err, "while creating server") } - startCtx := s.annotateCtx(context.Background()) + + // Note: we make preStart() below derive from ctx, which is + // cancelled on shutdown of the outer server. This is necessary + // to ensure preStart() properly stops prematurely in that case. + startCtx := s.annotateCtx(ctx) + startCtx = logtags.AddTag(startCtx, "start-server", nil) + log.Infof(startCtx, "starting tenant server") if err := s.preStart(startCtx); err != nil { - return err + return nil, errors.Wrap(err, "while starting server") } - return s.acceptClients(startCtx) + return s, errors.Wrap(s.acceptClients(startCtx), "while accepting clients") }() if err != nil { + // Creation failed. We stop the tenant stopper here, which also + // takes care of terminating the async task we've just started above. + tenantStopper.Stop(ctx) c.logStartEvent(ctx, roachpb.TenantID{}, 0, entry.nameContainer.Get(), false /* success */, err) log.Warningf(ctx, @@ -312,9 +350,10 @@ func (c *serverController) startControlledServer( entry.state.startErr = err continue } + tenantServer = s break } - if s == nil { + if tenantServer == nil { // Retry loop exited before the server could start. This // indicates that there was an async request to abandon the // server startup. This is OK; just terminate early. The defer @@ -323,14 +362,14 @@ func (c *serverController) startControlledServer( } // Log the start event and ensure the stop event is logged eventually. - tid, iid := s.getTenantID(), s.getInstanceID() + tid, iid := tenantServer.getTenantID(), tenantServer.getInstanceID() c.logStartEvent(ctx, tid, iid, tenantName, true /* success */, nil) tenantStopper.AddCloser(stop.CloserFn(func() { c.logStopEvent(ctx, tid, iid, tenantName) })) // Indicate the server has started. - entry.server = s + entry.server = tenantServer startedOrStoppedChAlreadyClosed = true entry.state.started.Set(true) close(startedOrStoppedCh) @@ -340,7 +379,7 @@ func (c *serverController) startControlledServer( case <-tenantStopper.ShouldQuiesce(): log.Infof(ctx, "tenant %q finishing their own control loop", tenantName) - case shutdownRequest := <-s.shutdownRequested(): + case shutdownRequest := <-tenantServer.shutdownRequested(): log.Infof(ctx, "tenant %q requesting their own shutdown: %v", tenantName, shutdownRequest.ShutdownCause()) // Make the async stop goroutine above pick up the task of shutting down. From f582a264834bdd9e17a50f253286d797e3de13f6 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 2 Apr 2023 20:28:57 +0200 Subject: [PATCH 09/22] serverccl: simplify TestServerStartupGuardrails Prior to this patch, the test was not cleaning up its server stopper reliably at the end of each sub-test. This patch fixes it. Release note: None --- pkg/ccl/serverccl/BUILD.bazel | 1 - .../server_startup_guardrails_test.go | 102 ++++++++---------- 2 files changed, 46 insertions(+), 57 deletions(-) diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel index 453438028aea..5a188193e765 100644 --- a/pkg/ccl/serverccl/BUILD.bazel +++ b/pkg/ccl/serverccl/BUILD.bazel @@ -89,7 +89,6 @@ go_test( "//pkg/util/metric", "//pkg/util/protoutil", "//pkg/util/randutil", - "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/ccl/serverccl/server_startup_guardrails_test.go b/pkg/ccl/serverccl/server_startup_guardrails_test.go index eab24665252c..c9ff28afceec 100644 --- a/pkg/ccl/serverccl/server_startup_guardrails_test.go +++ b/pkg/ccl/serverccl/server_startup_guardrails_test.go @@ -22,7 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/log" ) // TestServerStartupGuardrails ensures that a SQL server will fail to start if @@ -74,69 +74,59 @@ func TestServerStartupGuardrails(t *testing.T) { } for i, test := range tests { - storageSettings := cluster.MakeTestingClusterSettingsWithVersions( - test.storageBinaryVersion, - test.storageBinaryMinSupportedVersion, - false, /* initializeVersion */ - ) + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + defer log.Scope(t).Close(t) - s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ - // Disable the default test tenant, since we create one explicitly - // below. - DisableDefaultTestTenant: true, - Settings: storageSettings, - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - BinaryVersionOverride: test.storageBinaryVersion, - BootstrapVersionKeyOverride: clusterversion.V22_2, - DisableAutomaticVersionUpgrade: make(chan struct{}), - }, - SQLEvalContext: &eval.TestingKnobs{ - TenantLogicalVersionKeyOverride: test.TenantLogicalVersionKey, - }, - }, - }) + storageSettings := cluster.MakeTestingClusterSettingsWithVersions( + test.storageBinaryVersion, + test.storageBinaryMinSupportedVersion, + false, /* initializeVersion */ + ) - tenantSettings := cluster.MakeTestingClusterSettingsWithVersions( - test.tenantBinaryVersion, - test.tenantBinaryMinSupportedVersion, - true, /* initializeVersion */ - ) - - // The tenant will be created with an active version equal to the version - // corresponding to TenantLogicalVersionKey. Tenant creation is expected - // to succeed for all test cases but server creation is expected to succeed - // only if tenantBinaryVersion is at least equal to the version corresponding - // to TenantLogicalVersionKey. - stopper := stop.NewStopper() - tenantServer, err := s.StartTenant(context.Background(), - base.TestTenantArgs{ - Settings: tenantSettings, - TenantID: serverutils.TestTenantID(), - Stopper: stopper, - TestingKnobs: base.TestingKnobs{ + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + // Disable the default test tenant, since we create one explicitly + // below. + DisableDefaultTestTenant: true, + Settings: storageSettings, + Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - BinaryVersionOverride: test.tenantBinaryVersion, + BinaryVersionOverride: test.storageBinaryVersion, + BootstrapVersionKeyOverride: clusterversion.V22_2, DisableAutomaticVersionUpgrade: make(chan struct{}), }, + SQLEvalContext: &eval.TestingKnobs{ + TenantLogicalVersionKeyOverride: test.TenantLogicalVersionKey, + }, }, }) + defer s.Stopper().Stop(context.Background()) - if !testutils.IsError(err, test.expErrMatch) { - t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch) - } + tenantSettings := cluster.MakeTestingClusterSettingsWithVersions( + test.tenantBinaryVersion, + test.tenantBinaryMinSupportedVersion, + true, /* initializeVersion */ + ) - // Only attempt to stop the tenant if it was started successfully. - if err == nil { - tenantServer.Stopper().Stop(context.Background()) - } else { - // Test - stop the failed SQL server using a custom stopper - // NOTE: This custom stopper should not be required, but is because - // currently, if a SQL server fails to start it will not be cleaned - // up immediately without invoking the custom stopper. This could - // be a problem, and is tracked with #98868. - stopper.Stop(context.Background()) - } - s.Stopper().Stop(context.Background()) + // The tenant will be created with an active version equal to the version + // corresponding to TenantLogicalVersionKey. Tenant creation is expected + // to succeed for all test cases but server creation is expected to succeed + // only if tenantBinaryVersion is at least equal to the version corresponding + // to TenantLogicalVersionKey. + _, err := s.StartTenant(context.Background(), + base.TestTenantArgs{ + Settings: tenantSettings, + TenantID: serverutils.TestTenantID(), + TestingKnobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + BinaryVersionOverride: test.tenantBinaryVersion, + DisableAutomaticVersionUpgrade: make(chan struct{}), + }, + }, + }) + + if !testutils.IsError(err, test.expErrMatch) { + t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch) + } + }) } } From 12619f5361ed374fe6c309ef60275b814e590be9 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 29 Mar 2023 19:51:10 +0200 Subject: [PATCH 10/22] server: gracefully shut down secondary tenant servers This change ensures that tenant servers managed by the server controller receive a graceful drain request as part of the graceful drain process of the surrounding KV node. This change, in turn, ensures that SQL clients connected to these secondary tenant servers benefit from the same guarantees (and graceful periods) as clients to the system tenant. Release note: None --- pkg/ccl/serverccl/server_controller_test.go | 34 +++++++ pkg/cli/start.go | 41 ++------- pkg/server/drain.go | 89 ++++++++++++++++++- pkg/server/server.go | 1 + pkg/server/server_controller.go | 22 +++++ pkg/server/server_controller_orchestration.go | 87 ++++++++++++++++-- pkg/server/server_sql.go | 11 ++- pkg/server/testing_knobs.go | 4 + 8 files changed, 245 insertions(+), 44 deletions(-) diff --git a/pkg/ccl/serverccl/server_controller_test.go b/pkg/ccl/serverccl/server_controller_test.go index fa10daea35b5..181f8591f34a 100644 --- a/pkg/ccl/serverccl/server_controller_test.go +++ b/pkg/ccl/serverccl/server_controller_test.go @@ -593,3 +593,37 @@ func TestServerControllerLoginLogout(t *testing.T) { require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames) require.ElementsMatch(t, []string{"", ""}, cookieValues) } + +func TestServiceShutdownUsesGracefulDrain(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + DisableDefaultTestTenant: true, + }) + defer s.Stopper().Stop(ctx) + + drainCh := make(chan struct{}) + + // Start a shared process server. + _, _, err := s.(*server.TestServer).StartSharedProcessTenant(ctx, + base.TestSharedProcessTenantArgs{ + TenantName: "hello", + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + RequireGracefulDrain: true, + DrainReportCh: drainCh, + }, + }, + }) + require.NoError(t, err) + + _, err = db.Exec("ALTER TENANT hello STOP SERVICE") + require.NoError(t, err) + + // Wait for the server to shut down. This also asserts that the + // graceful drain has occurred. + <-drainCh +} diff --git a/pkg/cli/start.go b/pkg/cli/start.go index e0873db5693a..032b63630394 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -1121,40 +1121,13 @@ func startShutdownAsync( drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil) if shouldDrain { - // Perform a graceful drain. We keep retrying forever, in - // case there are many range leases or some unavailability - // preventing progress. If the operator wants to expedite - // the shutdown, they will need to make it ungraceful - // via a 2nd signal. - var ( - remaining = uint64(math.MaxUint64) - prevRemaining = uint64(math.MaxUint64) - verbose = false - ) - - for ; ; prevRemaining = remaining { - var err error - remaining, _, err = s.Drain(drainCtx, verbose) - if err != nil { - log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err) - break - } - if remaining == 0 { - // No more work to do. - break - } - - // If range lease transfer stalls or the number of - // remaining leases somehow increases, verbosity is set - // to help with troubleshooting. - if remaining >= prevRemaining { - verbose = true - } - - // Avoid a busy wait with high CPU usage if the server replies - // with an incomplete drain too quickly. - time.Sleep(200 * time.Millisecond) - } + // Perform a graceful drain. This function keeps retrying and + // the call might never complete (e.g. due to some + // unavailability preventing progress). This is intentional. If + // the operator wants to expedite the shutdown, they will need + // to make it ungraceful by sending a second signal to the + // process, which will tickle the shortcut in waitForShutdown(). + server.CallDrainServerSide(drainCtx, s.Drain) } stopper.Stop(drainCtx) diff --git a/pkg/server/drain.go b/pkg/server/drain.go index bced1d5312a7..ee4f32533fbd 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -13,6 +13,7 @@ package server import ( "context" "io" + "math" "strings" "time" @@ -26,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -103,6 +105,7 @@ type drainServer struct { grpc *grpcServer sqlServer *SQLServer drainSleepFn func(time.Duration) + serverCtl *serverController kvServer struct { nodeLiveness *liveness.NodeLiveness @@ -306,6 +309,26 @@ func (s *drainServer) runDrain( func (s *drainServer) drainInner( ctx context.Context, reporter func(int, redact.SafeString), verbose bool, ) (err error) { + if s.serverCtl != nil { + // We are on a KV node, with a server controller. + // + // First tell the controller to stop starting new servers. + s.serverCtl.draining.Set(true) + + // Then shut down tenant servers orchestrated from + // this node. + stillRunning := s.serverCtl.drain(ctx) + reporter(stillRunning, "tenant servers") + // If we still have tenant servers, we can't make progress on + // draining SQL clients (on the system tenant) and the KV node, + // because that would block the graceful drain of the tenant + // server(s). + if stillRunning > 0 { + return nil + } + log.Infof(ctx, "all tenant servers stopped") + } + // Drain the SQL layer. // Drains all SQL connections, distributed SQL execution flows, and SQL table leases. if err = s.drainClients(ctx, reporter); err != nil { @@ -399,7 +422,8 @@ func (s *drainServer) drainClients( s.sqlServer.jobRegistry.WaitForRegistryShutdown(ctx) // Drain all SQL table leases. This must be done after the pgServer has - // given sessions a chance to finish ongoing work. + // given sessions a chance to finish ongoing work and after the background + // tasks that may issue SQL statements have shut down. s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter) // Mark this phase in the logs to clarify the context of any subsequent @@ -433,6 +457,7 @@ func (s *drainServer) drainNode( // No KV subsystem. Nothing to do. return nil } + // Set the node's liveness status to "draining". if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil { return err @@ -458,3 +483,65 @@ func (s *drainServer) logOpenConns(ctx context.Context) error { } }) } + +// CallDrainServerSide is a reference implementation for a server-side +// function that wishes to shut down a server gracefully via the Drain +// interface. The Drain interface is responsible for notifying clients +// and shutting down systems in a particular order that prevents +// client app disruptions. We generally prefer graceful drains to the +// disorderly shutdown caused by either a process crash or a direct +// call to the stopper's Stop() method. +// +// By default, this code will wait forever for a graceful drain to +// complete. The caller can override this behavior by passing a context +// with a deadline. +// +// For an example client-side implementation (drain client over RPC), +// see the code in pkg/cli/node.go, doDrain(). +func CallDrainServerSide(ctx context.Context, drainFn ServerSideDrainFn) { + var ( + prevRemaining = uint64(math.MaxUint64) + verbose = false + ) + + ctx = logtags.AddTag(ctx, "call-graceful-drain", nil) + for { + // Let the caller interrupt the process via context cancellation + // if so desired. + select { + case <-ctx.Done(): + log.Ops.Errorf(ctx, "drain interrupted by caller: %v", ctx.Err()) + return + default: + } + + remaining, _, err := drainFn(ctx, verbose) + if err != nil { + log.Ops.Errorf(ctx, "graceful drain failed: %v", err) + return + } + if remaining == 0 { + // No more work to do. + log.Ops.Infof(ctx, "graceful drain complete") + return + } + + // If range lease transfer stalls or the number of + // remaining leases somehow increases, verbosity is set + // to help with troubleshooting. + if remaining >= prevRemaining { + verbose = true + } + + // Avoid a busy wait with high CPU usage if the server replies + // with an incomplete drain too quickly. + time.Sleep(200 * time.Millisecond) + + // Remember the remaining work to set the verbose flag in the next + // iteration. + prevRemaining = remaining + } +} + +// ServerSideDrainFn is the interface of the server-side handler for the Drain logic. +type ServerSideDrainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) diff --git a/pkg/server/server.go b/pkg/server/server.go index 95e6ca4d7256..f8a44844f4db 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1121,6 +1121,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { systemTenantNameContainer, pgPreServer.SendRoutingError, ) + drain.serverCtl = sc // Create the debug API server. debugServer := debug.NewServer( diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index 0f398dc307b7..e36aabdffdae 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/redact" ) // onDemandServer represents a server that can be started on demand. @@ -65,6 +66,9 @@ type onDemandServer interface { // shutdownRequested returns the shutdown request channel. shutdownRequested() <-chan ShutdownRequest + + // gracefulDrain drains the server. + gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) } type serverEntry struct { @@ -113,6 +117,10 @@ type serverController struct { // a tenant routing error to the incoming client. sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName) + // draining is set when the surrounding server starts draining, and + // prevents further creation of new tenant servers. + draining syncutil.AtomicBool + mu struct { syncutil.Mutex @@ -240,6 +248,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest { return s.server.sqlServer.ShutdownRequested() } +func (s *tenantServerWrapper) gracefulDrain( + ctx context.Context, verbose bool, +) (uint64, redact.RedactableString, error) { + ctx = s.server.AnnotateCtx(ctx) + return s.server.Drain(ctx, verbose) +} + // systemServerWrapper implements the onDemandServer interface for Server. // // (We can imagine a future where the SQL service for the system @@ -297,3 +312,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID { func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest { return nil } + +func (s *systemServerWrapper) gracefulDrain( + ctx context.Context, verbose bool, +) (uint64, redact.RedactableString, error) { + // The controller is not responsible for draining the system tenant. + return 0, "", nil +} diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index b1f3c7ecac8a..69d43ee36c0f 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -96,9 +96,14 @@ func (c *serverController) start(ctx context.Context, ie isql.Executor) error { select { case <-time.After(watchInterval): case <-c.stopper.ShouldQuiesce(): + // Expedited server shutdown of outer server. + return + } + if c.draining.Get() { + // The outer server has started a graceful drain: stop + // picking up new servers. return } - if err := c.scanTenantsForRunnableServices(ctx, ie); err != nil { log.Warningf(ctx, "cannot update running tenant services: %v", err) } @@ -179,6 +184,9 @@ func (c *serverController) scanTenantsForRunnableServices( func (c *serverController) createServerEntryLocked( ctx context.Context, tenantName roachpb.TenantName, ) (*serverEntry, error) { + if c.draining.Get() { + return nil, errors.New("server is draining") + } entry, err := c.startControlledServer(ctx, tenantName) if err != nil { return nil, err @@ -216,12 +224,17 @@ func (c *serverController) startControlledServer( // tracer attached to the incoming context. tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil) + tenantCtx = logtags.AddTag(tenantCtx, "tenant", tenantName) // ctlStopper is a stopper uniquely responsible for the control // loop. It is separate from the tenantStopper defined below so // that we can retry the server instantiation if it fails. ctlStopper := stop.NewStopper() + // useGracefulDrainDuringTenantShutdown defined whether a graceful + // drain is requested on the tenant server by orchestration. + useGracefulDrainDuringTenantShutdown := make(chan bool, 1) + // Ensure that if the surrounding server requests shutdown, we // propagate it to the new server. if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { @@ -230,18 +243,30 @@ func (c *serverController) startControlledServer( // Server control loop is terminating prematurely before a // request was made to terminate it. log.Infof(ctx, "tenant %q terminating", tenantName) + case <-c.stopper.ShouldQuiesce(): // Surrounding server is stopping; propagate the stop to the // control goroutine below. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- false ctlStopper.Stop(tenantCtx) + case <-stopRequestCh: - // Someone requested a shutdown. + // Someone requested a graceful shutdown. log.Infof(ctx, "received request for tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- true ctlStopper.Stop(tenantCtx) + case <-topCtx.Done(): - // Someone requested a shutdown. + // Someone requested a shutdown - probably a test. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- false ctlStopper.Stop(tenantCtx) } }); err != nil { @@ -309,10 +334,36 @@ func (c *serverController) startControlledServer( if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) { select { case <-tenantStopper.ShouldQuiesce(): + // Tenant server shutting down on its own. return case <-ctlStopper.ShouldQuiesce(): + select { + case gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown: + if gracefulDrainRequested { + // Ensure that the graceful drain for the tenant server aborts + // early if the Stopper for the surrounding server is + // prematurely shutting down. This is because once the surrounding node + // starts quiescing tasks, it won't be able to process KV requests + // by the tenant server any more. + // + // Beware: we use tenantCtx here, not ctx, because the + // latter has been linked to ctlStopper.Quiesce already + // -- and in this select branch that context has been + // canceled already. + drainCtx, cancel := c.stopper.WithCancelOnQuiesce(tenantCtx) + defer cancel() + log.Infof(drainCtx, "starting graceful drain") + // Call the drain service on that tenant's server. This may take a + // while as it needs to wait for clients to disconnect and SQL + // activity to clear up, possibly waiting for various configurable + // timeouts. + CallDrainServerSide(drainCtx, tenantServer.gracefulDrain) + } + default: + } tenantStopper.Stop(ctx) case <-c.stopper.ShouldQuiesce(): + // Expedited shutdown of the surrounding KV node. tenantStopper.Stop(ctx) } }); err != nil { @@ -473,6 +524,30 @@ func (c *serverController) newServerInternal( // Close implements the stop.Closer interface. func (c *serverController) Close() { + entries := c.requestStopAll() + + // Wait for shutdown for all servers. + for _, e := range entries { + <-e.state.stopped + } +} + +func (c *serverController) drain(ctx context.Context) (stillRunning int) { + entries := c.requestStopAll() + // How many entries are _not_ stopped yet? + notStopped := 0 + for _, e := range entries { + select { + case <-e.state.stopped: + default: + log.Infof(ctx, "server for tenant %q still running", e.nameContainer) + notStopped++ + } + } + return notStopped +} + +func (c *serverController) requestStopAll() []*serverEntry { entries := func() (res []*serverEntry) { c.mu.Lock() defer c.mu.Unlock() @@ -487,11 +562,7 @@ func (c *serverController) Close() { for _, e := range entries { e.state.requestStop() } - - // Wait for shutdown for all servers. - for _, e := range entries { - <-e.state.stopped - } + return entries } type nodeEventLogger interface { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index d07838f2e8cd..0083e4fb14ca 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1681,15 +1681,24 @@ func (s *SQLServer) preStart( // shutdown; but may be a sign of a problem in production or for // tests that need to restart a server. stopper.AddCloser(stop.CloserFn(func() { + var sk *TestingKnobs + if knobs.Server != nil { + sk, _ = knobs.Server.(*TestingKnobs) + } + if !s.gracefulDrainComplete.Get() { warnCtx := s.AnnotateCtx(context.Background()) - if knobs.Server != nil && knobs.Server.(*TestingKnobs).RequireGracefulDrain { + if sk != nil && sk.RequireGracefulDrain { log.Fatalf(warnCtx, "drain required but not performed") } log.Warningf(warnCtx, "server shutdown without a prior graceful drain") } + + if sk != nil && sk.DrainReportCh != nil { + sk.DrainReportCh <- struct{}{} + } })) return nil diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index 24516c0fca2e..9b5b094e7fb5 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -138,6 +138,10 @@ type TestingKnobs struct { // RequireGracefulDrain, if set, causes a shutdown to fail with a log.Fatal // if the server is not gracefully drained prior to its stopper shutting down. RequireGracefulDrain bool + + // DrainReportCh, if set, is a channel that will be notified when + // the SQL service shuts down. + DrainReportCh chan struct{} } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. From 1651e396d3effef203ac4a9b9454e39e7bda70fd Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 12:24:28 +0200 Subject: [PATCH 11/22] server: add some shutdown logging This helps while troubleshooting tests. Release note: None --- pkg/server/drain.go | 2 ++ pkg/server/server_controller_orchestration.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pkg/server/drain.go b/pkg/server/drain.go index ee4f32533fbd..97eaa1c3d955 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -334,6 +334,8 @@ func (s *drainServer) drainInner( if err = s.drainClients(ctx, reporter); err != nil { return err } + log.Infof(ctx, "done draining clients") + // Mark the node as draining in liveness and drain all range leases. return s.drainNode(ctx, reporter, verbose) } diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index 69d43ee36c0f..669c86c1596e 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -238,6 +238,7 @@ func (c *serverController) startControlledServer( // Ensure that if the surrounding server requests shutdown, we // propagate it to the new server. if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { + defer log.Infof(ctx, "propagate-close task terminating") select { case <-stoppedCh: // Server control loop is terminating prematurely before a @@ -332,6 +333,7 @@ func (c *serverController) startControlledServer( // Link the controller stopper to this tenant stopper. if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) { + defer log.Infof(ctx, "propagate-close-tenant task terminating") select { case <-tenantStopper.ShouldQuiesce(): // Tenant server shutting down on its own. From bd7d7d1b085b3bacc15db0c50646798efd3152e5 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 12:30:48 +0200 Subject: [PATCH 12/22] serverccl: add additional logging to `TestServerStartStop` This also ensures the test fails quickly if there is a deadlock while the server is shutting down. (This makes the timeout for this test shorter than the standard timeout for the stopper.Stop method, which is 15 minutes.) Release note: None --- pkg/ccl/serverccl/server_controller_test.go | 32 +++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/pkg/ccl/serverccl/server_controller_test.go b/pkg/ccl/serverccl/server_controller_test.go index 181f8591f34a..665493deca7a 100644 --- a/pkg/ccl/serverccl/server_controller_test.go +++ b/pkg/ccl/serverccl/server_controller_test.go @@ -469,6 +469,8 @@ func TestServerStartStop(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRace(t, "test sensitive to low timeout") + ctx := context.Background() s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ @@ -517,6 +519,36 @@ func TestServerStartStop(t *testing.T) { } return errors.New("server still alive") }) + + log.Infof(ctx, "end of test - test server will now shut down ungracefully") + + // Monitor the state of the test server stopper. We use this logging + // to troubleshoot slow drains. + done := make(chan struct{}) + go func() { + for { + select { + case <-done: + return + case <-time.After(200 * time.Millisecond): + select { + case <-s.Stopper().ShouldQuiesce(): + log.Infof(ctx, "test server is quiescing") + case <-s.Stopper().IsStopped(): + log.Infof(ctx, "test server is stopped") + return + default: + } + } + } + }() + defer func() { close(done) }() + + defer time.AfterFunc(10*time.Second, func() { + log.DumpStacks(ctx, "slow quiesce") + log.Fatalf(ctx, "test took too long to shut down") + }).Stop() + s.Stopper().Stop(ctx) } func TestServerControllerLoginLogout(t *testing.T) { From 34f9187ece978d4892a165084e0a95dd61fde02e Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 00:24:07 +0200 Subject: [PATCH 13/22] server: lift the orchestration code under an interface This commit extracts the core logic from `server_controller_orchestration.go` into an interface (`serverOrchestrator`) that can be mocked. We will use this in testing. This extraction is also useful because it exposes a few shortcomings in the current implementation (wrt server shutdown). This makes it easier to fix them in a later commit. Release note: None --- pkg/server/BUILD.bazel | 2 + pkg/server/server_controller.go | 81 +----- pkg/server/server_controller_accessors.go | 13 +- .../server_controller_channel_orchestrator.go | 165 ++++++++++++ pkg/server/server_controller_orchestration.go | 243 ++++++++++-------- .../server_controller_orchestration_method.go | 154 +++++++++++ pkg/server/server_controller_sql.go | 2 +- 7 files changed, 466 insertions(+), 194 deletions(-) create mode 100644 pkg/server/server_controller_channel_orchestrator.go create mode 100644 pkg/server/server_controller_orchestration_method.go diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index e48a722f34f5..45eb71bcfb85 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -52,9 +52,11 @@ go_library( "server.go", "server_controller.go", "server_controller_accessors.go", + "server_controller_channel_orchestrator.go", "server_controller_http.go", "server_controller_new_server.go", "server_controller_orchestration.go", + "server_controller_orchestration_method.go", "server_controller_sql.go", "server_http.go", "server_obs_service.go", diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index e36aabdffdae..4d30ce47e01f 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -29,18 +29,7 @@ import ( // onDemandServer represents a server that can be started on demand. type onDemandServer interface { - // preStart activates background tasks and initializes subsystems - // but does not yet accept incoming connections. - preStart(context.Context) error - - // acceptClients starts accepting incoming connections. - acceptClients(context.Context) error - - // stop stops this server. - stop(context.Context) - - // annotateCtx annotates the context with server-specific logging tags. - annotateCtx(context.Context) context.Context + orchestratedServer // getHTTPHandlerFn retrieves the function that can serve HTTP // requests for this server. @@ -57,36 +46,6 @@ type onDemandServer interface { // getRPCAddr() returns the RPC address for this server. getRPCAddr() string - - // getTenantID returns the TenantID for this server. - getTenantID() roachpb.TenantID - - // getInstanceID returns the SQLInstanceID for this server. - getInstanceID() base.SQLInstanceID - - // shutdownRequested returns the shutdown request channel. - shutdownRequested() <-chan ShutdownRequest - - // gracefulDrain drains the server. - gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) -} - -type serverEntry struct { - // server is the actual server. - // This is only defined once state.started is true. - server onDemandServer - - // nameContainer holds a shared reference to the current - // name of the tenant within this serverEntry. If the - // tenant's name is updated, the `Set` method on - // nameContainer should be called in order to update - // any subscribers within the tenant. These are typically - // observability-related features that label data with - // the current tenant name. - nameContainer *roachpb.TenantNameContainer - - // state coordinates the server's lifecycle. - state serverState } // serverController manages a fleet of multiple servers side-by-side. @@ -121,6 +80,9 @@ type serverController struct { // prevents further creation of new tenant servers. draining syncutil.AtomicBool + // orchestrator is the orchestration method to use. + orchestrator serverOrchestrator + mu struct { syncutil.Mutex @@ -128,7 +90,7 @@ type serverController struct { // // TODO(knz): Detect when the mapping of name to tenant ID has // changed, and invalidate the entry. - servers map[roachpb.TenantName]*serverEntry + servers map[roachpb.TenantName]serverState // nextServerIdx is the index to provide to the next call to // newServerFn. @@ -156,27 +118,9 @@ func newServerController( tenantServerCreator: tenantServerCreator, sendSQLRoutingError: sendSQLRoutingError, } - - // We make the serverState for the system mock the regular - // lifecycle. It starts with an already-closed `startedOrStopped` - // channel; and it properly reacts to a call to requestStop() - // by closing its stopped channel -- albeit with no other side effects. - closedChan := make(chan struct{}) - close(closedChan) - closeCtx, cancelFn := context.WithCancel(context.Background()) - entry := &serverEntry{ - server: systemServer, - nameContainer: systemTenantNameContainer, - state: serverState{ - startedOrStopped: closedChan, - requestStop: cancelFn, - stopped: closeCtx.Done(), - }, - } - entry.state.started.Set(true) - - c.mu.servers = map[roachpb.TenantName]*serverEntry{ - catconstants.SystemTenantName: entry, + c.orchestrator = newServerOrchestrator(parentStopper, c) + c.mu.servers = map[roachpb.TenantName]serverState{ + catconstants.SystemTenantName: c.orchestrator.makeServerStateForSystemTenant(systemTenantNameContainer, systemServer), } parentStopper.AddCloser(c) return c @@ -219,11 +163,6 @@ func (t *tenantServerWrapper) acceptClients(ctx context.Context) error { return t.server.reportTenantInfo(ctx) } -func (t *tenantServerWrapper) stop(ctx context.Context) { - ctx = t.server.AnnotateCtx(ctx) - t.stopper.Stop(ctx) -} - func (t *tenantServerWrapper) getHTTPHandlerFn() http.HandlerFunc { return t.server.http.baseHandler } @@ -284,10 +223,6 @@ func (s *systemServerWrapper) acceptClients(ctx context.Context) error { return nil } -func (s *systemServerWrapper) stop(ctx context.Context) { - // No-op: the SQL service for the system tenant never shuts down. -} - func (t *systemServerWrapper) getHTTPHandlerFn() http.HandlerFunc { return t.server.http.baseHandler } diff --git a/pkg/server/server_controller_accessors.go b/pkg/server/server_controller_accessors.go index 87334772448c..8ba5863502f5 100644 --- a/pkg/server/server_controller_accessors.go +++ b/pkg/server/server_controller_accessors.go @@ -25,8 +25,8 @@ func (c *serverController) getServer( c.mu.Lock() defer c.mu.Unlock() if e, ok := c.mu.servers[tenantName]; ok { - if e.state.started.Get() { - return e.server, nil + if so, isReady := e.getServer(); isReady { + return so.(onDemandServer), nil } } return nil, errors.Mark(errors.Newf("no server for tenant %q", tenantName), errNoTenantServerRunning) @@ -40,14 +40,15 @@ var errNoTenantServerRunning error = noTenantServerRunning{} // getServers retrieves all the currently instantiated and running // in-memory servers. -func (c *serverController) getServers() (res []*serverEntry) { +func (c *serverController) getServers() (res []onDemandServer) { c.mu.Lock() defer c.mu.Unlock() for _, e := range c.mu.servers { - if !e.state.started.Get() { + so, isReady := e.getServer() + if !isReady { continue } - res = append(res, e) + res = append(res, so.(onDemandServer)) } return res } @@ -59,7 +60,7 @@ func (c *serverController) getCurrentTenantNames() []roachpb.TenantName { c.mu.Lock() defer c.mu.Unlock() for name, e := range c.mu.servers { - if !e.state.started.Get() { + if _, isReady := e.getServer(); !isReady { continue } serverNames = append(serverNames, name) diff --git a/pkg/server/server_controller_channel_orchestrator.go b/pkg/server/server_controller_channel_orchestrator.go new file mode 100644 index 000000000000..dc2fc508bc6c --- /dev/null +++ b/pkg/server/server_controller_channel_orchestrator.go @@ -0,0 +1,165 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// channelOrchestrator is an implementation of serverOrchestrator +// whose serverState uses go channels to coordinate the server's +// lifecycle. +type channelOrchestrator struct { + parentStopper *stop.Stopper + serverFactory serverFactoryForOrchestration +} + +var _ serverOrchestrator = (*channelOrchestrator)(nil) + +func newChannelOrchestrator( + parentStopper *stop.Stopper, serverFactory serverFactoryForOrchestration, +) *channelOrchestrator { + return &channelOrchestrator{ + parentStopper: parentStopper, + serverFactory: serverFactory, + } +} + +// serverStateUsingChannels coordinates the lifecycle of a tenant +// server. It ensures sane concurrent behavior between: +// - requests to start a server manually, e.g. via TestServer; +// - async changes to the tenant service mode; +// - quiescence of the outer stopper; +// - RPC drain requests on the tenant server; +// - server startup errors if any. +// +// Generally, the lifecycle is as follows: +// 1. a request to start a server will cause a serverEntry to be added +// to the server controller, in the state "not yet started". +// 2. the "managed-tenant-server" async task starts, via +// StartControlledServer() +// 3. the async task attempts to start the server (with retries and +// backoff delay as needed), or cancels the startup if +// a request to stop is received asynchronously. +// 4. after the server is started, the async task waits for a shutdown +// request. +// 5. once a shutdown request is received the async task +// stops the server. +// +// The async task is also responsible for reporting the server +// start/stop events in the event log. +type serverStateUsingChannels struct { + // nc holds a shared reference to the current name of the + // tenant. If the tenant's name is updated, the `Set` method on + // nameContainer should be called in order to update any subscribers + // within the tenant. These are typically observability-related + // features that label data with the current tenant name. + nc *roachpb.TenantNameContainer + + // server is the server that is being controlled. + server orchestratedServer + + // startedOrStopped is closed when the server has either started or + // stopped. This can be used to wait for a server start. + startedOrStoppedCh <-chan struct{} + + // startErr, once startedOrStopped is closed, reports the error + // during server creation if any. + startErr error + + // started is marked true when the server has started. This can + // be used to observe the start state without waiting. + started syncutil.AtomicBool + + // requestStop can be called to request a server to stop. + // It can be called multiple times. + requestStop func() + + // stopped is closed when the server has stopped. + stoppedCh <-chan struct{} +} + +var _ serverState = (*serverStateUsingChannels)(nil) + +// getServer is part of the serverState interface. +func (s *serverStateUsingChannels) getServer() (orchestratedServer, bool) { + return s.server, s.started.Get() +} + +// nameContainer is part of the serverState interface. +func (s *serverStateUsingChannels) nameContainer() *roachpb.TenantNameContainer { + return s.nc +} + +// getLastStartupError is part of the serverState interface. +func (s *serverStateUsingChannels) getLastStartupError() error { + return s.startErr +} + +// requestGracefulShutdown is part of the serverState interface. +func (s *serverStateUsingChannels) requestGracefulShutdown(ctx context.Context) { + // TODO(knz): this is the code that was originally implemented, and + // it is incorrect because it does not obey the incoming context's + // cancellation. + s.requestStop() +} + +// requestImmediateShutdown is part of the serverState interface. +func (s *serverStateUsingChannels) requestImmediateShutdown(ctx context.Context) { + // TODO(knz): this is the code that was originally implemented, + // and it is incorrect; this should strigger a stop on + // the tenant stopper. + // + // Luckly, this implementation error happens to be innocuous because + // the only call to reqquestImmediateShutdown() happens after the + // parent stopper quiescence, and the control loop is already + // sensitive to that. + // + // We should refactor this to remove the potential for confusion. + s.requestStop() +} + +// stopped is part of the serverState interface. +func (s *serverStateUsingChannels) stopped() <-chan struct{} { + return s.stoppedCh +} + +// startedOrStopped is part of the serverState interface. +func (s *serverStateUsingChannels) startedOrStopped() <-chan struct{} { + return s.startedOrStoppedCh +} + +// makeServerStateForSystemTenant is part of the orchestrator interface. +func (o *channelOrchestrator) makeServerStateForSystemTenant( + nc *roachpb.TenantNameContainer, systemSrv orchestratedServer, +) serverState { + // We make the serverState for the system mock the regular + // lifecycle. It starts with an already-closed `startedOrStopped` + // channel; and it properly reacts to a call to requestStop() + // by closing its stopped channel -- albeit with no other side effects. + closedChan := make(chan struct{}) + close(closedChan) + closeCtx, cancelFn := context.WithCancel(context.Background()) + st := &serverStateUsingChannels{ + nc: nc, + server: systemSrv, + startedOrStoppedCh: closedChan, + requestStop: cancelFn, + stoppedCh: closeCtx.Done(), + } + + st.started.Set(true) + return st +} diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index 669c86c1596e..241aa20a409c 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -33,50 +33,6 @@ import ( "github.com/cockroachdb/redact" ) -// serverState coordinates the lifecycle of a tenant server. It ensures -// sane concurrent behavior between: -// - requests to start a server manually, e.g. via TestServer; -// - async changes to the tenant service mode; -// - quiescence of the outer stopper; -// - RPC drain requests on the tenant server; -// - server startup errors if any. -// -// Generally, the lifecycle is as follows: -// 1. a request to start a server will cause a serverEntry to be added -// to the server controller, in the state "not yet started". -// 2. the "managed-tenant-server" async task starts, via -// StartControlledServer() -// 3. the async task attempts to start the server (with retries and -// backoff delay as needed), or cancels the startup if -// a request to stop is received asynchronously. -// 4. after the server is started, the async task waits for a shutdown -// request. -// 5. once a shutdown request is received the async task -// stops the server. -// -// The async task is also responsible for reporting the server -// start/stop events in the event log. -type serverState struct { - // startedOrStopped is closed when the server has either started or - // stopped. This can be used to wait for a server start. - startedOrStopped <-chan struct{} - - // startErr, once startedOrStopped is closed, reports the error - // during server creation if any. - startErr error - - // started is marked true when the server has started. This can - // be used to observe the start state without waiting. - started syncutil.AtomicBool - - // requestStop can be called to request a server to stop. - // It can be called multiple times. - requestStop func() - - // stopped is closed when the server has stopped. - stopped <-chan struct{} -} - // start monitors changes to the service mode and updates // the running servers accordingly. func (c *serverController) start(ctx context.Context, ie isql.Executor) error { @@ -164,7 +120,7 @@ func (c *serverController) scanTenantsForRunnableServices( if _, ok := nameLookup[name]; !ok { log.Infof(ctx, "tenant %q has changed service mode, should now stop", name) // Mark the server for async shutdown. - srv.state.requestStop() + srv.requestGracefulShutdown(ctx) } } @@ -183,11 +139,37 @@ func (c *serverController) scanTenantsForRunnableServices( func (c *serverController) createServerEntryLocked( ctx context.Context, tenantName roachpb.TenantName, -) (*serverEntry, error) { +) (serverState, error) { if c.draining.Get() { return nil, errors.New("server is draining") } - entry, err := c.startControlledServer(ctx, tenantName) + + // finalizeFn is called when the server is fully stopped. + // It is called from a different goroutine than the caller of + // startControlledServer, and so needs to lock c.mu itself. + finalizeFn := func(ctx context.Context, tenantName roachpb.TenantName) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.mu.servers, tenantName) + } + // startErrorFn is called every time there is an error starting + // the server. + startErrorFn := func(ctx context.Context, tenantName roachpb.TenantName, err error) { + c.logStartEvent(ctx, roachpb.TenantID{}, 0, tenantName, false /* success */, err) + } + // serverStartedFn is called when the server has started + // successfully and is accepting clients. + serverStartedFn := func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID) { + c.logStartEvent(ctx, tid, sid, tenantName, true /* success */, nil) + } + // serverStoppingFn is called when the server is shutting down + // after a successful start. + serverStoppingFn := func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID) { + c.logStopEvent(ctx, tid, sid, tenantName) + } + + entry, err := c.orchestrator.startControlledServer(ctx, tenantName, + finalizeFn, startErrorFn, serverStartedFn, serverStoppingFn) if err != nil { return nil, err } @@ -195,26 +177,43 @@ func (c *serverController) createServerEntryLocked( return entry, nil } -// startControlledServer starts the orchestration task that starts, -// then shuts down, the server for the given tenant. -func (c *serverController) startControlledServer( - ctx context.Context, tenantName roachpb.TenantName, -) (*serverEntry, error) { +// startControlledServer is part of the serverOrchestrator interface. +func (o *channelOrchestrator) startControlledServer( + ctx context.Context, + // tenantName is the name of the tenant for which a server should + // be created. + tenantName roachpb.TenantName, + // finalizeFn is called when the server is fully stopped. + // This is always called, even if there is a server startup error. + finalizeFn func(ctx context.Context, tenantName roachpb.TenantName), + // startErrorFn is called every time there is an error starting + // the server. Suggested use is for logging. To synchronize on the + // server's state, use the resulting serverState instead. + startErrorFn func(ctx context.Context, tenantName roachpb.TenantName, err error), + // serverStartedFn is called when the server has started + // successfully and is accepting clients. Suggested use is for + // logging. To synchronize on the server's state, use the + // resulting serverState instead. + startCompleteFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), + // serverStoppingFn is called when the server is shutting down + // after a successful start. Suggested use is for logging. To + // synchronize on the server's state, use the resulting + // serverState instead. + serverStoppingFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), +) (serverState, error) { var stoppedChClosed syncutil.AtomicBool stopRequestCh := make(chan struct{}) stoppedCh := make(chan struct{}) startedOrStoppedCh := make(chan struct{}) - entry := &serverEntry{ - nameContainer: roachpb.NewTenantNameContainer(tenantName), - state: serverState{ - startedOrStopped: startedOrStoppedCh, - requestStop: func() { - if !stoppedChClosed.Swap(true) { - close(stopRequestCh) - } - }, - stopped: stoppedCh, + state := &serverStateUsingChannels{ + nc: roachpb.NewTenantNameContainer(tenantName), + startedOrStoppedCh: startedOrStoppedCh, + requestStop: func() { + if !stoppedChClosed.Swap(true) { + close(stopRequestCh) + } }, + stoppedCh: stoppedCh, } topCtx := ctx @@ -237,7 +236,7 @@ func (c *serverController) startControlledServer( // Ensure that if the surrounding server requests shutdown, we // propagate it to the new server. - if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { + if err := o.parentStopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { defer log.Infof(ctx, "propagate-close task terminating") select { case <-stoppedCh: @@ -245,7 +244,7 @@ func (c *serverController) startControlledServer( // request was made to terminate it. log.Infof(ctx, "tenant %q terminating", tenantName) - case <-c.stopper.ShouldQuiesce(): + case <-o.parentStopper.ShouldQuiesce(): // Surrounding server is stopping; propagate the stop to the // control goroutine below. // Note: we can't do a graceful drain in that case because @@ -278,7 +277,7 @@ func (c *serverController) startControlledServer( return nil, err } - if err := c.stopper.RunAsyncTask(ctx, "managed-tenant-server", func(_ context.Context) { + if err := o.parentStopper.RunAsyncTask(ctx, "managed-tenant-server", func(_ context.Context) { startedOrStoppedChAlreadyClosed := false defer func() { // We may be returning early due to an error in the server initialization @@ -287,18 +286,18 @@ func (c *serverController) startControlledServer( // and we could get a goroutine leak for the above task. // To prevent this, we call requestStop() which tells the goroutine above // to call tenantStopper.Stop() and terminate. - entry.state.requestStop() - entry.state.started.Set(false) + state.requestStop() + state.started.Set(false) close(stoppedCh) if !startedOrStoppedChAlreadyClosed { - entry.state.startErr = errors.New("server stop before successful start") + state.startErr = errors.New("server stop before successful start") close(startedOrStoppedCh) } - // Remove the entry from the server map. - c.mu.Lock() - defer c.mu.Unlock() - delete(c.mu.servers, tenantName) + // Call the finalizer. + if finalizeFn != nil { + finalizeFn(ctx, tenantName) + } }() // We use our detached tenantCtx, the incoming ctx given by @@ -327,7 +326,7 @@ func (c *serverController) startControlledServer( // succeeded. var tenantStopper *stop.Stopper - var tenantServer onDemandServer + var tenantServer orchestratedServer for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { tenantStopper = stop.NewStopper() @@ -352,7 +351,7 @@ func (c *serverController) startControlledServer( // latter has been linked to ctlStopper.Quiesce already // -- and in this select branch that context has been // canceled already. - drainCtx, cancel := c.stopper.WithCancelOnQuiesce(tenantCtx) + drainCtx, cancel := o.parentStopper.WithCancelOnQuiesce(tenantCtx) defer cancel() log.Infof(drainCtx, "starting graceful drain") // Call the drain service on that tenant's server. This may take a @@ -364,7 +363,7 @@ func (c *serverController) startControlledServer( default: } tenantStopper.Stop(ctx) - case <-c.stopper.ShouldQuiesce(): + case <-o.parentStopper.ShouldQuiesce(): // Expedited shutdown of the surrounding KV node. tenantStopper.Stop(ctx) } @@ -374,8 +373,8 @@ func (c *serverController) startControlledServer( } // Try to create the server. - s, err := func() (onDemandServer, error) { - s, err := c.newServerInternal(ctx, entry.nameContainer, tenantStopper) + s, err := func() (orchestratedServer, error) { + s, err := o.serverFactory.newServerForOrchestrator(ctx, state.nc, tenantStopper) if err != nil { return nil, errors.Wrap(err, "while creating server") } @@ -395,12 +394,13 @@ func (c *serverController) startControlledServer( // Creation failed. We stop the tenant stopper here, which also // takes care of terminating the async task we've just started above. tenantStopper.Stop(ctx) - c.logStartEvent(ctx, roachpb.TenantID{}, 0, - entry.nameContainer.Get(), false /* success */, err) + if startErrorFn != nil { + startErrorFn(ctx, tenantName, err) + } log.Warningf(ctx, "unable to start server for tenant %q (attempt %d, will retry): %v", tenantName, retry.CurrentAttempt(), err) - entry.state.startErr = err + state.startErr = err continue } tenantServer = s @@ -416,15 +416,19 @@ func (c *serverController) startControlledServer( // Log the start event and ensure the stop event is logged eventually. tid, iid := tenantServer.getTenantID(), tenantServer.getInstanceID() - c.logStartEvent(ctx, tid, iid, tenantName, true /* success */, nil) - tenantStopper.AddCloser(stop.CloserFn(func() { - c.logStopEvent(ctx, tid, iid, tenantName) - })) + if startCompleteFn != nil { + startCompleteFn(ctx, tenantName, tid, iid) + } + if serverStoppingFn != nil { + tenantStopper.AddCloser(stop.CloserFn(func() { + serverStoppingFn(ctx, tenantName, tid, iid) + })) + } // Indicate the server has started. - entry.server = tenantServer + state.server = tenantServer startedOrStoppedChAlreadyClosed = true - entry.state.started.Set(true) + state.started.Set(true) close(startedOrStoppedCh) // Wait for a request to shut down. @@ -436,15 +440,15 @@ func (c *serverController) startControlledServer( log.Infof(ctx, "tenant %q requesting their own shutdown: %v", tenantName, shutdownRequest.ShutdownCause()) // Make the async stop goroutine above pick up the task of shutting down. - entry.state.requestStop() + state.requestStop() } }); err != nil { // Clean up the task we just started before. - entry.state.requestStop() + state.requestStop() return nil, err } - return entry, nil + return state, nil } // getExpectedRunningTenants retrieves the tenant IDs that should @@ -486,7 +490,7 @@ ORDER BY name`, mtinfopb.ServiceModeShared, mtinfopb.DataStateReady) func (c *serverController) startAndWaitForRunningServer( ctx context.Context, tenantName roachpb.TenantName, ) (onDemandServer, error) { - entry, err := func() (*serverEntry, error) { + entry, err := func() (serverState, error) { c.mu.Lock() defer c.mu.Unlock() if entry, ok := c.mu.servers[tenantName]; ok { @@ -499,8 +503,12 @@ func (c *serverController) startAndWaitForRunningServer( } select { - case <-entry.state.startedOrStopped: - return entry.server, entry.state.startErr + case <-entry.startedOrStopped(): + s, isReady := entry.getServer() + if isReady { + return s.(onDemandServer), nil + } + return nil, entry.getLastStartupError() case <-c.stopper.ShouldQuiesce(): return nil, errors.New("server stopping") case <-ctx.Done(): @@ -508,9 +516,11 @@ func (c *serverController) startAndWaitForRunningServer( } } -func (c *serverController) newServerInternal( +// newServerForOrchestrator implements the +// serverFactoryForOrchestration interface. +func (c *serverController) newServerForOrchestrator( ctx context.Context, nameContainer *roachpb.TenantNameContainer, tenantStopper *stop.Stopper, -) (onDemandServer, error) { +) (orchestratedServer, error) { tenantName := nameContainer.Get() testArgs := c.testArgs[tenantName] @@ -526,45 +536,50 @@ func (c *serverController) newServerInternal( // Close implements the stop.Closer interface. func (c *serverController) Close() { - entries := c.requestStopAll() + // Note Close() is only called in the case of expedited shutdown. + // It should not invoke the graceful drain process. + entries := c.getAllEntries() + // Request immediate shutdown. This is probably not needed; the + // server should already be sensitive to the parent stopper + // quiescing. + for _, e := range entries { + e.requestImmediateShutdown(context.Background()) + } // Wait for shutdown for all servers. for _, e := range entries { - <-e.state.stopped + <-e.stopped() } } func (c *serverController) drain(ctx context.Context) (stillRunning int) { - entries := c.requestStopAll() + entries := c.getAllEntries() + // Request shutdown for all servers. + for _, e := range entries { + e.requestGracefulShutdown(ctx) + } + // How many entries are _not_ stopped yet? notStopped := 0 for _, e := range entries { select { - case <-e.state.stopped: + case <-e.stopped(): default: - log.Infof(ctx, "server for tenant %q still running", e.nameContainer) + log.Infof(ctx, "server for tenant %q still running", e.nameContainer()) notStopped++ } } return notStopped } -func (c *serverController) requestStopAll() []*serverEntry { - entries := func() (res []*serverEntry) { - c.mu.Lock() - defer c.mu.Unlock() - res = make([]*serverEntry, 0, len(c.mu.servers)) - for _, e := range c.mu.servers { - res = append(res, e) - } - return res - }() - - // Request shutdown for all servers. - for _, e := range entries { - e.state.requestStop() +func (c *serverController) getAllEntries() (res []serverState) { + c.mu.Lock() + defer c.mu.Unlock() + res = make([]serverState, 0, len(c.mu.servers)) + for _, e := range c.mu.servers { + res = append(res, e) } - return entries + return res } type nodeEventLogger interface { diff --git a/pkg/server/server_controller_orchestration_method.go b/pkg/server/server_controller_orchestration_method.go new file mode 100644 index 000000000000..75453f198eab --- /dev/null +++ b/pkg/server/server_controller_orchestration_method.go @@ -0,0 +1,154 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/redact" +) + +// serverState is the interface that the server controller uses to +// interact with one of the orchestration methods. +type serverState interface { + // requestGracefulShutdown requests a graceful shutdown of the + // server, via the drain logic. The function returns immediately. + // The drain is performed in the background. To wait on actual + // server shutdown, the caller should also wait on the stopped() + // channel. + // + // We are generally assuming the graceful drain will eventually + // succeed, although it may take a while due to various + // user-configurable timeouts. The caller can forcefully cancel this + // by either cancelling the context argument; or by quiescing the + // stopper that this serverState was instantiated from. + // (There is not need to make the context argument hang off + // the stopper using WithCancelOnDrain.) + // + // Beware that the context passed as argument should remain active + // for as long as the (background) shutdown takes. The caller should + // not cancel it immediately. + requestGracefulShutdown(ctx context.Context) + + // requestImmediateShutdown requests an immediate (and ungraceful) + // shutdown of the server. The function returns immediately, + // possibly before all server tasks are terminated. The caller can + // wait on the actual shutdown by waiting on the stopped() channel. + requestImmediateShutdown(ctx context.Context) + + // stopped returns a channel that is closed when the server is fully stopped + // (no tasks remaining). + stopped() <-chan struct{} + + // startedOrStopped returns a channel that is closed when the server + // is fully started or fully stopped. + startedOrStopped() <-chan struct{} + + // getServer retrieves the orchestrated server and whether the server is ready. + // + // The first return value is only guaranteed to be non-nil when the isReady value is true. + getServer() (server orchestratedServer, isReady bool) + + // getLastStartupError returns the last known startup error. + getLastStartupError() error + + // nameContainer returns a shared reference to the current name of the + // tenant. If the tenant's name is updated, the `Set` method on + // nameContainer should be called in order to update any subscribers + // within the tenant. These are typically observability-related + // features that label data with the current tenant name. + nameContainer() *roachpb.TenantNameContainer +} + +// orchestratedServer is the subset of the onDemandServer interface +// that is sufficient to orchestrate the lifecycle of a server. +type orchestratedServer interface { + // annotateCtx annotates the context with server-specific logging tags. + annotateCtx(context.Context) context.Context + + // preStart activates background tasks and initializes subsystems + // but does not yet accept incoming connections. + // Graceful drain becomes possible after preStart() returns. + // Note that there may be background tasks remaining even if preStart + // returns an error. + preStart(context.Context) error + + // acceptClients starts accepting incoming connections. + acceptClients(context.Context) error + + // shutdownRequested returns the shutdown request channel, + // which is triggered when the server encounters an internal + // condition or receives an external RPC that requires it to shut down. + shutdownRequested() <-chan ShutdownRequest + + // gracefulDrain drains the server. It should be called repeatedly + // until the first value reaches zero. + gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error) + + // getTenantID returns the tenant ID. + getTenantID() roachpb.TenantID + + // getInstanceID returns the instance ID. This is not well-defined + // until preStart() returns successfully. + getInstanceID() base.SQLInstanceID +} + +// serverOrchestrator abstracts over the orchestration method for tenant +// servers. This interface allows us to decouple the orchestration +// logic from the implementation of server controller. +type serverOrchestrator interface { + // makeServerStateForSystemTenant returns a serverState for the + // system tenant. + makeServerStateForSystemTenant(nc *roachpb.TenantNameContainer, systemSrv orchestratedServer) serverState + + // startControlledServer starts a tenant server. + startControlledServer(ctx context.Context, + // tenantName is the name of the tenant for which a server should + // be created. + tenantName roachpb.TenantName, + // finalizeFn is called when the server is fully stopped. + // This is always called, even if there is a server startup error. + // It is also called asynchronously, possibly after + // startControlledServer has returned. + finalizeFn func(ctx context.Context, tenantName roachpb.TenantName), + // startErrorFn is called every time there is an error starting + // the server. Suggested use is for logging. To synchronize on the + // server's state, use the resulting serverState instead. + startErrorFn func(ctx context.Context, tenantName roachpb.TenantName, err error), + // serverStartedFn is called when the server has started + // successfully and is accepting clients. Suggested use is for + // logging. To synchronize on the server's state, use the + // resulting serverState instead. + serverStartedFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), + // serverStoppingFn is called when the server is shutting down + // after a successful start. Suggested use is for logging. To + // synchronize on the server's state, use the resulting + // serverState instead. + serverStoppingFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), + ) (state serverState, err error) +} + +// serverFactoryForOrchestration provides the method that instantiates tenant servers. +type serverFactoryForOrchestration interface { + // newServerForOrchestrator returns a new tenant server. + newServerForOrchestrator(ctx context.Context, nc *roachpb.TenantNameContainer, tenantStopper *stop.Stopper) (orchestratedServer, error) +} + +// newServerOrchestrator returns the orchestration method to use. +func newServerOrchestrator( + parentStopper *stop.Stopper, serverFactory serverFactoryForOrchestration, +) serverOrchestrator { + // TODO(knz): make this configurable for testing. + return newChannelOrchestrator(parentStopper, serverFactory) +} diff --git a/pkg/server/server_controller_sql.go b/pkg/server/server_controller_sql.go index e120aaf1497a..d7837b81744c 100644 --- a/pkg/server/server_controller_sql.go +++ b/pkg/server/server_controller_sql.go @@ -50,7 +50,7 @@ func (c *serverController) sqlMux( // servers to see and process the cancel at approximately the // same time as every other. if err := c.stopper.RunAsyncTask(ctx, "cancel", func(ctx context.Context) { - s.server.handleCancel(ctx, status.CancelKey) + s.handleCancel(ctx, status.CancelKey) }); err != nil { return err } From 67737d51e64e17898a924655f99254d90915c415 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 00:26:20 +0200 Subject: [PATCH 14/22] server: move the main tenant orchestration logic to its own file This moves `(o *channelOrchestrator) startControlledServer` alongside the other methods of `channelOrchestrator`. There is further no code change. Release note: None --- .../server_controller_channel_orchestrator.go | 279 ++++++++++++++++++ pkg/server/server_controller_orchestration.go | 277 ----------------- 2 files changed, 279 insertions(+), 277 deletions(-) diff --git a/pkg/server/server_controller_channel_orchestrator.go b/pkg/server/server_controller_channel_orchestrator.go index dc2fc508bc6c..4d48dcd21783 100644 --- a/pkg/server/server_controller_channel_orchestrator.go +++ b/pkg/server/server_controller_channel_orchestrator.go @@ -13,9 +13,14 @@ package server import ( "context" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" ) // channelOrchestrator is an implementation of serverOrchestrator @@ -163,3 +168,277 @@ func (o *channelOrchestrator) makeServerStateForSystemTenant( st.started.Set(true) return st } + +// startControlledServer is part of the serverOrchestrator interface. +func (o *channelOrchestrator) startControlledServer( + ctx context.Context, + // tenantName is the name of the tenant for which a server should + // be created. + tenantName roachpb.TenantName, + // finalizeFn is called when the server is fully stopped. + // This is always called, even if there is a server startup error. + finalizeFn func(ctx context.Context, tenantName roachpb.TenantName), + // startErrorFn is called every time there is an error starting + // the server. Suggested use is for logging. To synchronize on the + // server's state, use the resulting serverState instead. + startErrorFn func(ctx context.Context, tenantName roachpb.TenantName, err error), + // serverStartedFn is called when the server has started + // successfully and is accepting clients. Suggested use is for + // logging. To synchronize on the server's state, use the + // resulting serverState instead. + startCompleteFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), + // serverStoppingFn is called when the server is shutting down + // after a successful start. Suggested use is for logging. To + // synchronize on the server's state, use the resulting + // serverState instead. + serverStoppingFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), +) (serverState, error) { + var stoppedChClosed syncutil.AtomicBool + stopRequestCh := make(chan struct{}) + stoppedCh := make(chan struct{}) + startedOrStoppedCh := make(chan struct{}) + state := &serverStateUsingChannels{ + nc: roachpb.NewTenantNameContainer(tenantName), + startedOrStoppedCh: startedOrStoppedCh, + requestStop: func() { + if !stoppedChClosed.Swap(true) { + close(stopRequestCh) + } + }, + stoppedCh: stoppedCh, + } + + topCtx := ctx + + // Use a different context for the tasks below, because the tenant + // stopper will have its own tracer which is incompatible with the + // tracer attached to the incoming context. + tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) + tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil) + tenantCtx = logtags.AddTag(tenantCtx, "tenant", tenantName) + + // ctlStopper is a stopper uniquely responsible for the control + // loop. It is separate from the tenantStopper defined below so + // that we can retry the server instantiation if it fails. + ctlStopper := stop.NewStopper() + + // useGracefulDrainDuringTenantShutdown defined whether a graceful + // drain is requested on the tenant server by orchestration. + useGracefulDrainDuringTenantShutdown := make(chan bool, 1) + + // Ensure that if the surrounding server requests shutdown, we + // propagate it to the new server. + if err := o.parentStopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { + defer log.Infof(ctx, "propagate-close task terminating") + select { + case <-stoppedCh: + // Server control loop is terminating prematurely before a + // request was made to terminate it. + log.Infof(ctx, "tenant %q terminating", tenantName) + + case <-o.parentStopper.ShouldQuiesce(): + // Surrounding server is stopping; propagate the stop to the + // control goroutine below. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. + log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- false + ctlStopper.Stop(tenantCtx) + + case <-stopRequestCh: + // Someone requested a graceful shutdown. + log.Infof(ctx, "received request for tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- true + ctlStopper.Stop(tenantCtx) + + case <-topCtx.Done(): + // Someone requested a shutdown - probably a test. + // Note: we can't do a graceful drain in that case because + // the RPC service in the surrounding server may already + // be unavailable. + log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) + useGracefulDrainDuringTenantShutdown <- false + ctlStopper.Stop(tenantCtx) + } + }); err != nil { + // The goroutine above is responsible for stopping the ctlStopper. + // If it fails to stop, we stop it here to avoid leaking a + // stopper. + ctlStopper.Stop(ctx) + return nil, err + } + + if err := o.parentStopper.RunAsyncTask(ctx, "managed-tenant-server", func(_ context.Context) { + startedOrStoppedChAlreadyClosed := false + defer func() { + // We may be returning early due to an error in the server initialization + // not otherwise caused by a server shutdown. In that case, we don't have + // a guarantee that the tenantStopper.Stop() call will ever be called + // and we could get a goroutine leak for the above task. + // To prevent this, we call requestStop() which tells the goroutine above + // to call tenantStopper.Stop() and terminate. + state.requestStop() + state.started.Set(false) + close(stoppedCh) + if !startedOrStoppedChAlreadyClosed { + state.startErr = errors.New("server stop before successful start") + close(startedOrStoppedCh) + } + + // Call the finalizer. + if finalizeFn != nil { + finalizeFn(ctx, tenantName) + } + }() + + // We use our detached tenantCtx, the incoming ctx given by + // RunAsyncTask, because this stopper will be assigned its own + // different tracer. + ctx := tenantCtx + // We want a context that gets cancelled when the server is + // shutting down, for the possible few cases in + // newServerInternal/preStart/acceptClients which are not looking at the + // tenantStopper.ShouldQuiesce() channel but are sensitive to context + // cancellation. + var cancel func() + ctx, cancel = ctlStopper.WithCancelOnQuiesce(ctx) + defer cancel() + + // Stop retrying startup/initialization if we are being shut + // down early. + retryOpts := retry.Options{ + Closer: ctlStopper.ShouldQuiesce(), + } + + // tenantStopper is the stopper specific to one tenant server + // instance. We define a new tenantStopper on every attempt to + // instantiate the tenant server below. It is then linked to + // ctlStopper below once the instantiation and start have + // succeeded. + var tenantStopper *stop.Stopper + + var tenantServer orchestratedServer + for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { + tenantStopper = stop.NewStopper() + + // Link the controller stopper to this tenant stopper. + if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) { + defer log.Infof(ctx, "propagate-close-tenant task terminating") + select { + case <-tenantStopper.ShouldQuiesce(): + // Tenant server shutting down on its own. + return + case <-ctlStopper.ShouldQuiesce(): + select { + case gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown: + if gracefulDrainRequested { + // Ensure that the graceful drain for the tenant server aborts + // early if the Stopper for the surrounding server is + // prematurely shutting down. This is because once the surrounding node + // starts quiescing tasks, it won't be able to process KV requests + // by the tenant server any more. + // + // Beware: we use tenantCtx here, not ctx, because the + // latter has been linked to ctlStopper.Quiesce already + // -- and in this select branch that context has been + // canceled already. + drainCtx, cancel := o.parentStopper.WithCancelOnQuiesce(tenantCtx) + defer cancel() + log.Infof(drainCtx, "starting graceful drain") + // Call the drain service on that tenant's server. This may take a + // while as it needs to wait for clients to disconnect and SQL + // activity to clear up, possibly waiting for various configurable + // timeouts. + CallDrainServerSide(drainCtx, tenantServer.gracefulDrain) + } + default: + } + tenantStopper.Stop(ctx) + case <-o.parentStopper.ShouldQuiesce(): + // Expedited shutdown of the surrounding KV node. + tenantStopper.Stop(ctx) + } + }); err != nil { + tenantStopper.Stop(ctx) + return + } + + // Try to create the server. + s, err := func() (orchestratedServer, error) { + s, err := o.serverFactory.newServerForOrchestrator(ctx, state.nc, tenantStopper) + if err != nil { + return nil, errors.Wrap(err, "while creating server") + } + + // Note: we make preStart() below derive from ctx, which is + // cancelled on shutdown of the outer server. This is necessary + // to ensure preStart() properly stops prematurely in that case. + startCtx := s.annotateCtx(ctx) + startCtx = logtags.AddTag(startCtx, "start-server", nil) + log.Infof(startCtx, "starting tenant server") + if err := s.preStart(startCtx); err != nil { + return nil, errors.Wrap(err, "while starting server") + } + return s, errors.Wrap(s.acceptClients(startCtx), "while accepting clients") + }() + if err != nil { + // Creation failed. We stop the tenant stopper here, which also + // takes care of terminating the async task we've just started above. + tenantStopper.Stop(ctx) + if startErrorFn != nil { + startErrorFn(ctx, tenantName, err) + } + log.Warningf(ctx, + "unable to start server for tenant %q (attempt %d, will retry): %v", + tenantName, retry.CurrentAttempt(), err) + state.startErr = err + continue + } + tenantServer = s + break + } + if tenantServer == nil { + // Retry loop exited before the server could start. This + // indicates that there was an async request to abandon the + // server startup. This is OK; just terminate early. The defer + // will take care of cleaning up. + return + } + + // Log the start event and ensure the stop event is logged eventually. + tid, iid := tenantServer.getTenantID(), tenantServer.getInstanceID() + if startCompleteFn != nil { + startCompleteFn(ctx, tenantName, tid, iid) + } + if serverStoppingFn != nil { + tenantStopper.AddCloser(stop.CloserFn(func() { + serverStoppingFn(ctx, tenantName, tid, iid) + })) + } + + // Indicate the server has started. + state.server = tenantServer + startedOrStoppedChAlreadyClosed = true + state.started.Set(true) + close(startedOrStoppedCh) + + // Wait for a request to shut down. + select { + case <-tenantStopper.ShouldQuiesce(): + log.Infof(ctx, "tenant %q finishing their own control loop", tenantName) + + case shutdownRequest := <-tenantServer.shutdownRequested(): + log.Infof(ctx, "tenant %q requesting their own shutdown: %v", + tenantName, shutdownRequest.ShutdownCause()) + // Make the async stop goroutine above pick up the task of shutting down. + state.requestStop() + } + }); err != nil { + // Clean up the task we just started before. + state.requestStop() + return nil, err + } + + return state, nil +} diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index 241aa20a409c..41a8ba8af825 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -24,12 +24,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" - "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" ) @@ -177,280 +174,6 @@ func (c *serverController) createServerEntryLocked( return entry, nil } -// startControlledServer is part of the serverOrchestrator interface. -func (o *channelOrchestrator) startControlledServer( - ctx context.Context, - // tenantName is the name of the tenant for which a server should - // be created. - tenantName roachpb.TenantName, - // finalizeFn is called when the server is fully stopped. - // This is always called, even if there is a server startup error. - finalizeFn func(ctx context.Context, tenantName roachpb.TenantName), - // startErrorFn is called every time there is an error starting - // the server. Suggested use is for logging. To synchronize on the - // server's state, use the resulting serverState instead. - startErrorFn func(ctx context.Context, tenantName roachpb.TenantName, err error), - // serverStartedFn is called when the server has started - // successfully and is accepting clients. Suggested use is for - // logging. To synchronize on the server's state, use the - // resulting serverState instead. - startCompleteFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), - // serverStoppingFn is called when the server is shutting down - // after a successful start. Suggested use is for logging. To - // synchronize on the server's state, use the resulting - // serverState instead. - serverStoppingFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), -) (serverState, error) { - var stoppedChClosed syncutil.AtomicBool - stopRequestCh := make(chan struct{}) - stoppedCh := make(chan struct{}) - startedOrStoppedCh := make(chan struct{}) - state := &serverStateUsingChannels{ - nc: roachpb.NewTenantNameContainer(tenantName), - startedOrStoppedCh: startedOrStoppedCh, - requestStop: func() { - if !stoppedChClosed.Swap(true) { - close(stopRequestCh) - } - }, - stoppedCh: stoppedCh, - } - - topCtx := ctx - - // Use a different context for the tasks below, because the tenant - // stopper will have its own tracer which is incompatible with the - // tracer attached to the incoming context. - tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) - tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil) - tenantCtx = logtags.AddTag(tenantCtx, "tenant", tenantName) - - // ctlStopper is a stopper uniquely responsible for the control - // loop. It is separate from the tenantStopper defined below so - // that we can retry the server instantiation if it fails. - ctlStopper := stop.NewStopper() - - // useGracefulDrainDuringTenantShutdown defined whether a graceful - // drain is requested on the tenant server by orchestration. - useGracefulDrainDuringTenantShutdown := make(chan bool, 1) - - // Ensure that if the surrounding server requests shutdown, we - // propagate it to the new server. - if err := o.parentStopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { - defer log.Infof(ctx, "propagate-close task terminating") - select { - case <-stoppedCh: - // Server control loop is terminating prematurely before a - // request was made to terminate it. - log.Infof(ctx, "tenant %q terminating", tenantName) - - case <-o.parentStopper.ShouldQuiesce(): - // Surrounding server is stopping; propagate the stop to the - // control goroutine below. - // Note: we can't do a graceful drain in that case because - // the RPC service in the surrounding server may already - // be unavailable. - log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) - useGracefulDrainDuringTenantShutdown <- false - ctlStopper.Stop(tenantCtx) - - case <-stopRequestCh: - // Someone requested a graceful shutdown. - log.Infof(ctx, "received request for tenant %q to terminate", tenantName) - useGracefulDrainDuringTenantShutdown <- true - ctlStopper.Stop(tenantCtx) - - case <-topCtx.Done(): - // Someone requested a shutdown - probably a test. - // Note: we can't do a graceful drain in that case because - // the RPC service in the surrounding server may already - // be unavailable. - log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) - useGracefulDrainDuringTenantShutdown <- false - ctlStopper.Stop(tenantCtx) - } - }); err != nil { - // The goroutine above is responsible for stopping the ctlStopper. - // If it fails to stop, we stop it here to avoid leaking a - // stopper. - ctlStopper.Stop(ctx) - return nil, err - } - - if err := o.parentStopper.RunAsyncTask(ctx, "managed-tenant-server", func(_ context.Context) { - startedOrStoppedChAlreadyClosed := false - defer func() { - // We may be returning early due to an error in the server initialization - // not otherwise caused by a server shutdown. In that case, we don't have - // a guarantee that the tenantStopper.Stop() call will ever be called - // and we could get a goroutine leak for the above task. - // To prevent this, we call requestStop() which tells the goroutine above - // to call tenantStopper.Stop() and terminate. - state.requestStop() - state.started.Set(false) - close(stoppedCh) - if !startedOrStoppedChAlreadyClosed { - state.startErr = errors.New("server stop before successful start") - close(startedOrStoppedCh) - } - - // Call the finalizer. - if finalizeFn != nil { - finalizeFn(ctx, tenantName) - } - }() - - // We use our detached tenantCtx, the incoming ctx given by - // RunAsyncTask, because this stopper will be assigned its own - // different tracer. - ctx := tenantCtx - // We want a context that gets cancelled when the server is - // shutting down, for the possible few cases in - // newServerInternal/preStart/acceptClients which are not looking at the - // tenantStopper.ShouldQuiesce() channel but are sensitive to context - // cancellation. - var cancel func() - ctx, cancel = ctlStopper.WithCancelOnQuiesce(ctx) - defer cancel() - - // Stop retrying startup/initialization if we are being shut - // down early. - retryOpts := retry.Options{ - Closer: ctlStopper.ShouldQuiesce(), - } - - // tenantStopper is the stopper specific to one tenant server - // instance. We define a new tenantStopper on every attempt to - // instantiate the tenant server below. It is then linked to - // ctlStopper below once the instantiation and start have - // succeeded. - var tenantStopper *stop.Stopper - - var tenantServer orchestratedServer - for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { - tenantStopper = stop.NewStopper() - - // Link the controller stopper to this tenant stopper. - if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) { - defer log.Infof(ctx, "propagate-close-tenant task terminating") - select { - case <-tenantStopper.ShouldQuiesce(): - // Tenant server shutting down on its own. - return - case <-ctlStopper.ShouldQuiesce(): - select { - case gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown: - if gracefulDrainRequested { - // Ensure that the graceful drain for the tenant server aborts - // early if the Stopper for the surrounding server is - // prematurely shutting down. This is because once the surrounding node - // starts quiescing tasks, it won't be able to process KV requests - // by the tenant server any more. - // - // Beware: we use tenantCtx here, not ctx, because the - // latter has been linked to ctlStopper.Quiesce already - // -- and in this select branch that context has been - // canceled already. - drainCtx, cancel := o.parentStopper.WithCancelOnQuiesce(tenantCtx) - defer cancel() - log.Infof(drainCtx, "starting graceful drain") - // Call the drain service on that tenant's server. This may take a - // while as it needs to wait for clients to disconnect and SQL - // activity to clear up, possibly waiting for various configurable - // timeouts. - CallDrainServerSide(drainCtx, tenantServer.gracefulDrain) - } - default: - } - tenantStopper.Stop(ctx) - case <-o.parentStopper.ShouldQuiesce(): - // Expedited shutdown of the surrounding KV node. - tenantStopper.Stop(ctx) - } - }); err != nil { - tenantStopper.Stop(ctx) - return - } - - // Try to create the server. - s, err := func() (orchestratedServer, error) { - s, err := o.serverFactory.newServerForOrchestrator(ctx, state.nc, tenantStopper) - if err != nil { - return nil, errors.Wrap(err, "while creating server") - } - - // Note: we make preStart() below derive from ctx, which is - // cancelled on shutdown of the outer server. This is necessary - // to ensure preStart() properly stops prematurely in that case. - startCtx := s.annotateCtx(ctx) - startCtx = logtags.AddTag(startCtx, "start-server", nil) - log.Infof(startCtx, "starting tenant server") - if err := s.preStart(startCtx); err != nil { - return nil, errors.Wrap(err, "while starting server") - } - return s, errors.Wrap(s.acceptClients(startCtx), "while accepting clients") - }() - if err != nil { - // Creation failed. We stop the tenant stopper here, which also - // takes care of terminating the async task we've just started above. - tenantStopper.Stop(ctx) - if startErrorFn != nil { - startErrorFn(ctx, tenantName, err) - } - log.Warningf(ctx, - "unable to start server for tenant %q (attempt %d, will retry): %v", - tenantName, retry.CurrentAttempt(), err) - state.startErr = err - continue - } - tenantServer = s - break - } - if tenantServer == nil { - // Retry loop exited before the server could start. This - // indicates that there was an async request to abandon the - // server startup. This is OK; just terminate early. The defer - // will take care of cleaning up. - return - } - - // Log the start event and ensure the stop event is logged eventually. - tid, iid := tenantServer.getTenantID(), tenantServer.getInstanceID() - if startCompleteFn != nil { - startCompleteFn(ctx, tenantName, tid, iid) - } - if serverStoppingFn != nil { - tenantStopper.AddCloser(stop.CloserFn(func() { - serverStoppingFn(ctx, tenantName, tid, iid) - })) - } - - // Indicate the server has started. - state.server = tenantServer - startedOrStoppedChAlreadyClosed = true - state.started.Set(true) - close(startedOrStoppedCh) - - // Wait for a request to shut down. - select { - case <-tenantStopper.ShouldQuiesce(): - log.Infof(ctx, "tenant %q finishing their own control loop", tenantName) - - case shutdownRequest := <-tenantServer.shutdownRequested(): - log.Infof(ctx, "tenant %q requesting their own shutdown: %v", - tenantName, shutdownRequest.ShutdownCause()) - // Make the async stop goroutine above pick up the task of shutting down. - state.requestStop() - } - }); err != nil { - // Clean up the task we just started before. - state.requestStop() - return nil, err - } - - return state, nil -} - // getExpectedRunningTenants retrieves the tenant IDs that should // be running right now. // TODO(knz): Use a watcher here. From 7e9181f957ac4263c987813a13441d9a3929ac2f Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 12:21:44 +0200 Subject: [PATCH 15/22] server: avoid naked writes to a channel The sever controller was writing to `useGracefulDrainDuringTenantShutdown` unconditionally. As a result, if two or more expedited stops caught up with a graceful stop, some of the writes to the channel could be blocked. This would never happen on a running system, but could be exercised in tests. This patch fixes this by making the write unblocking. This generally makes the code more readable anyway, since naked channel writes are bound to raise eyebrows anyways. Release note: None --- .../server_controller_channel_orchestrator.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/server/server_controller_channel_orchestrator.go b/pkg/server/server_controller_channel_orchestrator.go index 4d48dcd21783..4f3d2d3b2d8e 100644 --- a/pkg/server/server_controller_channel_orchestrator.go +++ b/pkg/server/server_controller_channel_orchestrator.go @@ -225,6 +225,13 @@ func (o *channelOrchestrator) startControlledServer( // useGracefulDrainDuringTenantShutdown defined whether a graceful // drain is requested on the tenant server by orchestration. useGracefulDrainDuringTenantShutdown := make(chan bool, 1) + markDrainMode := func(graceful bool) { + select { + case useGracefulDrainDuringTenantShutdown <- graceful: + default: + // Avoid blocking write. + } + } // Ensure that if the surrounding server requests shutdown, we // propagate it to the new server. @@ -243,13 +250,13 @@ func (o *channelOrchestrator) startControlledServer( // the RPC service in the surrounding server may already // be unavailable. log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) - useGracefulDrainDuringTenantShutdown <- false + markDrainMode(false) ctlStopper.Stop(tenantCtx) case <-stopRequestCh: // Someone requested a graceful shutdown. - log.Infof(ctx, "received request for tenant %q to terminate", tenantName) - useGracefulDrainDuringTenantShutdown <- true + log.Infof(ctx, "received request for tenant %q to terminate gracefully", tenantName) + markDrainMode(true) ctlStopper.Stop(tenantCtx) case <-topCtx.Done(): @@ -258,7 +265,7 @@ func (o *channelOrchestrator) startControlledServer( // the RPC service in the surrounding server may already // be unavailable. log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) - useGracefulDrainDuringTenantShutdown <- false + markDrainMode(false) ctlStopper.Stop(tenantCtx) } }); err != nil { From 38c4e87a718cee934fcbea31c1f14697c761e8c2 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 09:29:27 +0200 Subject: [PATCH 16/22] server: split graceful vs ungraceful stop for tenant servers Prior to this patch, immediate shutdown requests for secondary tenant servers were serviced using the graceful path. This was incorrect, and resulted in excessively long shutdown sequences in tests in some cases. This patch fixes it. Release note: None --- .../server_controller_channel_orchestrator.go | 91 +++++++++++-------- 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/pkg/server/server_controller_channel_orchestrator.go b/pkg/server/server_controller_channel_orchestrator.go index 4f3d2d3b2d8e..9cebb1cbd8ab 100644 --- a/pkg/server/server_controller_channel_orchestrator.go +++ b/pkg/server/server_controller_channel_orchestrator.go @@ -12,6 +12,7 @@ package server import ( "context" + "sync" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -88,9 +89,14 @@ type serverStateUsingChannels struct { // be used to observe the start state without waiting. started syncutil.AtomicBool - // requestStop can be called to request a server to stop. + // requestImmediateStop can be called to request a server to stop + // ungracefully. // It can be called multiple times. - requestStop func() + requestImmediateStop func() + + // requestGracefulStop can be called to request a server to stop gracefully. + // It can be called multiple times. + requestGracefulStop func() // stopped is closed when the server has stopped. stoppedCh <-chan struct{} @@ -115,25 +121,14 @@ func (s *serverStateUsingChannels) getLastStartupError() error { // requestGracefulShutdown is part of the serverState interface. func (s *serverStateUsingChannels) requestGracefulShutdown(ctx context.Context) { - // TODO(knz): this is the code that was originally implemented, and - // it is incorrect because it does not obey the incoming context's - // cancellation. - s.requestStop() + // TODO(knz): this is incorrect because it does not obey the + // incoming context's cancellation. + s.requestGracefulStop() } // requestImmediateShutdown is part of the serverState interface. func (s *serverStateUsingChannels) requestImmediateShutdown(ctx context.Context) { - // TODO(knz): this is the code that was originally implemented, - // and it is incorrect; this should strigger a stop on - // the tenant stopper. - // - // Luckly, this implementation error happens to be innocuous because - // the only call to reqquestImmediateShutdown() happens after the - // parent stopper quiescence, and the control loop is already - // sensitive to that. - // - // We should refactor this to remove the potential for confusion. - s.requestStop() + s.requestImmediateStop() } // stopped is part of the serverState interface. @@ -158,11 +153,12 @@ func (o *channelOrchestrator) makeServerStateForSystemTenant( close(closedChan) closeCtx, cancelFn := context.WithCancel(context.Background()) st := &serverStateUsingChannels{ - nc: nc, - server: systemSrv, - startedOrStoppedCh: closedChan, - requestStop: cancelFn, - stoppedCh: closeCtx.Done(), + nc: nc, + server: systemSrv, + startedOrStoppedCh: closedChan, + requestImmediateStop: cancelFn, + requestGracefulStop: cancelFn, + stoppedCh: closeCtx.Done(), } st.started.Set(true) @@ -193,19 +189,30 @@ func (o *channelOrchestrator) startControlledServer( // serverState instead. serverStoppingFn func(ctx context.Context, tenantName roachpb.TenantName, tid roachpb.TenantID, sid base.SQLInstanceID), ) (serverState, error) { - var stoppedChClosed syncutil.AtomicBool - stopRequestCh := make(chan struct{}) + var immediateStopRequest sync.Once + immediateStopRequestCh := make(chan struct{}) + immediateStopFn := func() { + immediateStopRequest.Do(func() { + close(immediateStopRequestCh) + }) + } + var gracefulStopRequest sync.Once + gracefulStopRequestCh := make(chan struct{}) + gracefulStopFn := func() { + gracefulStopRequest.Do(func() { + close(gracefulStopRequestCh) + }) + } + stoppedCh := make(chan struct{}) startedOrStoppedCh := make(chan struct{}) + state := &serverStateUsingChannels{ - nc: roachpb.NewTenantNameContainer(tenantName), - startedOrStoppedCh: startedOrStoppedCh, - requestStop: func() { - if !stoppedChClosed.Swap(true) { - close(stopRequestCh) - } - }, - stoppedCh: stoppedCh, + nc: roachpb.NewTenantNameContainer(tenantName), + startedOrStoppedCh: startedOrStoppedCh, + requestImmediateStop: immediateStopFn, + requestGracefulStop: gracefulStopFn, + stoppedCh: stoppedCh, } topCtx := ctx @@ -253,12 +260,18 @@ func (o *channelOrchestrator) startControlledServer( markDrainMode(false) ctlStopper.Stop(tenantCtx) - case <-stopRequestCh: + case <-gracefulStopRequestCh: // Someone requested a graceful shutdown. log.Infof(ctx, "received request for tenant %q to terminate gracefully", tenantName) markDrainMode(true) ctlStopper.Stop(tenantCtx) + case <-immediateStopRequestCh: + // Someone requested a graceful shutdown. + log.Infof(ctx, "received request for tenant %q to terminate immediately", tenantName) + markDrainMode(false) + ctlStopper.Stop(tenantCtx) + case <-topCtx.Done(): // Someone requested a shutdown - probably a test. // Note: we can't do a graceful drain in that case because @@ -283,9 +296,11 @@ func (o *channelOrchestrator) startControlledServer( // not otherwise caused by a server shutdown. In that case, we don't have // a guarantee that the tenantStopper.Stop() call will ever be called // and we could get a goroutine leak for the above task. - // To prevent this, we call requestStop() which tells the goroutine above - // to call tenantStopper.Stop() and terminate. - state.requestStop() + // + // To prevent this, we call requestImmediateStop() which tells + // the goroutine above to call tenantStopper.Stop() and + // terminate. + state.requestImmediateStop() state.started.Set(false) close(stoppedCh) if !startedOrStoppedChAlreadyClosed { @@ -439,11 +454,11 @@ func (o *channelOrchestrator) startControlledServer( log.Infof(ctx, "tenant %q requesting their own shutdown: %v", tenantName, shutdownRequest.ShutdownCause()) // Make the async stop goroutine above pick up the task of shutting down. - state.requestStop() + state.requestImmediateStop() } }); err != nil { // Clean up the task we just started before. - state.requestStop() + state.requestImmediateStop() return nil, err } From 6e6ef5b77e5470e8f1b8b13a77f349205f80bb09 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 12:00:51 +0200 Subject: [PATCH 17/22] server: extra logging for server controller Release note: None --- pkg/server/server.go | 1 + pkg/server/server_controller.go | 5 +++++ pkg/server/server_controller_orchestration.go | 6 ++++-- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index f8a44844f4db..da6730e62dad 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1114,6 +1114,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { // Create a server controller. sc := newServerController(ctx, + cfg.BaseConfig.AmbientCtx, node, cfg.BaseConfig.IDContainer, stopper, st, lateBoundServer, diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index 4d30ce47e01f..351cdd4030de 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/redact" @@ -53,6 +54,8 @@ type onDemandServer interface { // Instantiation can fail, e.g. if the target tenant doesn't exist or // is not active. type serverController struct { + log.AmbientContext + // nodeID is the node ID of the server where the controller // is running. This is used for logging only. nodeID *base.NodeIDContainer @@ -100,6 +103,7 @@ type serverController struct { func newServerController( ctx context.Context, + ambientCtx log.AmbientContext, logger nodeEventLogger, parentNodeID *base.NodeIDContainer, parentStopper *stop.Stopper, @@ -110,6 +114,7 @@ func newServerController( sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName), ) *serverController { c := &serverController{ + AmbientContext: ambientCtx, nodeID: parentNodeID, logger: logger, st: st, diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index 41a8ba8af825..ea20490e41fc 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -259,6 +259,8 @@ func (c *serverController) newServerForOrchestrator( // Close implements the stop.Closer interface. func (c *serverController) Close() { + ctx := c.AnnotateCtx(context.Background()) + log.Infof(ctx, "server controller shutting down ungracefully") // Note Close() is only called in the case of expedited shutdown. // It should not invoke the graceful drain process. entries := c.getAllEntries() @@ -266,10 +268,10 @@ func (c *serverController) Close() { // server should already be sensitive to the parent stopper // quiescing. for _, e := range entries { - e.requestImmediateShutdown(context.Background()) + e.requestImmediateShutdown(ctx) } - // Wait for shutdown for all servers. + log.Infof(ctx, "waiting for tenant servers to report stopped") for _, e := range entries { <-e.stopped() } From ba6655544b82d415bff619b00795e9d07a9d6c8f Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 12:25:41 +0200 Subject: [PATCH 18/22] server: fix a possible deadlock during tenant server shutdown Prior to this patch, the server orchestrator was unable to notice when the surrounding server was stopping after it had started a graceful drain already. This is because once the `propagate-close` task (and its `select`) monitoring various stop signals noticed a request for graceful drains, it would stop monitoring the other stop signals to focus exclusively on the graceful drain; thereby missing when the stopper for the surrounding server was quiescing. The fix is to run the monitor for graceful and ungraceful shutdowns in separate tasks, which this patch does. This goes 80% of the way towards de-flaking `TestServerStartStop` in `pkg/ccl/serverccl`, which was exhibiting the deadlock described above. With this fix in place, the deadlock disappears entirely. However, this reveals _another_ bug in the SQL layer which also needs to be fixed to consider `TestServerStartStop` stable. We will do this in a separate commit. Release note: None --- .../server_controller_channel_orchestrator.go | 74 ++++++++++++------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/pkg/server/server_controller_channel_orchestrator.go b/pkg/server/server_controller_channel_orchestrator.go index 9cebb1cbd8ab..fb8b8f13364d 100644 --- a/pkg/server/server_controller_channel_orchestrator.go +++ b/pkg/server/server_controller_channel_orchestrator.go @@ -344,7 +344,27 @@ func (o *channelOrchestrator) startControlledServer( for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { tenantStopper = stop.NewStopper() - // Link the controller stopper to this tenant stopper. + // Task that is solely responsible for propagating ungraceful exits. + if err := ctlStopper.RunAsyncTask(ctx, "propagate-ungraceful-stop", func(ctx context.Context) { + select { + case <-tenantStopper.ShouldQuiesce(): + // Tenant server shutting down on its own. + return + case <-immediateStopRequestCh: + // An immediate stop request is catching up with the + // graceful drain above. + tenantStopper.Stop(ctx) + case <-o.parentStopper.ShouldQuiesce(): + // Expedited shutdown of the surrounding KV node. + tenantStopper.Stop(ctx) + } + log.Infof(ctx, "propagate-ungraceful-stop task terminating") + }); err != nil { + tenantStopper.Stop(ctx) + return + } + + // Task that propagates graceful shutdowns. if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) { defer log.Infof(ctx, "propagate-close-tenant task terminating") select { @@ -352,34 +372,34 @@ func (o *channelOrchestrator) startControlledServer( // Tenant server shutting down on its own. return case <-ctlStopper.ShouldQuiesce(): - select { - case gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown: - if gracefulDrainRequested { - // Ensure that the graceful drain for the tenant server aborts - // early if the Stopper for the surrounding server is - // prematurely shutting down. This is because once the surrounding node - // starts quiescing tasks, it won't be able to process KV requests - // by the tenant server any more. - // - // Beware: we use tenantCtx here, not ctx, because the - // latter has been linked to ctlStopper.Quiesce already - // -- and in this select branch that context has been - // canceled already. - drainCtx, cancel := o.parentStopper.WithCancelOnQuiesce(tenantCtx) - defer cancel() - log.Infof(drainCtx, "starting graceful drain") - // Call the drain service on that tenant's server. This may take a - // while as it needs to wait for clients to disconnect and SQL - // activity to clear up, possibly waiting for various configurable - // timeouts. - CallDrainServerSide(drainCtx, tenantServer.gracefulDrain) - } - default: + gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown + if gracefulDrainRequested { + // Ensure that the graceful drain for the tenant server aborts + // early if the Stopper for the surrounding server is + // prematurely shutting down. This is because once the surrounding node + // starts quiescing tasks, it won't be able to process KV requests + // by the tenant server any more. + // + // Beware: we use tenantCtx here, not ctx, because the + // latter has been linked to ctlStopper.Quiesce already + // -- and in this select branch that context has been + // canceled already. + drainCtx, cancel := o.parentStopper.WithCancelOnQuiesce(tenantCtx) + defer cancel() + // If an immediate drain catches up with the graceful drain, make + // the former cancel the ctx too. + var cancel2 func() + drainCtx, cancel2 = tenantStopper.WithCancelOnQuiesce(drainCtx) + defer cancel2() + + log.Infof(drainCtx, "starting graceful drain") + // Call the drain service on that tenant's server. This may take a + // while as it needs to wait for clients to disconnect and SQL + // activity to clear up, possibly waiting for various configurable + // timeouts. + CallDrainServerSide(drainCtx, tenantServer.gracefulDrain) } tenantStopper.Stop(ctx) - case <-o.parentStopper.ShouldQuiesce(): - // Expedited shutdown of the surrounding KV node. - tenantStopper.Stop(ctx) } }); err != nil { tenantStopper.Stop(ctx) From 55d7e92f00c79134e47d40b95f782b49266f9ec6 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 13 Apr 2023 12:25:24 +0200 Subject: [PATCH 19/22] server: avoid a race condition in tenant server orchestration Prior to this patch, the task that was responsible for propagating a graceful drain captured a variable supposed to reference the tenant server, but before this variable was assigned. This created a possible race condition, in the unlikely case case where the server startup would fail _and_ a graceful drain would be requested, concurrently. This patch fixes it by only starting to propagate graceful drains after the server is fully initialized. (But before it starts accepting clients, so that we don't create a window of time where clients can connects but graceful drains don't propagate.) This is also achieved by extracting the two shutdown tasks into separate functions, to clarify the flow of parameters. Release note: None --- .../server_controller_channel_orchestrator.go | 138 ++++++++++-------- 1 file changed, 81 insertions(+), 57 deletions(-) diff --git a/pkg/server/server_controller_channel_orchestrator.go b/pkg/server/server_controller_channel_orchestrator.go index fb8b8f13364d..a3587b452369 100644 --- a/pkg/server/server_controller_channel_orchestrator.go +++ b/pkg/server/server_controller_channel_orchestrator.go @@ -345,63 +345,7 @@ func (o *channelOrchestrator) startControlledServer( tenantStopper = stop.NewStopper() // Task that is solely responsible for propagating ungraceful exits. - if err := ctlStopper.RunAsyncTask(ctx, "propagate-ungraceful-stop", func(ctx context.Context) { - select { - case <-tenantStopper.ShouldQuiesce(): - // Tenant server shutting down on its own. - return - case <-immediateStopRequestCh: - // An immediate stop request is catching up with the - // graceful drain above. - tenantStopper.Stop(ctx) - case <-o.parentStopper.ShouldQuiesce(): - // Expedited shutdown of the surrounding KV node. - tenantStopper.Stop(ctx) - } - log.Infof(ctx, "propagate-ungraceful-stop task terminating") - }); err != nil { - tenantStopper.Stop(ctx) - return - } - - // Task that propagates graceful shutdowns. - if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) { - defer log.Infof(ctx, "propagate-close-tenant task terminating") - select { - case <-tenantStopper.ShouldQuiesce(): - // Tenant server shutting down on its own. - return - case <-ctlStopper.ShouldQuiesce(): - gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown - if gracefulDrainRequested { - // Ensure that the graceful drain for the tenant server aborts - // early if the Stopper for the surrounding server is - // prematurely shutting down. This is because once the surrounding node - // starts quiescing tasks, it won't be able to process KV requests - // by the tenant server any more. - // - // Beware: we use tenantCtx here, not ctx, because the - // latter has been linked to ctlStopper.Quiesce already - // -- and in this select branch that context has been - // canceled already. - drainCtx, cancel := o.parentStopper.WithCancelOnQuiesce(tenantCtx) - defer cancel() - // If an immediate drain catches up with the graceful drain, make - // the former cancel the ctx too. - var cancel2 func() - drainCtx, cancel2 = tenantStopper.WithCancelOnQuiesce(drainCtx) - defer cancel2() - - log.Infof(drainCtx, "starting graceful drain") - // Call the drain service on that tenant's server. This may take a - // while as it needs to wait for clients to disconnect and SQL - // activity to clear up, possibly waiting for various configurable - // timeouts. - CallDrainServerSide(drainCtx, tenantServer.gracefulDrain) - } - tenantStopper.Stop(ctx) - } - }); err != nil { + if err := o.propagateUngracefulStopAsync(ctx, ctlStopper, tenantStopper, immediateStopRequestCh); err != nil { tenantStopper.Stop(ctx) return } @@ -422,6 +366,15 @@ func (o *channelOrchestrator) startControlledServer( if err := s.preStart(startCtx); err != nil { return nil, errors.Wrap(err, "while starting server") } + + // Task that propagates graceful shutdowns. + // This can only start after the server has started successfully. + if err := o.propagateGracefulDrainAsync(ctx, + tenantCtx, ctlStopper, tenantStopper, + useGracefulDrainDuringTenantShutdown, s); err != nil { + return nil, errors.Wrap(err, "while starting graceful drain propagation task") + } + return s, errors.Wrap(s.acceptClients(startCtx), "while accepting clients") }() if err != nil { @@ -484,3 +437,74 @@ func (o *channelOrchestrator) startControlledServer( return state, nil } + +// propagateUngracefulStopAsync propagates ungraceful stop requests +// from the surrounding KV node into the tenant server. +func (o *channelOrchestrator) propagateUngracefulStopAsync( + ctx context.Context, + ctlStopper, tenantStopper *stop.Stopper, + immediateStopRequestCh <-chan struct{}, +) error { + return ctlStopper.RunAsyncTask(ctx, "propagate-ungraceful-stop", func(ctx context.Context) { + defer log.Infof(ctx, "propagate-ungraceful-stop task terminating") + select { + case <-tenantStopper.ShouldQuiesce(): + // Tenant server shutting down on its own. + return + case <-immediateStopRequestCh: + // An immediate stop request is catching up with the + // graceful drain above. + tenantStopper.Stop(ctx) + case <-o.parentStopper.ShouldQuiesce(): + // Expedited shutdown of the surrounding KV node. + tenantStopper.Stop(ctx) + } + }) +} + +// propagateGracefulDrainAsync propagates graceful drain requests +// from the surrounding KV node into the tenant server. +func (o *channelOrchestrator) propagateGracefulDrainAsync( + ctx, tenantCtx context.Context, + ctlStopper, tenantStopper *stop.Stopper, + gracefulDrainRequestCh <-chan bool, + tenantServer orchestratedServer, +) error { + return ctlStopper.RunAsyncTask(ctx, "propagate-graceful-drain", func(ctx context.Context) { + defer log.Infof(ctx, "propagate-graceful-drain task terminating") + select { + case <-tenantStopper.ShouldQuiesce(): + // Tenant server shutting down on its own. + return + case <-ctlStopper.ShouldQuiesce(): + gracefulDrainRequested := <-gracefulDrainRequestCh + if gracefulDrainRequested { + // Ensure that the graceful drain for the tenant server aborts + // early if the Stopper for the surrounding server is + // prematurely shutting down. This is because once the surrounding node + // starts quiescing tasks, it won't be able to process KV requests + // by the tenant server any more. + // + // Beware: we use tenantCtx here, not ctx, because the + // latter has been linked to ctlStopper.Quiesce already + // -- and in this select branch that context has been + // canceled already. + drainCtx, cancel := o.parentStopper.WithCancelOnQuiesce(tenantCtx) + defer cancel() + // If an immediate drain catches up with the graceful drain, make + // the former cancel the ctx too. + var cancel2 func() + drainCtx, cancel2 = tenantStopper.WithCancelOnQuiesce(drainCtx) + defer cancel2() + + log.Infof(drainCtx, "starting graceful drain") + // Call the drain service on that tenant's server. This may take a + // while as it needs to wait for clients to disconnect and SQL + // activity to clear up, possibly waiting for various configurable + // timeouts. + CallDrainServerSide(drainCtx, tenantServer.gracefulDrain) + } + tenantStopper.Stop(ctx) + } + }) +} From a416d7c95a8cb66e1127e2a8d939348e6c470987 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 13:02:24 +0200 Subject: [PATCH 20/22] util/mon: better logging Release note: None --- pkg/util/mon/bytes_usage.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/util/mon/bytes_usage.go b/pkg/util/mon/bytes_usage.go index 914a22059934..c44bd92b7d72 100644 --- a/pkg/util/mon/bytes_usage.go +++ b/pkg/util/mon/bytes_usage.go @@ -12,7 +12,6 @@ package mon import ( "context" - "fmt" "io" "math" "math/bits" @@ -473,10 +472,10 @@ func (mm *BytesMonitor) StartNoReserved(ctx context.Context, pool *BytesMonitor) // - reserved is the pre-reserved budget (see above). func (mm *BytesMonitor) Start(ctx context.Context, pool *BytesMonitor, reserved *BoundAccount) { if mm.mu.curAllocated != 0 { - panic(fmt.Sprintf("%s: started with %d bytes left over", mm.name, mm.mu.curAllocated)) + panic(errors.AssertionFailedf("%s: started with %d bytes left over", mm.name, mm.mu.curAllocated)) } if mm.mu.curBudget.mon != nil { - panic(fmt.Sprintf("%s: already started with pool %s", mm.name, mm.mu.curBudget.mon.name)) + panic(errors.AssertionFailedf("%s: already started with pool %s", mm.name, mm.mu.curBudget.mon.name)) } mm.mu.curAllocated = 0 mm.mu.maxAllocated = 0 @@ -539,7 +538,6 @@ func NewUnlimitedMonitor( ) *BytesMonitor { if log.V(2) { log.InfofDepth(ctx, 1, "%s: starting unlimited monitor", name) - } m := &BytesMonitor{ name: name, From 1095b528eeba8555d91794c165192597bbc7b5d3 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 12 Apr 2023 13:23:33 +0200 Subject: [PATCH 21/22] sqlstats: paper over a memory monitor bug When a hard stop (e.g. test exists) catches up with an ongoing graceful drain, some bytes do not get released properly in a memory monitor. This triggers a panic during shutdown, since the byte monitor code verifies that all allocated bytes have been released. This bug is relatively hard to trigger because in most cases, a server is shut down either only using a graceful drain, or only using a hard stop, but not both. `TestServerStartStop` happens to do this and this is where this problem was caught. We are now tracking it as issue #101297. Until that issue is fixed, this commit papers over the problem by removing the assertion in the byte monitor. Release note: None --- pkg/sql/sqlstats/persistedsqlstats/provider.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index 988a6f7ded5f..774f7e707106 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -109,7 +109,11 @@ func (s *PersistedSQLStats) Start(ctx context.Context, stopper *stop.Stopper) { s.startSQLStatsFlushLoop(ctx, stopper) s.jobMonitor.start(ctx, stopper, s.drain, &s.tasksDoneWG) stopper.AddCloser(stop.CloserFn(func() { - s.cfg.InternalExecutorMonitor.Stop(ctx) + // TODO(knz,yahor): This really should be just Stop(), but there + // is a leak somewhere and would cause a panic when a hard stop + // catches up with a graceful drain. + // See: https://github.com/cockroachdb/cockroach/issues/101297 + s.cfg.InternalExecutorMonitor.EmergencyStop(ctx) })) } From 6fc92d110d7fd49e6a0e70c9d8a834c8e1950e5b Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 13 Apr 2023 17:49:15 +0200 Subject: [PATCH 22/22] server: deflake TestClusterVersionUpgrade under race Release note: None --- pkg/server/version_cluster_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/server/version_cluster_test.go b/pkg/server/version_cluster_test.go index 5430acf0bac1..6d2b13cefb8a 100644 --- a/pkg/server/version_cluster_test.go +++ b/pkg/server/version_cluster_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/upgrade" @@ -205,6 +206,9 @@ func TestClusterVersionUpgrade(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderShort(t, "test takes minutes") + skip.UnderRace(t, "takes >5mn under race") + ctx := context.Background() var newVersion = clusterversion.TestingBinaryVersion