Skip to content

Commit

Permalink
server: gracefully shut down secondary tenant servers
Browse files Browse the repository at this point in the history
This change ensures that tenant servers managed by the server
controller receive a graceful drain request as part of the graceful
drain process of the surrounding KV node.

This change, in turn, ensures that SQL clients connected to these
secondary tenant servers benefit from the same guarantees (and
grace periods) as clients of the system tenant.

Release note: None
  • Loading branch information
knz committed Mar 29, 2023
1 parent 5b62394 commit 75a863c
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 9 deletions.
19 changes: 19 additions & 0 deletions pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type drainServer struct {
grpc *grpcServer
sqlServer *SQLServer
drainSleepFn func(time.Duration)
serverCtl *serverController

kvServer struct {
nodeLiveness *liveness.NodeLiveness
Expand Down Expand Up @@ -306,6 +307,23 @@ func (s *drainServer) runDrain(
func (s *drainServer) drainInner(
ctx context.Context, reporter func(int, redact.SafeString), verbose bool,
) (err error) {
if s.serverCtl != nil {
// We are on a KV node, with a server controller.
//
// So we need to first shut down tenant servers orchestrated from
// this node.
stillRunning := s.serverCtl.drain(ctx)
reporter(stillRunning, "tenant servers")
// If we still have tenant servers, we can't make progress on
// draining SQL clients (on the system tenant) and the KV node,
// because that would block the graceful drain of the tenant
// server(s).
if stillRunning > 0 {
return nil
}
log.Infof(ctx, "all tenant servers stopped")
}

// Drain the SQL layer.
// Drains all SQL connections, distributed SQL execution flows, and SQL table leases.
if err = s.drainClients(ctx, reporter); err != nil {
Expand Down Expand Up @@ -397,6 +415,7 @@ func (s *drainServer) drainNode(
// No KV subsystem. Nothing to do.
return nil
}

// Set the node's liveness status to "draining".
if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
systemTenantNameContainer,
pgPreServer.SendRoutingError,
)
drain.serverCtl = sc

// Create the debug API server.
debugServer := debug.NewServer(
Expand Down
18 changes: 18 additions & 0 deletions pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/redact"
)

// onDemandServer represents a server that can be started on demand.
Expand Down Expand Up @@ -54,6 +55,9 @@ type onDemandServer interface {

// shutdownRequested returns the shutdown request channel.
shutdownRequested() <-chan ShutdownRequest

// gracefulDrain drains the server.
gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
}

type serverEntry struct {
Expand Down Expand Up @@ -212,6 +216,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest {
return s.server.sqlServer.ShutdownRequested()
}

// gracefulDrain forwards a drain request to the wrapped tenant server.
//
// The incoming context is first passed through the wrapped server's
// AnnotateCtx; the three results of Drain are then returned to the
// caller unchanged.
func (s *tenantServerWrapper) gracefulDrain(
	ctx context.Context, verbose bool,
) (uint64, redact.RedactableString, error) {
	annotatedCtx := s.server.AnnotateCtx(ctx)
	remaining, info, err := s.server.Drain(annotatedCtx, verbose)
	return remaining, info, err
}

// systemServerWrapper implements the onDemandServer interface for Server.
//
// (We can imagine a future where the SQL service for the system
Expand Down Expand Up @@ -255,3 +266,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID {
// shutdownRequested returns the shutdown request channel. For the
// system server wrapper this is always a nil channel.
func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest {
	// A receive from a nil channel blocks forever, so a select on the
	// returned channel never fires.
	var noRequests <-chan ShutdownRequest
	return noRequests
}

// gracefulDrain is a no-op for the system tenant: the controller is
// not responsible for draining it. It always reports zero remaining
// work, an empty message and no error; both arguments are ignored.
func (s *systemServerWrapper) gracefulDrain(
	ctx context.Context, verbose bool,
) (uint64, redact.RedactableString, error) {
	const remaining uint64 = 0
	var info redact.RedactableString // zero value: the empty string
	return remaining, info, nil
}
117 changes: 108 additions & 9 deletions pkg/server/server_controller_orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package server

import (
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -212,6 +213,28 @@ func (c *serverController) startControlledServer(

tenantStopper := stop.NewStopper()

var shutdownInterface struct {
syncutil.Mutex
drainableServer interface {
gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
}
}
gracefulDrain := func(ctx context.Context) {
drainServer := func() interface {
gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
} {
shutdownInterface.Lock()
defer shutdownInterface.Unlock()
return shutdownInterface.drainableServer
}()
if drainServer == nil {
// Server not started yet. No graceful drain possible.
return
}
log.Infof(ctx, "gracefully draining tenant %q", tenantName)
doGracefulDrain(ctx, drainServer.gracefulDrain)
}

// Ensure that if the surrounding server requests shutdown, we
// propagate it to the new server.
if err := c.stopper.RunAsyncTask(ctx, "propagate-close", func(ctx context.Context) {
Expand All @@ -222,21 +245,28 @@ func (c *serverController) startControlledServer(
case <-c.stopper.ShouldQuiesce():
// Surrounding server is stopping; propagate the stop to the
// control goroutine below.
// Note: we can't do a graceful drain in that case because
// the RPC service in the surrounding server may already
// be unavailable.
log.Infof(ctx, "server terminating; telling tenant %q to terminate", tenantName)
tenantStopper.Stop(tenantCtx)
case <-stopRequestCh:
// Someone requested a shutdown.
// Someone requested a graceful shutdown.
log.Infof(ctx, "received request for tenant %q to terminate", tenantName)
gracefulDrain(tenantCtx)
tenantStopper.Stop(tenantCtx)
case <-topCtx.Done():
// Someone requested a shutdown.
// Someone requested a shutdown - probably a test.
// Note: we can't do a graceful drain in that case because
// the RPC service in the surrounding server may already
// be unavailable.
log.Infof(ctx, "startup context cancelled; telling tenant %q to terminate", tenantName)
tenantStopper.Stop(tenantCtx)
}
}); err != nil {
// The goroutine above is responsible for stopping the
// tenantStopper. If it fails to stop, we stop it here
// to avoid leaking the stopper.
// tenantStopper. If it fails to start, we need to avoid leaking
// the stopper.
tenantStopper.Stop(ctx)
return nil, err
}
Expand Down Expand Up @@ -314,6 +344,11 @@ func (c *serverController) startControlledServer(
}))

// Indicate the server has started.
func() {
shutdownInterface.Lock()
defer shutdownInterface.Unlock()
shutdownInterface.drainableServer = s
}()
entry.server = s
startedOrStoppedChAlreadyClosed = true
entry.state.started.Set(true)
Expand Down Expand Up @@ -418,6 +453,30 @@ func (c *serverController) startServerInternal(

// Close implements the stop.Closer interface.
//
// It requests a stop of every server entry via requestStopAll and then
// blocks on each entry's stopped channel in turn.
func (c *serverController) Close() {
	// All stop requests have been issued by the time requestStopAll
	// returns, so the waits below only observe completion.
	for _, entry := range c.requestStopAll() {
		<-entry.state.stopped
	}
}

// drain requests a stop of every server entry via requestStopAll and,
// without waiting, returns how many of them have not stopped yet.
//
// An entry counts as stopped when a receive on its stopped channel can
// proceed immediately. Each entry that is still running is logged.
func (c *serverController) drain(ctx context.Context) (stillRunning int) {
	for _, entry := range c.requestStopAll() {
		select {
		case <-entry.state.stopped:
			// Already stopped: nothing to count.
			continue
		default:
		}
		log.Infof(ctx, "server for tenant %q still running", entry.nameContainer)
		stillRunning++
	}
	return stillRunning
}

func (c *serverController) requestStopAll() []*serverEntry {
entries := func() (res []*serverEntry) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -432,11 +491,7 @@ func (c *serverController) Close() {
for _, e := range entries {
e.state.requestStop()
}

// Wait for shutdown for all servers.
for _, e := range entries {
<-e.state.stopped
}
return entries
}

type nodeEventLogger interface {
Expand Down Expand Up @@ -483,3 +538,47 @@ func (c *serverController) logStopEvent(

c.logger.logStructuredEvent(ctx, ev)
}

// doGracefulDrain performs a graceful drain by calling drainFn
// repeatedly until it either reports zero remaining work or returns an
// error. An error is logged and ends the loop; nothing is returned to
// the caller.
//
// drainFn is first called with verbose=false. Verbosity is switched on
// (and stays on) as soon as the remaining count fails to decrease
// between two consecutive calls.
//
// FIXME(knz): This is the same code as in cli/start.go,
// startShutdownAsync. Maybe we could have a single function used by
// both.
func doGracefulDrain(
	ctx context.Context,
	drainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error),
) {
	// Perform a graceful drain.
	// FIXME(knz): This code currently keeps retrying forever. Is this reasonable?
	var (
		remaining     = uint64(math.MaxUint64)
		prevRemaining = uint64(math.MaxUint64)
		// Start quiet. The stall check in the loop below escalates to
		// verbose output when the drain stops making progress; starting
		// with true would make that check a no-op.
		verbose = false
	)

	// drainFn runs under a fresh background context that only carries
	// over the log tags of ctx, so cancellation of ctx is not propagated
	// to the drain calls.
	drainCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))

	for ; ; prevRemaining = remaining {
		var err error
		remaining, _, err = drainFn(drainCtx, verbose)
		if err != nil {
			log.Ops.Errorf(ctx, "graceful drain failed: %v", err)
			break
		}
		if remaining == 0 {
			// No more work to do.
			break
		}

		// If the drain stalls or the amount of remaining work somehow
		// increases, verbosity is set to help with troubleshooting.
		if remaining >= prevRemaining {
			verbose = true
		}

		// Avoid a busy wait with high CPU usage if the server replies
		// with an incomplete drain too quickly.
		time.Sleep(200 * time.Millisecond)
	}
}

0 comments on commit 75a863c

Please sign in to comment.