Skip to content

Commit

Permalink
server: gracefully shut down secondary tenant servers
Browse files Browse the repository at this point in the history
This change ensures that tenant servers managed by the server
controller receive a graceful drain request as part of the graceful
drain process of the surrounding KV node.

This change, in turn, ensures that SQL clients connected to these
secondary tenant servers benefit from the same guarantees (and
graceful periods) as clients to the system tenant.

Release note: None
  • Loading branch information
knz committed Apr 5, 2023
1 parent dc5eb63 commit 19c93cd
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 43 deletions.
41 changes: 7 additions & 34 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,40 +1121,13 @@ func startShutdownAsync(
drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil)

if shouldDrain {
// Perform a graceful drain. We keep retrying forever, in
// case there are many range leases or some unavailability
// preventing progress. If the operator wants to expedite
// the shutdown, they will need to make it ungraceful
// via a 2nd signal.
var (
remaining = uint64(math.MaxUint64)
prevRemaining = uint64(math.MaxUint64)
verbose = false
)

for ; ; prevRemaining = remaining {
var err error
remaining, _, err = s.Drain(drainCtx, verbose)
if err != nil {
log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err)
break
}
if remaining == 0 {
// No more work to do.
break
}

// If range lease transfer stalls or the number of
// remaining leases somehow increases, verbosity is set
// to help with troubleshooting.
if remaining >= prevRemaining {
verbose = true
}

// Avoid a busy wait with high CPU usage if the server replies
// with an incomplete drain too quickly.
time.Sleep(200 * time.Millisecond)
}
// Perform a graceful drain. This function keeps retrying and
// the call might never complete (e.g. due to some
// unavailability preventing progress). This is intentional. If
// the operator wants to expedite the shutdown, they will need
// to make it ungraceful by sending a second signal to the
// process, which will tickle the shortcut in waitForShutdown().
server.CallDrainServerSide(drainCtx, s.Drain)
}

stopper.Stop(drainCtx)
Expand Down
89 changes: 88 additions & 1 deletion pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package server
import (
"context"
"io"
"math"
"strings"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -103,6 +105,7 @@ type drainServer struct {
grpc *grpcServer
sqlServer *SQLServer
drainSleepFn func(time.Duration)
serverCtl *serverController

kvServer struct {
nodeLiveness *liveness.NodeLiveness
Expand Down Expand Up @@ -306,6 +309,26 @@ func (s *drainServer) runDrain(
func (s *drainServer) drainInner(
ctx context.Context, reporter func(int, redact.SafeString), verbose bool,
) (err error) {
if s.serverCtl != nil {
// We are on a KV node, with a server controller.
//
// First tell the controller to stop starting new servers.
s.serverCtl.draining.Set(true)

// Then shut down tenant servers orchestrated from
// this node.
stillRunning := s.serverCtl.drain(ctx)
reporter(stillRunning, "tenant servers")
// If we still have tenant servers, we can't make progress on
// draining SQL clients (on the system tenant) and the KV node,
// because that would block the graceful drain of the tenant
// server(s).
if stillRunning > 0 {
return nil
}
log.Infof(ctx, "all tenant servers stopped")
}

// Drain the SQL layer.
// Drains all SQL connections, distributed SQL execution flows, and SQL table leases.
if err = s.drainClients(ctx, reporter); err != nil {
Expand Down Expand Up @@ -394,7 +417,8 @@ func (s *drainServer) drainClients(
s.sqlServer.jobRegistry.WaitForRegistryShutdown(ctx)

// Drain all SQL table leases. This must be done after the pgServer has
// given sessions a chance to finish ongoing work.
// given sessions a chance to finish ongoing work and after the background
// tasks that may issue SQL statements have shut down.
s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)

// Mark this phase in the logs to clarify the context of any subsequent
Expand Down Expand Up @@ -428,6 +452,7 @@ func (s *drainServer) drainNode(
// No KV subsystem. Nothing to do.
return nil
}

// Set the node's liveness status to "draining".
if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil {
return err
Expand All @@ -453,3 +478,65 @@ func (s *drainServer) logOpenConns(ctx context.Context) error {
}
})
}

// CallDrainServerSide is a reference implementation for a server-side
// function that wishes to shut down a server gracefully via the Drain
// interface. The Drain interface is responsible for notifying clients
// and shutting down systems in a particular order that prevents
// client app disruptions. We generally prefer graceful drains to the
// disorderly shutdown caused by either a process crash or a direct
// call to the stopper's Stop() method.
//
// By default, this code will wait forever for a graceful drain to
// complete. The caller can override this behavior by passing a context
// with a deadline.
//
// For an example client-side implementation (drain client over RPC),
// see the code in pkg/cli/node.go, doDrain().
func CallDrainServerSide(ctx context.Context, drainFn ServerSideDrainFn) {
var (
prevRemaining = uint64(math.MaxUint64)
verbose = false
)

ctx = logtags.AddTag(ctx, "call-graceful-drain", nil)
for {
// Let the caller interrupt the process via context cancellation
// if so desired.
select {
case <-ctx.Done():
log.Ops.Errorf(ctx, "drain interrupted by caller: %v", ctx.Err())
return
default:
}

remaining, _, err := drainFn(ctx, verbose)
if err != nil {
log.Ops.Errorf(ctx, "graceful drain failed: %v", err)
return
}
if remaining == 0 {
// No more work to do.
log.Ops.Infof(ctx, "graceful drain complete")
return
}

// If range lease transfer stalls or the number of
// remaining leases somehow increases, verbosity is set
// to help with troubleshooting.
if remaining >= prevRemaining {
verbose = true
}

// Avoid a busy wait with high CPU usage if the server replies
// with an incomplete drain too quickly.
time.Sleep(200 * time.Millisecond)

// Remember the remaining work to set the verbose flag in the next
// iteration.
prevRemaining = remaining
}
}

// ServerSideDrainFn is the interface of the server-side handler for the Drain logic.
type ServerSideDrainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
systemTenantNameContainer,
pgPreServer.SendRoutingError,
)
drain.serverCtl = sc

// Create the debug API server.
debugServer := debug.NewServer(
Expand Down
22 changes: 22 additions & 0 deletions pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/redact"
)

// onDemandServer represents a server that can be started on demand.
Expand Down Expand Up @@ -65,6 +66,9 @@ type onDemandServer interface {

// shutdownRequested returns the shutdown request channel.
shutdownRequested() <-chan ShutdownRequest

// gracefulDrain drains the server.
gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
}

type serverEntry struct {
Expand Down Expand Up @@ -113,6 +117,10 @@ type serverController struct {
// a tenant routing error to the incoming client.
sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName)

// draining is set when the surrounding server starts draining, and
// prevents further creation of new tenant servers.
draining syncutil.AtomicBool

mu struct {
syncutil.Mutex

Expand Down Expand Up @@ -240,6 +248,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest {
return s.server.sqlServer.ShutdownRequested()
}

func (s *tenantServerWrapper) gracefulDrain(
ctx context.Context, verbose bool,
) (uint64, redact.RedactableString, error) {
ctx = s.server.AnnotateCtx(ctx)
return s.server.Drain(ctx, verbose)
}

// systemServerWrapper implements the onDemandServer interface for Server.
//
// (We can imagine a future where the SQL service for the system
Expand Down Expand Up @@ -297,3 +312,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID {
func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest {
return nil
}

func (s *systemServerWrapper) gracefulDrain(
ctx context.Context, verbose bool,
) (uint64, redact.RedactableString, error) {
// The controller is not responsible for draining the system tenant.
return 0, "", nil
}
80 changes: 72 additions & 8 deletions pkg/server/server_controller_orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ func (c *serverController) start(ctx context.Context, ie isql.Executor) error {
select {
case <-time.After(watchInterval):
case <-c.stopper.ShouldQuiesce():
// Expedited server shutdown of outer server.
return
}
if c.draining.Get() {
// The outer server has started a graceful drain: stop
// picking up new servers.
return
}

if err := c.scanTenantsForRunnableServices(ctx, ie); err != nil {
log.Warningf(ctx, "cannot update running tenant services: %v", err)
}
Expand Down Expand Up @@ -179,6 +184,9 @@ func (c *serverController) scanTenantsForRunnableServices(
func (c *serverController) createServerEntryLocked(
ctx context.Context, tenantName roachpb.TenantName,
) (*serverEntry, error) {
if c.draining.Get() {
return nil, errors.New("server is draining")
}
entry, err := c.startControlledServer(ctx, tenantName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -216,12 +224,17 @@ func (c *serverController) startControlledServer(
// tracer attached to the incoming context.
tenantCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
tenantCtx = logtags.AddTag(tenantCtx, "tenant-orchestration", nil)
tenantCtx = logtags.AddTag(tenantCtx, "tenant", tenantName)

// ctlStopper is a stopper uniquely responsible for the control
// loop. It is separate from the tenantStopper defined below so
// that we can retry the server instantiation if it fails.
ctlStopper := stop.NewStopper()

// useGracefulDrainDuringTenantShutdown defined whether a graceful
// drain is requested on the tenant server by orchestration.
useGracefulDrainDuringTenantShutdown := make(chan bool, 1)

// Ensure that if the surrounding server requests shutdown, we
// propagate it to the new server.
if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) {
Expand All @@ -230,18 +243,30 @@ func (c *serverController) startControlledServer(
// Server control loop is terminating prematurely before a
// request was made to terminate it.
log.Infof(ctx, "tenant %q terminating", tenantName)

case <-c.stopper.ShouldQuiesce():
// Surrounding server is stopping; propagate the stop to the
// control goroutine below.
// Note: we can't do a graceful drain in that case because
// the RPC service in the surrounding server may already
// be unavailable.
log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName)
useGracefulDrainDuringTenantShutdown <- false
ctlStopper.Stop(tenantCtx)

case <-stopRequestCh:
// Someone requested a shutdown.
// Someone requested a graceful shutdown.
log.Infof(ctx, "received request for tenant %q to terminate", tenantName)
useGracefulDrainDuringTenantShutdown <- true
ctlStopper.Stop(tenantCtx)

case <-topCtx.Done():
// Someone requested a shutdown.
// Someone requested a shutdown - probably a test.
// Note: we can't do a graceful drain in that case because
// the RPC service in the surrounding server may already
// be unavailable.
log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName)
useGracefulDrainDuringTenantShutdown <- false
ctlStopper.Stop(tenantCtx)
}
}); err != nil {
Expand Down Expand Up @@ -311,6 +336,25 @@ func (c *serverController) startControlledServer(
case <-tenantStopper.ShouldQuiesce():
return
case <-ctlStopper.ShouldQuiesce():
select {
case gracefulDrainRequested := <-useGracefulDrainDuringTenantShutdown:
if gracefulDrainRequested {
// Ensure that the graceful drain for the tenant serer aborts
// early if the Stopper for the surrounding server is
// prematurely shutting down. This is because once the surrounding node
// starts quiescing tasks, it won't be able to process KV requests
// by the tenant server any more.
drainCtx, cancel := c.stopper.WithCancelOnQuiesce(tenantCtx)
defer cancel()
log.Infof(drainCtx, "starting graceful drain")
// Call the drain service on that tenant's server. This may take a
// while as it needs to wait for clients to disconnect and SQL
// activity to clear up, possibly waiting for various configurable
// timeouts.
CallDrainServerSide(drainCtx, tenantServer.gracefulDrain)
}
default:
}
tenantStopper.Stop(ctx)
case <-c.stopper.ShouldQuiesce():
tenantStopper.Stop(ctx)
Expand Down Expand Up @@ -473,6 +517,30 @@ func (c *serverController) newServerInternal(

// Close implements the stop.Closer interface.
func (c *serverController) Close() {
entries := c.requestStopAll()

// Wait for shutdown for all servers.
for _, e := range entries {
<-e.state.stopped
}
}

func (c *serverController) drain(ctx context.Context) (stillRunning int) {
entries := c.requestStopAll()
// How many entries are _not_ stopped yet?
notStopped := 0
for _, e := range entries {
select {
case <-e.state.stopped:
default:
log.Infof(ctx, "server for tenant %q still running", e.nameContainer)
notStopped++
}
}
return notStopped
}

func (c *serverController) requestStopAll() []*serverEntry {
entries := func() (res []*serverEntry) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -487,11 +555,7 @@ func (c *serverController) Close() {
for _, e := range entries {
e.state.requestStop()
}

// Wait for shutdown for all servers.
for _, e := range entries {
<-e.state.stopped
}
return entries
}

type nodeEventLogger interface {
Expand Down

0 comments on commit 19c93cd

Please sign in to comment.