From 85d0183424e85749570f03c8ff9ec1c7621a606f Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 31 Jan 2023 12:03:57 +0000 Subject: [PATCH 1/3] server: quiesce tenant stopper when server stops Previously, the tenant stopper would only be stopped by a call to serverController.Stop(). However, that Stop() call would only happen if the top level stopper had quiesced, which it couldn't do because it was waiting on the tenant server to start up. The problem was caused by a mistaken assumption that the closers on the top level stopper are run before Quiesce(), whereas it's the other way around: Quiesce() waits for all tasks to terminate before calling the closers. This patch fixes this situation by hoisting the instantiation of the tenant stopper to the control function, and ensuring that the top-level stopper quiescence propagates to a Stop() call on the child stopper. Additionally, we discovered that the KV tenant connector was not properly aborting its own startup upon server shutdown. This was an oversight from the original implementation. This patch fixes that by adding the proper check on the stopper ShouldQuiesce() channel. Release note: None --- pkg/ccl/kvccl/kvtenantccl/connector.go | 3 + pkg/server/server_controller_new_server.go | 28 +++--- pkg/server/server_controller_orchestration.go | 98 ++++++++++++++----- 3 files changed, 88 insertions(+), 41 deletions(-) diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index bda6e0c72ce7..773926dcc4f7 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -213,6 +213,9 @@ func (c *Connector) Start(ctx context.Context) error { settingsStartupCh = nil case <-ctx.Done(): return ctx.Err() + case <-c.rpcContext.Stopper.ShouldQuiesce(): + log.Infof(ctx, "kv connector asked to shut down before full start") + return errors.New("request to shut down early") } } return nil diff --git a/pkg/server/server_controller_new_server.go b/pkg/server/server_controller_new_server.go index caf9cd49816d..92e74f48d805 100644 --- a/pkg/server/server_controller_new_server.go +++ b/pkg/server/server_controller_new_server.go @@ -53,7 +53,10 @@ type tenantServerCreator interface { // can be checked with errors.Is. // // testArgs is used by tests to tweak the tenant server. - newTenantServer(ctx context.Context, tenantNameContainer *roachpb.TenantNameContainer, index int, + newTenantServer(ctx context.Context, + tenantNameContainer *roachpb.TenantNameContainer, + tenantStopper *stop.Stopper, + index int, testArgs base.TestSharedProcessTenantArgs, ) (onDemandServer, error) } @@ -64,6 +67,7 @@ var _ tenantServerCreator = &Server{} func (s *Server) newTenantServer( ctx context.Context, tenantNameContainer *roachpb.TenantNameContainer, + tenantStopper *stop.Stopper, index int, testArgs base.TestSharedProcessTenantArgs, ) (onDemandServer, error) { @@ -71,7 +75,7 @@ func (s *Server) newTenantServer( if err != nil { return nil, err } - tenantStopper, baseCfg, sqlCfg, err := s.makeSharedProcessTenantConfig(ctx, tenantID, index) + baseCfg, sqlCfg, err := s.makeSharedProcessTenantConfig(ctx, tenantID, index, tenantStopper) if err != nil { return nil, err } @@ -81,8 +85,6 @@ func (s *Server) newTenantServer( tenantServer, err := s.startTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer) if err != nil { - // Abandon any work done so far. 
- tenantStopper.Stop(ctx) return nil, err } @@ -164,15 +166,8 @@ func (s *Server) startTenantServerInternal( } func (s *Server) makeSharedProcessTenantConfig( - ctx context.Context, tenantID roachpb.TenantID, index int, -) (*stop.Stopper, BaseConfig, SQLConfig, error) { - stopper := stop.NewStopper() - defer func() { - if stopper != nil { - stopper.Stop(ctx) - } - }() - + ctx context.Context, tenantID roachpb.TenantID, index int, stopper *stop.Stopper, +) (BaseConfig, SQLConfig, error) { // Create a configuration for the new tenant. // TODO(knz): Maybe enforce the SQL Instance ID to be equal to the KV node ID? // See: https://github.com/cockroachdb/cockroach/issues/84602 @@ -183,11 +178,10 @@ func (s *Server) makeSharedProcessTenantConfig( } baseCfg, sqlCfg, err := makeSharedProcessTenantServerConfig(ctx, tenantID, index, parentCfg, localServerInfo, stopper) if err != nil { - return nil, BaseConfig{}, SQLConfig{}, err + return BaseConfig{}, SQLConfig{}, err } - st := stopper - stopper = nil // inhibit the deferred Stop() - return st, baseCfg, sqlCfg, nil + + return baseCfg, sqlCfg, nil } func makeSharedProcessTenantServerConfig( diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index 970a06827b2d..b07f8d6f15a4 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -25,8 +25,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" ) @@ -200,9 +202,50 @@ func (c *serverController) startControlledServer( stopped: stoppedCh, }, } - if err := c.stopper.RunAsyncTask(ctx, "managed-tenant-server", func(ctx context.Context) { + + topCtx := ctx + // Use a different context for the tasks below, because the tenant + // stopper will have its own tracer which is incompatible with the + // tracer attached to the incoming context. + + tenantCtx := logtags.AddTag(context.Background(), "tenant-orchestration", nil) + + tenantStopper := stop.NewStopper() + // Ensure that if the surrounding server requests shutdown, we + // propagate it to the new server. + if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) { + select { + case <-tenantStopper.ShouldQuiesce(): + // Tenant is terminating on their own; nothing else to do here. + log.Infof(ctx, "tenant %q terminating", tenantName) + case <-c.stopper.ShouldQuiesce(): + // Surrounding server is stopping; propagate the stop to the + // control goroutine below. + log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName) + tenantStopper.Stop(tenantCtx) + case <-stopRequestCh: + // Someone requested a shutdown. + log.Infof(ctx, "received request for tenant %q to terminate", tenantName) + tenantStopper.Stop(tenantCtx) + case <-topCtx.Done(): + // Someone requested a shutdown. 
+ log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName) + tenantStopper.Stop(tenantCtx) + } + }); err != nil { + return nil, err + } + + if err := c.stopper.RunAsyncTask(ctx, "managed-tenant-server", func(_ context.Context) { startedOrStoppedChAlreadyClosed := false defer func() { + // We may be returning early due to an error in the server initialization + // not otherwise caused by a server shutdown. In that case, we don't have + // a guarantee that the tenantStopper.Stop() call will ever be called + // and we could get a goroutine leak for the above task. + // To prevent this, we call requestStop() which tells the goroutine above + // to call tenantStopper.Stop() and terminate. + entry.state.requestStop() entry.state.started.Set(false) close(stoppedCh) if !startedOrStoppedChAlreadyClosed { @@ -216,21 +259,29 @@ func (c *serverController) startControlledServer( delete(c.mu.servers, tenantName) }() + // We use our detached tenantCtx, the incoming ctx given by + // RunAsyncTask, because this stopper will be assigned its own + // different tracer. + ctx := tenantCtx + // We want a context that gets cancelled when the tenant is + // shutting down, for the possible few cases in + // startServerInternal which are not looking at the + // tenant.ShouldQuiesce() channel but are sensitive to context + // cancellation. + var cancel func() + ctx, cancel = tenantStopper.WithCancelOnQuiesce(ctx) + defer cancel() + + // Stop retrying startup/initialization if we are being shut + // down early. retryOpts := retry.Options{ - Closer: c.stopper.ShouldQuiesce(), + Closer: tenantStopper.ShouldQuiesce(), } var s onDemandServer for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { - select { - case <-stopRequestCh: - entry.state.startErr = errors.Newf("tenant %q: stop requested before start succeeded", tenantName) - log.Infof(ctx, "%v", entry.state.startErr) - return - default: - } var err error - s, err = c.startServerInternal(ctx, entry.nameContainer) + s, err = c.startServerInternal(ctx, entry.nameContainer, tenantStopper) if err != nil { c.logStartEvent(ctx, roachpb.TenantID{}, 0, entry.nameContainer.Get(), false /* success */, err) @@ -250,8 +301,12 @@ func (c *serverController) startControlledServer( return } + // Log the start event and ensure the stop event is logged eventually. tid, iid := s.getTenantID(), s.getInstanceID() c.logStartEvent(ctx, tid, iid, tenantName, true /* success */, nil) + tenantStopper.AddCloser(stop.CloserFn(func() { + c.logStopEvent(ctx, tid, iid, tenantName) + })) // Indicate the server has started. entry.server = s @@ -261,23 +316,18 @@ func (c *serverController) startControlledServer( // Wait for a request to shut down. select { - case <-c.stopper.ShouldQuiesce(): - case <-stopRequestCh: + case <-tenantStopper.ShouldQuiesce(): + log.Infof(ctx, "tenant %q finishing their own control loop", tenantName) + case shutdownRequest := <-s.shutdownRequested(): log.Infof(ctx, "tenant %q requesting their own shutdown: %v", tenantName, shutdownRequest.ShutdownCause()) + // Make the async stop goroutine above pick up the task of shutting down. + entry.state.requestStop() } - log.Infof(ctx, "stop requested for tenant %q", tenantName) - - // Stop the server. - // We use context.Background() here so that the process of - // stopping the tenant does not get cancelled when shutting - // down the outer server. - s.stop(context.Background()) - c.logStopEvent(ctx, tid, iid, tenantName) - - // The defer on the return path will take care of the rest. 
}); err != nil { + // Clean up the task we just started before. + entry.state.requestStop() return nil, err } @@ -346,7 +396,7 @@ func (c *serverController) startAndWaitForRunningServer( } func (c *serverController) startServerInternal( - ctx context.Context, nameContainer *roachpb.TenantNameContainer, + ctx context.Context, nameContainer *roachpb.TenantNameContainer, tenantStopper *stop.Stopper, ) (onDemandServer, error) { tenantName := nameContainer.Get() testArgs := c.testArgs[tenantName] @@ -358,7 +408,7 @@ func (c *serverController) startServerInternal( c.mu.nextServerIdx++ return c.mu.nextServerIdx }() - return c.tenantServerCreator.newTenantServer(ctx, nameContainer, idx, testArgs) + return c.tenantServerCreator.newTenantServer(ctx, nameContainer, tenantStopper, idx, testArgs) } // Close implements the stop.Closer interface. From 1ee34f875d8a95796bb83b27014b694fc488f3aa Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 31 Jan 2023 12:05:41 +0000 Subject: [PATCH 2/3] pkg/server: close cmux listener and don't retry accept errors This patch fixes an ancient bug, which is somewhat hidden (we don't see it triggered in production easily): the cmux listener was not being closed properly upon server shutdown. This tends to be invisible in practice because in a real server the main goroutine (in cli/start) terminates and brings the process away with it without waiting for this. However, for the sake of generality we want to tie up the loose ends. It also reduces uncertainty about possible cmux-related deadlocks when shared-process tenant servers are shut down early. Release note: None --- pkg/server/start_listen.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/server/start_listen.go b/pkg/server/start_listen.go index c18d0eec6c7c..cee988664764 100644 --- a/pkg/server/start_listen.go +++ b/pkg/server/start_listen.go @@ -124,7 +124,20 @@ func startListenRPCAndSQL( // TODO(bdarnell): Do we need to also close the other listeners? netutil.FatalIfUnexpected(anyL.Close()) netutil.FatalIfUnexpected(loopbackL.Close()) + netutil.FatalIfUnexpected(ln.Close()) } + + // cmux auto-retries Accept() by default. Tell it + // to stop doing work if we see a request to shut down. + m.HandleError(func(err error) bool { + select { + case <-stopper.ShouldQuiesce(): + log.Infof(ctx, "server shutting down: instructing cmux to stop accepting") + return false + default: + return true + } + }) stopper.AddCloser(stop.CloserFn(func() { grpc.Stop() serveOnMux.Do(func() { From 3ef7687f039cd9bd1906f7b4e2ff8db00d82fbb2 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 31 Jan 2023 15:12:42 +0000 Subject: [PATCH 3/3] server: avoid deadlock during fast shutdown When dealing with on-demand started tenants in tests, we may see a server shutdown during a tenant start up. During server start up, this code adds a closer which calls grpc.Stop() and also, as a work-around to another bug, calls (*cmux).Serve(). It assumed that by the time Serve() was called, the listeners would all be closed and thus Serve would fail with an error. This assumption was wrong. The listeners were only closed in an async task that was started _after_ the closer was installed. Typically this is fine, but in the case of a stopper that is already stopping, the AddCloser call runs the given closer immediately. As a result, the closer was waiting forever in cmux.Serve. Now, we only install the closer after the asyncronous task is started. Further, if the async task fails to start, we close the listeners directly. 
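
To make the ordering hazard concrete, here is a minimal, self-contained
sketch. The Stopper below is a toy stand-in for pkg/util/stop (its names
and behavior are simplified assumptions, not the real API), but it keeps
the one property the fix relies on: a closer added to an already-stopping
stopper runs immediately, so it must only be registered once the work it
waits on is guaranteed to happen (or has been done directly).

    // toy_ordering.go: illustration only, not the server code.
    package main

    import "fmt"

    type Stopper struct {
        stopping bool
        closers  []func()
    }

    // AddCloser mirrors the behavior described above: on an
    // already-stopping stopper the closer runs synchronously.
    func (s *Stopper) AddCloser(fn func()) {
        if s.stopping {
            fn()
            return
        }
        s.closers = append(s.closers, fn)
    }

    // RunAsyncTask refuses new work once the stopper is stopping.
    func (s *Stopper) RunAsyncTask(fn func()) error {
        if s.stopping {
            return fmt.Errorf("stopper already stopping")
        }
        go fn()
        return nil
    }

    func main() {
        s := &Stopper{stopping: true} // simulate a server already shutting down

        closeListeners := func() { fmt.Println("listeners closed") }
        stopGRPC := func() { fmt.Println("grpc stopped; mux drained") }

        // Fixed ordering: start (or fail to start) the task that closes
        // the listeners *before* registering the closer that waits on them.
        if err := s.RunAsyncTask(closeListeners); err != nil {
            // Task could not be started: close the listeners directly,
            // then it is safe to run the closer's work.
            closeListeners()
            stopGRPC()
            return
        }
        s.AddCloser(stopGRPC)
    }

In the real code the closer blocks in (*cmux).Serve() until the listeners
close; registering it only after the listener-closing task is running, and
closing the listeners directly when that task cannot start, removes the
deadlock.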
Release note: None Epic: none --- pkg/server/start_listen.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/pkg/server/start_listen.go b/pkg/server/start_listen.go index cee988664764..42c133a52a38 100644 --- a/pkg/server/start_listen.go +++ b/pkg/server/start_listen.go @@ -92,6 +92,17 @@ func startListenRPCAndSQL( var serveOnMux sync.Once m := cmux.New(ln) + // cmux auto-retries Accept() by default. Tell it + // to stop doing work if we see a request to shut down. + m.HandleError(func(err error) bool { + select { + case <-stopper.ShouldQuiesce(): + log.Infof(ctx, "server shutting down: instructing cmux to stop accepting") + return false + default: + return true + } + }) if !cfg.SplitListenSQL && enableSQLListener { // If the pg port is split, it will be opened above. Otherwise, @@ -127,18 +138,7 @@ func startListenRPCAndSQL( netutil.FatalIfUnexpected(ln.Close()) } - // cmux auto-retries Accept() by default. Tell it - // to stop doing work if we see a request to shut down. - m.HandleError(func(err error) bool { - select { - case <-stopper.ShouldQuiesce(): - log.Infof(ctx, "server shutting down: instructing cmux to stop accepting") - return false - default: - return true - } - }) - stopper.AddCloser(stop.CloserFn(func() { + stopGRPC := func() { grpc.Stop() serveOnMux.Do(func() { // The cmux matches don't shut down properly unless serve is called on the @@ -146,12 +146,16 @@ func startListenRPCAndSQL( // if we wouldn't otherwise reach the point where we start serving on it. netutil.FatalIfUnexpected(m.Serve()) }) - })) + } + if err := stopper.RunAsyncTask( workersCtx, "grpc-quiesce", waitForQuiesce, ); err != nil { + waitForQuiesce(ctx) + stopGRPC() return nil, nil, nil, err } + stopper.AddCloser(stop.CloserFn(stopGRPC)) // startRPCServer starts the RPC server. We do not do this // immediately because we want the cluster to be ready (or ready to