Skip to content

Commit

Permalink
server: fix the tenant server error handling
Browse files Browse the repository at this point in the history
Prior to this patch, if an error occurred during the initialization or
startup of a secondary tenant server, the initialization would leak
state into the stopper defined for that tenant. Generally, reusing
a stopper across server startup failures is not safe (and is an API
violation).

This patch fixes it by decoupling the intermediate stopper used for
orchestration from the one used per tenant server.

Release note: None
  • Loading branch information
knz committed Apr 6, 2023
1 parent 649c02a commit 49f13fc
Showing 1 changed file with 68 additions and 29 deletions.
97 changes: 68 additions & 29 deletions pkg/server/server_controller_orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,40 +210,45 @@ func (c *serverController) startControlledServer(
}

topCtx := ctx

// Use a different context for the tasks below, because the tenant
// stopper will have its own tracer which is incompatible with the
// tracer attached to the incoming context.
tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil)

tenantCtx := logtags.AddTag(context.Background(), "tenant-orchestration", nil)

tenantStopper := stop.NewStopper()
// ctlStopper is a stopper uniquely responsible for the control
// loop. It is separate from the tenantStopper defined below so
// that we can retry the server instantiation if it fails.
ctlStopper := stop.NewStopper()

// Ensure that if the surrounding server requests shutdown, we
// propagate it to the new server.
if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) {
select {
case <-tenantStopper.ShouldQuiesce():
// Tenant is terminating on their own; nothing else to do here.
case <-stoppedCh:
// Server control loop is terminating prematurely before a
// request was made to terminate it.
log.Infof(ctx, "tenant %q terminating", tenantName)
case <-c.stopper.ShouldQuiesce():
// Surrounding server is stopping; propagate the stop to the
// control goroutine below.
log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName)
tenantStopper.Stop(tenantCtx)
ctlStopper.Stop(tenantCtx)
case <-stopRequestCh:
// Someone requested a shutdown.
log.Infof(ctx, "received request for tenant %q to terminate", tenantName)
tenantStopper.Stop(tenantCtx)
ctlStopper.Stop(tenantCtx)
case <-topCtx.Done():
// Someone requested a shutdown.
log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName)
tenantStopper.Stop(tenantCtx)
ctlStopper.Stop(tenantCtx)
}
}); err != nil {
// The goroutine above is responsible for stopping the
// tenantStopper. If it fails to stop, we stop it here
// to avoid leaking the stopper.
tenantStopper.Stop(ctx)
// The goroutine above is responsible for stopping the ctlStopper.
// If it fails to stop, we stop it here to avoid leaking a
// stopper.
ctlStopper.Stop(ctx)
return nil, err
}

Expand All @@ -260,7 +265,7 @@ func (c *serverController) startControlledServer(
entry.state.started.Set(false)
close(stoppedCh)
if !startedOrStoppedChAlreadyClosed {
entry.state.startErr = errors.New("server stop before start")
entry.state.startErr = errors.New("server stop before successful start")
close(startedOrStoppedCh)
}

Expand All @@ -274,36 +279,69 @@ func (c *serverController) startControlledServer(
// RunAsyncTask, because this stopper will be assigned its own
// different tracer.
ctx := tenantCtx
// We want a context that gets cancelled when the tenant is
// We want a context that gets cancelled when the server is
// shutting down, for the possible few cases in
// newServerInternal/preStart/acceptClients which are not looking at the
// tenant.ShouldQuiesce() channel but are sensitive to context
// tenantStopper.ShouldQuiesce() channel but are sensitive to context
// cancellation.
var cancel func()
ctx, cancel = tenantStopper.WithCancelOnQuiesce(ctx)
ctx, cancel = ctlStopper.WithCancelOnQuiesce(ctx)
defer cancel()

// Stop retrying startup/initialization if we are being shut
// down early.
retryOpts := retry.Options{
Closer: tenantStopper.ShouldQuiesce(),
Closer: ctlStopper.ShouldQuiesce(),
}

var s onDemandServer
// tenantStopper is the stopper specific to one tenant server
// instance. We define a new tenantStopper on every attempt to
// instantiate the tenant server below. It is then linked to
// ctlStopper below once the instantiation and start have
// succeeded.
var tenantStopper *stop.Stopper

var tenantServer onDemandServer
for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); {
err := func() error {
var err error
s, err = c.newServerInternal(ctx, entry.nameContainer, tenantStopper)
tenantStopper = stop.NewStopper()

// Link the controller stopper to this tenant stopper.
if err := ctlStopper.RunAsyncTask(ctx, "propagate-close-tenant", func(ctx context.Context) {
select {
case <-tenantStopper.ShouldQuiesce():
return
case <-ctlStopper.ShouldQuiesce():
tenantStopper.Stop(ctx)
case <-c.stopper.ShouldQuiesce():
tenantStopper.Stop(ctx)
}
}); err != nil {
tenantStopper.Stop(ctx)
return
}

// Try to create the server.
s, err := func() (onDemandServer, error) {
s, err := c.newServerInternal(ctx, entry.nameContainer, tenantStopper)
if err != nil {
return err
return nil, errors.Wrap(err, "while creating server")
}
startCtx := s.annotateCtx(context.Background())

// Note: we make preStart() below derive from ctx, which is
// cancelled on shutdown of the outer server. This is necessary
// to ensure preStart() properly stops prematurely in that case.
startCtx := s.annotateCtx(ctx)
startCtx = logtags.AddTag(startCtx, "start-server", nil)
log.Infof(startCtx, "starting tenant server")
if err := s.preStart(startCtx); err != nil {
return err
return nil, errors.Wrap(err, "while starting server")
}
return s.acceptClients(startCtx)
return s, errors.Wrap(s.acceptClients(startCtx), "while accepting clients")
}()
if err != nil {
// Creation failed. We stop the tenant stopper here, which also
// takes care of terminating the async task we've just started above.
tenantStopper.Stop(ctx)
c.logStartEvent(ctx, roachpb.TenantID{}, 0,
entry.nameContainer.Get(), false /* success */, err)
log.Warningf(ctx,
Expand All @@ -312,9 +350,10 @@ func (c *serverController) startControlledServer(
entry.state.startErr = err
continue
}
tenantServer = s
break
}
if s == nil {
if tenantServer == nil {
// Retry loop exited before the server could start. This
// indicates that there was an async request to abandon the
// server startup. This is OK; just terminate early. The defer
Expand All @@ -323,14 +362,14 @@ func (c *serverController) startControlledServer(
}

// Log the start event and ensure the stop event is logged eventually.
tid, iid := s.getTenantID(), s.getInstanceID()
tid, iid := tenantServer.getTenantID(), tenantServer.getInstanceID()
c.logStartEvent(ctx, tid, iid, tenantName, true /* success */, nil)
tenantStopper.AddCloser(stop.CloserFn(func() {
c.logStopEvent(ctx, tid, iid, tenantName)
}))

// Indicate the server has started.
entry.server = s
entry.server = tenantServer
startedOrStoppedChAlreadyClosed = true
entry.state.started.Set(true)
close(startedOrStoppedCh)
Expand All @@ -340,7 +379,7 @@ func (c *serverController) startControlledServer(
case <-tenantStopper.ShouldQuiesce():
log.Infof(ctx, "tenant %q finishing their own control loop", tenantName)

case shutdownRequest := <-s.shutdownRequested():
case shutdownRequest := <-tenantServer.shutdownRequested():
log.Infof(ctx, "tenant %q requesting their own shutdown: %v",
tenantName, shutdownRequest.ShutdownCause())
// Make the async stop goroutine above pick up the task of shutting down.
Expand Down

0 comments on commit 49f13fc

Please sign in to comment.