Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs,server: graceful shutdown for secondary tenant servers #99958

Merged
merged 10 commits into from
Apr 6, 2023
2 changes: 1 addition & 1 deletion pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
"//pkg/ccl/kvccl",
"//pkg/ccl/utilccl/licenseccl",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/multitenant/tenantcapabilities",
Expand Down Expand Up @@ -89,7 +90,6 @@ go_test(
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
Expand Down
63 changes: 59 additions & 4 deletions pkg/ccl/serverccl/server_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -387,34 +388,54 @@ func TestServerControllerMultiNodeTenantStartup(t *testing.T) {

ctx := context.Background()

t.Logf("starting test cluster")
numNodes := 3
tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
DefaultTestTenant: base.TestTenantDisabled,
}})
defer tc.Stopper().Stop(ctx)
defer func() {
t.Logf("stopping test cluster")
tc.Stopper().Stop(ctx)
}()

t.Logf("starting tenant servers")
db := tc.ServerConn(0)
_, err := db.Exec("CREATE TENANT hello; ALTER TENANT hello START SERVICE SHARED")
require.NoError(t, err)

// Pick a random node, try to run some SQL inside that tenant.
rng, _ := randutil.NewTestRand()
sqlAddr := tc.Server(int(rng.Int31n(int32(numNodes)))).ServingSQLAddr()
serverIdx := int(rng.Int31n(int32(numNodes)))
sqlAddr := tc.Server(serverIdx).ServingSQLAddr()
t.Logf("attempting to use tenant server on node %d (%s)", serverIdx, sqlAddr)
testutils.SucceedsSoon(t, func() error {
tenantDB, err := serverutils.OpenDBConnE(sqlAddr, "cluster:hello", false, tc.Stopper())
if err != nil {
t.Logf("error connecting to tenant server (will retry): %v", err)
return err
}
defer tenantDB.Close()
if _, err := tenantDB.Exec("CREATE ROLE foo"); err != nil {
if err := tenantDB.Ping(); err != nil {
t.Logf("connection not ready (will retry): %v", err)
return err
}
if _, err := tenantDB.Exec("CREATE ROLE foo"); err != nil {
// This is not retryable -- if the server accepts the
// connection, it better be ready.
t.Fatal(err)
}
if _, err := tenantDB.Exec("GRANT ADMIN TO foo"); err != nil {
return err
// This is not retryable -- if the server accepts the
// connection, it better be ready.
t.Fatal(err)
}
return nil
})
t.Logf("tenant server on node %d (%s) is ready", serverIdx, sqlAddr)
}

func TestServerStartStop(t *testing.T) {
Expand Down Expand Up @@ -545,3 +566,37 @@ func TestServerControllerLoginLogout(t *testing.T) {
require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames)
require.ElementsMatch(t, []string{"", ""}, cookieValues)
}

func TestServiceShutdownUsesGracefulDrain(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestTenantDisabled,
})
defer s.Stopper().Stop(ctx)

drainCh := make(chan struct{})

// Start a shared process server.
_, _, err := s.(*server.TestServer).StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantName: "hello",
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
RequireGracefulDrain: true,
DrainReportCh: drainCh,
},
},
})
require.NoError(t, err)

_, err = db.Exec("ALTER TENANT hello STOP SERVICE")
require.NoError(t, err)

// Wait for the server to shut down. This also asserts that the
// graceful drain has occurred.
<-drainCh
}
102 changes: 46 additions & 56 deletions pkg/ccl/serverccl/server_startup_guardrails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// TestServerStartupGuardrails ensures that a SQL server will fail to start if
Expand Down Expand Up @@ -74,69 +74,59 @@ func TestServerStartupGuardrails(t *testing.T) {
}

for i, test := range tests {
storageSettings := cluster.MakeTestingClusterSettingsWithVersions(
test.storageBinaryVersion,
test.storageBinaryMinSupportedVersion,
false, /* initializeVersion */
)
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
defer log.Scope(t).Close(t)

s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// Disable the default test tenant, since we create one explicitly
// below.
DefaultTestTenant: base.TestTenantDisabled,
Settings: storageSettings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: test.storageBinaryVersion,
BootstrapVersionKeyOverride: clusterversion.V22_2,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
SQLEvalContext: &eval.TestingKnobs{
TenantLogicalVersionKeyOverride: test.TenantLogicalVersionKey,
},
},
})
storageSettings := cluster.MakeTestingClusterSettingsWithVersions(
test.storageBinaryVersion,
test.storageBinaryMinSupportedVersion,
false, /* initializeVersion */
)

tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(
test.tenantBinaryVersion,
test.tenantBinaryMinSupportedVersion,
true, /* initializeVersion */
)

// The tenant will be created with an active version equal to the version
// corresponding to TenantLogicalVersionKey. Tenant creation is expected
// to succeed for all test cases but server creation is expected to succeed
// only if tenantBinaryVersion is at least equal to the version corresponding
// to TenantLogicalVersionKey.
stopper := stop.NewStopper()
tenantServer, err := s.StartTenant(context.Background(),
base.TestTenantArgs{
Settings: tenantSettings,
TenantID: serverutils.TestTenantID(),
Stopper: stopper,
TestingKnobs: base.TestingKnobs{
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// Disable the default test tenant, since we create one explicitly
// below.
DefaultTestTenant: base.TestTenantDisabled,
Settings: storageSettings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: test.tenantBinaryVersion,
BinaryVersionOverride: test.storageBinaryVersion,
BootstrapVersionKeyOverride: clusterversion.V22_2,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
SQLEvalContext: &eval.TestingKnobs{
TenantLogicalVersionKeyOverride: test.TenantLogicalVersionKey,
},
},
})
defer s.Stopper().Stop(context.Background())

if !testutils.IsError(err, test.expErrMatch) {
t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch)
}
tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(
test.tenantBinaryVersion,
test.tenantBinaryMinSupportedVersion,
true, /* initializeVersion */
)

// Only attempt to stop the tenant if it was started successfully.
if err == nil {
tenantServer.Stopper().Stop(context.Background())
} else {
// Test - stop the failed SQL server using a custom stopper
// NOTE: This custom stopper should not be required, but is because
// currently, if a SQL server fails to start it will not be cleaned
// up immediately without invoking the custom stopper. This could
// be a problem, and is tracked with #98868.
stopper.Stop(context.Background())
}
s.Stopper().Stop(context.Background())
// The tenant will be created with an active version equal to the version
// corresponding to TenantLogicalVersionKey. Tenant creation is expected
// to succeed for all test cases but server creation is expected to succeed
// only if tenantBinaryVersion is at least equal to the version corresponding
// to TenantLogicalVersionKey.
_, err := s.StartTenant(context.Background(),
base.TestTenantArgs{
Settings: tenantSettings,
TenantID: serverutils.TestTenantID(),
TestingKnobs: base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: test.tenantBinaryVersion,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
},
})

if !testutils.IsError(err, test.expErrMatch) {
t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch)
}
})
}
}
41 changes: 7 additions & 34 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,40 +1121,13 @@ func startShutdownAsync(
drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil)

if shouldDrain {
// Perform a graceful drain. We keep retrying forever, in
// case there are many range leases or some unavailability
// preventing progress. If the operator wants to expedite
// the shutdown, they will need to make it ungraceful
// via a 2nd signal.
var (
remaining = uint64(math.MaxUint64)
prevRemaining = uint64(math.MaxUint64)
verbose = false
)

for ; ; prevRemaining = remaining {
var err error
remaining, _, err = s.Drain(drainCtx, verbose)
if err != nil {
log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err)
break
}
if remaining == 0 {
// No more work to do.
break
}

// If range lease transfer stalls or the number of
// remaining leases somehow increases, verbosity is set
// to help with troubleshooting.
if remaining >= prevRemaining {
verbose = true
}

// Avoid a busy wait with high CPU usage if the server replies
// with an incomplete drain too quickly.
time.Sleep(200 * time.Millisecond)
}
// Perform a graceful drain. This function keeps retrying and
// the call might never complete (e.g. due to some
// unavailability preventing progress). This is intentional. If
// the operator wants to expedite the shutdown, they will need
// to make it ungraceful by sending a second signal to the
// process, which will tickle the shortcut in waitForShutdown().
server.CallDrainServerSide(drainCtx, s.Drain)
}

stopper.Stop(drainCtx)
Expand Down
Loading