Skip to content

Commit

Permalink
server: gracefully shut down secondary tenant servers
Browse files Browse the repository at this point in the history
This change ensures that tenant servers managed by the server
controller receive a graceful drain request as part of the graceful
drain process of the surrounding KV node.

This change, in turn, ensures that SQL clients connected to these
secondary tenant servers benefit from the same guarantees (and
graceful periods) as clients to the system tenant.

Release note: None
  • Loading branch information
knz committed Apr 6, 2023
1 parent 1491929 commit 4d4c111
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 44 deletions.
34 changes: 34 additions & 0 deletions pkg/ccl/serverccl/server_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,37 @@ func TestServerControllerLoginLogout(t *testing.T) {
require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames)
require.ElementsMatch(t, []string{"", ""}, cookieValues)
}

func TestServiceShutdownUsesGracefulDrain(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestTenantDisabled,
})
defer s.Stopper().Stop(ctx)

drainCh := make(chan struct{})

// Start a shared process server.
_, _, err := s.(*server.TestServer).StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantName: "hello",
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
RequireGracefulDrain: true,
DrainReportCh: drainCh,
},
},
})
require.NoError(t, err)

_, err = db.Exec("ALTER TENANT hello STOP SERVICE")
require.NoError(t, err)

// Wait for the server to shut down. This also asserts that the
// graceful drain has occurred.
<-drainCh
}
41 changes: 7 additions & 34 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,40 +1121,13 @@ func startShutdownAsync(
drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil)

if shouldDrain {
// Perform a graceful drain. We keep retrying forever, in
// case there are many range leases or some unavailability
// preventing progress. If the operator wants to expedite
// the shutdown, they will need to make it ungraceful
// via a 2nd signal.
var (
remaining = uint64(math.MaxUint64)
prevRemaining = uint64(math.MaxUint64)
verbose = false
)

for ; ; prevRemaining = remaining {
var err error
remaining, _, err = s.Drain(drainCtx, verbose)
if err != nil {
log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err)
break
}
if remaining == 0 {
// No more work to do.
break
}

// If range lease transfer stalls or the number of
// remaining leases somehow increases, verbosity is set
// to help with troubleshooting.
if remaining >= prevRemaining {
verbose = true
}

// Avoid a busy wait with high CPU usage if the server replies
// with an incomplete drain too quickly.
time.Sleep(200 * time.Millisecond)
}
// Perform a graceful drain. This function keeps retrying and
// the call might never complete (e.g. due to some
// unavailability preventing progress). This is intentional. If
// the operator wants to expedite the shutdown, they will need
// to make it ungraceful by sending a second signal to the
// process, which will tickle the shortcut in waitForShutdown().
server.CallDrainServerSide(drainCtx, s.Drain)
}

stopper.Stop(drainCtx)
Expand Down
89 changes: 88 additions & 1 deletion pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package server
import (
"context"
"io"
"math"
"strings"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -103,6 +105,7 @@ type drainServer struct {
grpc *grpcServer
sqlServer *SQLServer
drainSleepFn func(time.Duration)
serverCtl *serverController

kvServer struct {
nodeLiveness *liveness.NodeLiveness
Expand Down Expand Up @@ -306,6 +309,26 @@ func (s *drainServer) runDrain(
func (s *drainServer) drainInner(
ctx context.Context, reporter func(int, redact.SafeString), verbose bool,
) (err error) {
if s.serverCtl != nil {
// We are on a KV node, with a server controller.
//
// First tell the controller to stop starting new servers.
s.serverCtl.draining.Set(true)

// Then shut down tenant servers orchestrated from
// this node.
stillRunning := s.serverCtl.drain(ctx)
reporter(stillRunning, "tenant servers")
// If we still have tenant servers, we can't make progress on
// draining SQL clients (on the system tenant) and the KV node,
// because that would block the graceful drain of the tenant
// server(s).
if stillRunning > 0 {
return nil
}
log.Infof(ctx, "all tenant servers stopped")
}

// Drain the SQL layer.
// Drains all SQL connections, distributed SQL execution flows, and SQL table leases.
if err = s.drainClients(ctx, reporter); err != nil {
Expand Down Expand Up @@ -399,7 +422,8 @@ func (s *drainServer) drainClients(
s.sqlServer.jobRegistry.WaitForRegistryShutdown(ctx)

// Drain all SQL table leases. This must be done after the pgServer has
// given sessions a chance to finish ongoing work.
// given sessions a chance to finish ongoing work and after the background
// tasks that may issue SQL statements have shut down.
s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)

// Mark this phase in the logs to clarify the context of any subsequent
Expand Down Expand Up @@ -433,6 +457,7 @@ func (s *drainServer) drainNode(
// No KV subsystem. Nothing to do.
return nil
}

// Set the node's liveness status to "draining".
if err = s.kvServer.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil {
return err
Expand All @@ -458,3 +483,65 @@ func (s *drainServer) logOpenConns(ctx context.Context) error {
}
})
}

// CallDrainServerSide is a reference implementation for a server-side
// function that wishes to shut down a server gracefully via the Drain
// interface. The Drain interface is responsible for notifying clients
// and shutting down systems in a particular order that prevents
// client app disruptions. We generally prefer graceful drains to the
// disorderly shutdown caused by either a process crash or a direct
// call to the stopper's Stop() method.
//
// By default, this code will wait forever for a graceful drain to
// complete. The caller can override this behavior by passing a context
// with a deadline.
//
// For an example client-side implementation (drain client over RPC),
// see the code in pkg/cli/node.go, doDrain().
func CallDrainServerSide(ctx context.Context, drainFn ServerSideDrainFn) {
var (
prevRemaining = uint64(math.MaxUint64)
verbose = false
)

ctx = logtags.AddTag(ctx, "call-graceful-drain", nil)
for {
// Let the caller interrupt the process via context cancellation
// if so desired.
select {
case <-ctx.Done():
log.Ops.Errorf(ctx, "drain interrupted by caller: %v", ctx.Err())
return
default:
}

remaining, _, err := drainFn(ctx, verbose)
if err != nil {
log.Ops.Errorf(ctx, "graceful drain failed: %v", err)
return
}
if remaining == 0 {
// No more work to do.
log.Ops.Infof(ctx, "graceful drain complete")
return
}

// If range lease transfer stalls or the number of
// remaining leases somehow increases, verbosity is set
// to help with troubleshooting.
if remaining >= prevRemaining {
verbose = true
}

// Avoid a busy wait with high CPU usage if the server replies
// with an incomplete drain too quickly.
time.Sleep(200 * time.Millisecond)

// Remember the remaining work to set the verbose flag in the next
// iteration.
prevRemaining = remaining
}
}

// ServerSideDrainFn is the interface of the server-side handler for the Drain logic.
type ServerSideDrainFn func(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
systemTenantNameContainer,
pgPreServer.SendRoutingError,
)
drain.serverCtl = sc

// Create the debug API server.
debugServer := debug.NewServer(
Expand Down
22 changes: 22 additions & 0 deletions pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/redact"
)

// onDemandServer represents a server that can be started on demand.
Expand Down Expand Up @@ -65,6 +66,9 @@ type onDemandServer interface {

// shutdownRequested returns the shutdown request channel.
shutdownRequested() <-chan ShutdownRequest

// gracefulDrain drains the server.
gracefulDrain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
}

type serverEntry struct {
Expand Down Expand Up @@ -113,6 +117,10 @@ type serverController struct {
// a tenant routing error to the incoming client.
sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName)

// draining is set when the surrounding server starts draining, and
// prevents further creation of new tenant servers.
draining syncutil.AtomicBool

mu struct {
syncutil.Mutex

Expand Down Expand Up @@ -240,6 +248,13 @@ func (s *tenantServerWrapper) shutdownRequested() <-chan ShutdownRequest {
return s.server.sqlServer.ShutdownRequested()
}

func (s *tenantServerWrapper) gracefulDrain(
ctx context.Context, verbose bool,
) (uint64, redact.RedactableString, error) {
ctx = s.server.AnnotateCtx(ctx)
return s.server.Drain(ctx, verbose)
}

// systemServerWrapper implements the onDemandServer interface for Server.
//
// (We can imagine a future where the SQL service for the system
Expand Down Expand Up @@ -297,3 +312,10 @@ func (s *systemServerWrapper) getInstanceID() base.SQLInstanceID {
func (s *systemServerWrapper) shutdownRequested() <-chan ShutdownRequest {
return nil
}

func (s *systemServerWrapper) gracefulDrain(
ctx context.Context, verbose bool,
) (uint64, redact.RedactableString, error) {
// The controller is not responsible for draining the system tenant.
return 0, "", nil
}
Loading

0 comments on commit 4d4c111

Please sign in to comment.