Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
96268: server: remove possible causes of deadlocks r=knz a=stevendanna

See the individual commits for details.

Epic: CRDB-14537
Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Jan 31, 2023
2 parents 2fd74fc + 3ef7687 commit 91bdcdd
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 43 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ func (c *Connector) Start(ctx context.Context) error {
settingsStartupCh = nil
case <-ctx.Done():
return ctx.Err()
case <-c.rpcContext.Stopper.ShouldQuiesce():
log.Infof(ctx, "kv connector asked to shut down before full start")
return errors.New("request to shut down early")
}
}
return nil
Expand Down
28 changes: 11 additions & 17 deletions pkg/server/server_controller_new_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ type tenantServerCreator interface {
// can be checked with errors.Is.
//
// testArgs is used by tests to tweak the tenant server.
newTenantServer(ctx context.Context, tenantNameContainer *roachpb.TenantNameContainer, index int,
newTenantServer(ctx context.Context,
tenantNameContainer *roachpb.TenantNameContainer,
tenantStopper *stop.Stopper,
index int,
testArgs base.TestSharedProcessTenantArgs,
) (onDemandServer, error)
}
Expand All @@ -64,14 +67,15 @@ var _ tenantServerCreator = &Server{}
func (s *Server) newTenantServer(
ctx context.Context,
tenantNameContainer *roachpb.TenantNameContainer,
tenantStopper *stop.Stopper,
index int,
testArgs base.TestSharedProcessTenantArgs,
) (onDemandServer, error) {
tenantID, err := s.getTenantID(ctx, tenantNameContainer.Get())
if err != nil {
return nil, err
}
tenantStopper, baseCfg, sqlCfg, err := s.makeSharedProcessTenantConfig(ctx, tenantID, index)
baseCfg, sqlCfg, err := s.makeSharedProcessTenantConfig(ctx, tenantID, index, tenantStopper)
if err != nil {
return nil, err
}
Expand All @@ -81,8 +85,6 @@ func (s *Server) newTenantServer(

tenantServer, err := s.startTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer)
if err != nil {
// Abandon any work done so far.
tenantStopper.Stop(ctx)
return nil, err
}

Expand Down Expand Up @@ -164,15 +166,8 @@ func (s *Server) startTenantServerInternal(
}

func (s *Server) makeSharedProcessTenantConfig(
ctx context.Context, tenantID roachpb.TenantID, index int,
) (*stop.Stopper, BaseConfig, SQLConfig, error) {
stopper := stop.NewStopper()
defer func() {
if stopper != nil {
stopper.Stop(ctx)
}
}()

ctx context.Context, tenantID roachpb.TenantID, index int, stopper *stop.Stopper,
) (BaseConfig, SQLConfig, error) {
// Create a configuration for the new tenant.
// TODO(knz): Maybe enforce the SQL Instance ID to be equal to the KV node ID?
// See: https://github.com/cockroachdb/cockroach/issues/84602
Expand All @@ -183,11 +178,10 @@ func (s *Server) makeSharedProcessTenantConfig(
}
baseCfg, sqlCfg, err := makeSharedProcessTenantServerConfig(ctx, tenantID, index, parentCfg, localServerInfo, stopper)
if err != nil {
return nil, BaseConfig{}, SQLConfig{}, err
return BaseConfig{}, SQLConfig{}, err
}
st := stopper
stopper = nil // inhibit the deferred Stop()
return st, baseCfg, sqlCfg, nil

return baseCfg, sqlCfg, nil
}

func makeSharedProcessTenantServerConfig(
Expand Down
98 changes: 74 additions & 24 deletions pkg/server/server_controller_orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -200,9 +202,50 @@ func (c *serverController) startControlledServer(
stopped: stoppedCh,
},
}
if err := c.stopper.RunAsyncTask(ctx, "managed-tenant-server", func(ctx context.Context) {

topCtx := ctx
// Use a different context for the tasks below, because the tenant
// stopper will have its own tracer which is incompatible with the
// tracer attached to the incoming context.

tenantCtx := logtags.AddTag(context.Background(), "tenant-orchestration", nil)

tenantStopper := stop.NewStopper()
// Ensure that if the surrounding server requests shutdown, we
// propagate it to the new server.
if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) {
select {
case <-tenantStopper.ShouldQuiesce():
// Tenant is terminating on their own; nothing else to do here.
log.Infof(ctx, "tenant %q terminating", tenantName)
case <-c.stopper.ShouldQuiesce():
// Surrounding server is stopping; propagate the stop to the
// control goroutine below.
log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName)
tenantStopper.Stop(tenantCtx)
case <-stopRequestCh:
// Someone requested a shutdown.
log.Infof(ctx, "received request for tenant %q to terminate", tenantName)
tenantStopper.Stop(tenantCtx)
case <-topCtx.Done():
// The caller's startup context was cancelled before/while starting.
log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName)
tenantStopper.Stop(tenantCtx)
}
}); err != nil {
return nil, err
}

if err := c.stopper.RunAsyncTask(ctx, "managed-tenant-server", func(_ context.Context) {
startedOrStoppedChAlreadyClosed := false
defer func() {
// We may be returning early due to an error in the server initialization
// not otherwise caused by a server shutdown. In that case, we don't have
// a guarantee that the tenantStopper.Stop() call will ever be called
// and we could get a goroutine leak for the above task.
// To prevent this, we call requestStop() which tells the goroutine above
// to call tenantStopper.Stop() and terminate.
entry.state.requestStop()
entry.state.started.Set(false)
close(stoppedCh)
if !startedOrStoppedChAlreadyClosed {
Expand All @@ -216,21 +259,29 @@ func (c *serverController) startControlledServer(
delete(c.mu.servers, tenantName)
}()

// We use our detached tenantCtx, rather than the incoming ctx given
// by RunAsyncTask, because this stopper will be assigned its own
// different tracer.
ctx := tenantCtx
// We want a context that gets cancelled when the tenant is
// shutting down, for the possible few cases in
// startServerInternal which are not looking at the
// tenantStopper.ShouldQuiesce() channel but are sensitive to
// context cancellation.
var cancel func()
ctx, cancel = tenantStopper.WithCancelOnQuiesce(ctx)
defer cancel()

// Stop retrying startup/initialization if we are being shut
// down early.
retryOpts := retry.Options{
Closer: c.stopper.ShouldQuiesce(),
Closer: tenantStopper.ShouldQuiesce(),
}

var s onDemandServer
for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); {
select {
case <-stopRequestCh:
entry.state.startErr = errors.Newf("tenant %q: stop requested before start succeeded", tenantName)
log.Infof(ctx, "%v", entry.state.startErr)
return
default:
}
var err error
s, err = c.startServerInternal(ctx, entry.nameContainer)
s, err = c.startServerInternal(ctx, entry.nameContainer, tenantStopper)
if err != nil {
c.logStartEvent(ctx, roachpb.TenantID{}, 0,
entry.nameContainer.Get(), false /* success */, err)
Expand All @@ -250,8 +301,12 @@ func (c *serverController) startControlledServer(
return
}

// Log the start event and ensure the stop event is logged eventually.
tid, iid := s.getTenantID(), s.getInstanceID()
c.logStartEvent(ctx, tid, iid, tenantName, true /* success */, nil)
tenantStopper.AddCloser(stop.CloserFn(func() {
c.logStopEvent(ctx, tid, iid, tenantName)
}))

// Indicate the server has started.
entry.server = s
Expand All @@ -261,23 +316,18 @@ func (c *serverController) startControlledServer(

// Wait for a request to shut down.
select {
case <-c.stopper.ShouldQuiesce():
case <-stopRequestCh:
case <-tenantStopper.ShouldQuiesce():
log.Infof(ctx, "tenant %q finishing their own control loop", tenantName)

case shutdownRequest := <-s.shutdownRequested():
log.Infof(ctx, "tenant %q requesting their own shutdown: %v",
tenantName, shutdownRequest.ShutdownCause())
// Make the async stop goroutine above pick up the task of shutting down.
entry.state.requestStop()
}
log.Infof(ctx, "stop requested for tenant %q", tenantName)

// Stop the server.
// We use context.Background() here so that the process of
// stopping the tenant does not get cancelled when shutting
// down the outer server.
s.stop(context.Background())
c.logStopEvent(ctx, tid, iid, tenantName)

// The defer on the return path will take care of the rest.
}); err != nil {
// Clean up the task we just started before.
entry.state.requestStop()
return nil, err
}

Expand Down Expand Up @@ -346,7 +396,7 @@ func (c *serverController) startAndWaitForRunningServer(
}

func (c *serverController) startServerInternal(
ctx context.Context, nameContainer *roachpb.TenantNameContainer,
ctx context.Context, nameContainer *roachpb.TenantNameContainer, tenantStopper *stop.Stopper,
) (onDemandServer, error) {
tenantName := nameContainer.Get()
testArgs := c.testArgs[tenantName]
Expand All @@ -358,7 +408,7 @@ func (c *serverController) startServerInternal(
c.mu.nextServerIdx++
return c.mu.nextServerIdx
}()
return c.tenantServerCreator.newTenantServer(ctx, nameContainer, idx, testArgs)
return c.tenantServerCreator.newTenantServer(ctx, nameContainer, tenantStopper, idx, testArgs)
}

// Close implements the stop.Closer interface.
Expand Down
21 changes: 19 additions & 2 deletions pkg/server/start_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ func startListenRPCAndSQL(
var serveOnMux sync.Once

m := cmux.New(ln)
// cmux auto-retries Accept() by default. Tell it
// to stop doing work if we see a request to shut down.
m.HandleError(func(err error) bool {
select {
case <-stopper.ShouldQuiesce():
log.Infof(ctx, "server shutting down: instructing cmux to stop accepting")
return false
default:
return true
}
})

if !cfg.SplitListenSQL && enableSQLListener {
// If the pg port is split, it will be opened above. Otherwise,
Expand Down Expand Up @@ -124,21 +135,27 @@ func startListenRPCAndSQL(
// TODO(bdarnell): Do we need to also close the other listeners?
netutil.FatalIfUnexpected(anyL.Close())
netutil.FatalIfUnexpected(loopbackL.Close())
netutil.FatalIfUnexpected(ln.Close())
}
stopper.AddCloser(stop.CloserFn(func() {

stopGRPC := func() {
grpc.Stop()
serveOnMux.Do(func() {
// The cmux matches don't shut down properly unless serve is called on the
// cmux at some point. Use serveOnMux to ensure it's called during shutdown
// if we wouldn't otherwise reach the point where we start serving on it.
netutil.FatalIfUnexpected(m.Serve())
})
}))
}

if err := stopper.RunAsyncTask(
workersCtx, "grpc-quiesce", waitForQuiesce,
); err != nil {
waitForQuiesce(ctx)
stopGRPC()
return nil, nil, nil, err
}
stopper.AddCloser(stop.CloserFn(stopGRPC))

// startRPCServer starts the RPC server. We do not do this
// immediately because we want the cluster to be ready (or ready to
Expand Down

0 comments on commit 91bdcdd

Please sign in to comment.