Skip to content

Commit

Permalink
Merge pull request #101089 from knz/20230410-backport-server-start
Browse files Browse the repository at this point in the history
  • Loading branch information
knz authored Apr 13, 2023
2 parents 77df830 + 6fc92d1 commit e9482d7
Show file tree
Hide file tree
Showing 20 changed files with 1,103 additions and 422 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_test(
"//pkg/ccl/kvccl",
"//pkg/ccl/utilccl/licenseccl",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/multitenant/tenantcapabilities",
Expand Down Expand Up @@ -88,7 +89,6 @@ go_test(
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
Expand Down
95 changes: 91 additions & 4 deletions pkg/ccl/serverccl/server_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -414,40 +415,62 @@ func TestServerControllerMultiNodeTenantStartup(t *testing.T) {

ctx := context.Background()

t.Logf("starting test cluster")
numNodes := 3
tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
DisableDefaultTestTenant: true,
}})
defer tc.Stopper().Stop(ctx)
defer func() {
t.Logf("stopping test cluster")
tc.Stopper().Stop(ctx)
}()

t.Logf("starting tenant servers")
db := tc.ServerConn(0)
_, err := db.Exec("CREATE TENANT hello; ALTER TENANT hello START SERVICE SHARED")
require.NoError(t, err)

// Pick a random node, try to run some SQL inside that tenant.
rng, _ := randutil.NewTestRand()
sqlAddr := tc.Server(int(rng.Int31n(int32(numNodes)))).ServingSQLAddr()
serverIdx := int(rng.Int31n(int32(numNodes)))
sqlAddr := tc.Server(serverIdx).ServingSQLAddr()
t.Logf("attempting to use tenant server on node %d (%s)", serverIdx, sqlAddr)
testutils.SucceedsSoon(t, func() error {
tenantDB, err := serverutils.OpenDBConnE(sqlAddr, "cluster:hello", false, tc.Stopper())
if err != nil {
t.Logf("error connecting to tenant server (will retry): %v", err)
return err
}
defer tenantDB.Close()
if _, err := tenantDB.Exec("CREATE ROLE foo"); err != nil {
if err := tenantDB.Ping(); err != nil {
t.Logf("connection not ready (will retry): %v", err)
return err
}
if _, err := tenantDB.Exec("CREATE ROLE foo"); err != nil {
// This is not retryable -- if the server accepts the
// connection, it better be ready.
t.Fatal(err)
}
if _, err := tenantDB.Exec("GRANT ADMIN TO foo"); err != nil {
return err
// This is not retryable -- if the server accepts the
// connection, it better be ready.
t.Fatal(err)
}
return nil
})
t.Logf("tenant server on node %d (%s) is ready", serverIdx, sqlAddr)
}

func TestServerStartStop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "test sensitive to low timeout")

ctx := context.Background()

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Expand Down Expand Up @@ -496,6 +519,36 @@ func TestServerStartStop(t *testing.T) {
}
return errors.New("server still alive")
})

log.Infof(ctx, "end of test - test server will now shut down ungracefully")

// Monitor the state of the test server stopper. We use this logging
// to troubleshoot slow drains.
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
case <-time.After(200 * time.Millisecond):
select {
case <-s.Stopper().ShouldQuiesce():
log.Infof(ctx, "test server is quiescing")
case <-s.Stopper().IsStopped():
log.Infof(ctx, "test server is stopped")
return
default:
}
}
}
}()
defer func() { close(done) }()

defer time.AfterFunc(10*time.Second, func() {
log.DumpStacks(ctx, "slow quiesce")
log.Fatalf(ctx, "test took too long to shut down")
}).Stop()
s.Stopper().Stop(ctx)
}

func TestServerControllerLoginLogout(t *testing.T) {
Expand Down Expand Up @@ -572,3 +625,37 @@ func TestServerControllerLoginLogout(t *testing.T) {
require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames)
require.ElementsMatch(t, []string{"", ""}, cookieValues)
}

func TestServiceShutdownUsesGracefulDrain(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		DisableDefaultTestTenant: true,
	})
	defer srv.Stopper().Stop(ctx)

	// The testing knob below signals on this channel once the tenant
	// server has completed a graceful drain during shutdown.
	drainCh := make(chan struct{})

	// Start a shared process tenant server, configured to require the
	// graceful drain path when it shuts down.
	tenantArgs := base.TestSharedProcessTenantArgs{
		TenantName: "hello",
		Knobs: base.TestingKnobs{
			Server: &server.TestingKnobs{
				RequireGracefulDrain: true,
				DrainReportCh:        drainCh,
			},
		},
	}
	_, _, err := srv.(*server.TestServer).StartSharedProcessTenant(ctx, tenantArgs)
	require.NoError(t, err)

	// Ask the server controller to stop the tenant service.
	_, err = sqlDB.Exec("ALTER TENANT hello STOP SERVICE")
	require.NoError(t, err)

	// Block until the drain report arrives. Reaching this point proves
	// the shutdown went through the graceful drain.
	<-drainCh
}
102 changes: 46 additions & 56 deletions pkg/ccl/serverccl/server_startup_guardrails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// TestServerStartupGuardrails ensures that a SQL server will fail to start if
Expand Down Expand Up @@ -74,69 +74,59 @@ func TestServerStartupGuardrails(t *testing.T) {
}

for i, test := range tests {
storageSettings := cluster.MakeTestingClusterSettingsWithVersions(
test.storageBinaryVersion,
test.storageBinaryMinSupportedVersion,
false, /* initializeVersion */
)
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
defer log.Scope(t).Close(t)

s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// Disable the default test tenant, since we create one explicitly
// below.
DisableDefaultTestTenant: true,
Settings: storageSettings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: test.storageBinaryVersion,
BootstrapVersionKeyOverride: clusterversion.V22_2,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
SQLEvalContext: &eval.TestingKnobs{
TenantLogicalVersionKeyOverride: test.TenantLogicalVersionKey,
},
},
})
storageSettings := cluster.MakeTestingClusterSettingsWithVersions(
test.storageBinaryVersion,
test.storageBinaryMinSupportedVersion,
false, /* initializeVersion */
)

tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(
test.tenantBinaryVersion,
test.tenantBinaryMinSupportedVersion,
true, /* initializeVersion */
)

// The tenant will be created with an active version equal to the version
// corresponding to TenantLogicalVersionKey. Tenant creation is expected
// to succeed for all test cases but server creation is expected to succeed
// only if tenantBinaryVersion is at least equal to the version corresponding
// to TenantLogicalVersionKey.
stopper := stop.NewStopper()
tenantServer, err := s.StartTenant(context.Background(),
base.TestTenantArgs{
Settings: tenantSettings,
TenantID: serverutils.TestTenantID(),
Stopper: stopper,
TestingKnobs: base.TestingKnobs{
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// Disable the default test tenant, since we create one explicitly
// below.
DisableDefaultTestTenant: true,
Settings: storageSettings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: test.tenantBinaryVersion,
BinaryVersionOverride: test.storageBinaryVersion,
BootstrapVersionKeyOverride: clusterversion.V22_2,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
SQLEvalContext: &eval.TestingKnobs{
TenantLogicalVersionKeyOverride: test.TenantLogicalVersionKey,
},
},
})
defer s.Stopper().Stop(context.Background())

if !testutils.IsError(err, test.expErrMatch) {
t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch)
}
tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(
test.tenantBinaryVersion,
test.tenantBinaryMinSupportedVersion,
true, /* initializeVersion */
)

// Only attempt to stop the tenant if it was started successfully.
if err == nil {
tenantServer.Stopper().Stop(context.Background())
} else {
// Test - stop the failed SQL server using a custom stopper
// NOTE: This custom stopper should not be required, but is because
// currently, if a SQL server fails to start it will not be cleaned
// up immediately without invoking the custom stopper. This could
// be a problem, and is tracked with #98868.
stopper.Stop(context.Background())
}
s.Stopper().Stop(context.Background())
// The tenant will be created with an active version equal to the version
// corresponding to TenantLogicalVersionKey. Tenant creation is expected
// to succeed for all test cases but server creation is expected to succeed
// only if tenantBinaryVersion is at least equal to the version corresponding
// to TenantLogicalVersionKey.
_, err := s.StartTenant(context.Background(),
base.TestTenantArgs{
Settings: tenantSettings,
TenantID: serverutils.TestTenantID(),
TestingKnobs: base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: test.tenantBinaryVersion,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
},
})

if !testutils.IsError(err, test.expErrMatch) {
t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch)
}
})
}
}
41 changes: 7 additions & 34 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,40 +1121,13 @@ func startShutdownAsync(
drainCtx := logtags.AddTag(s.AnnotateCtx(context.Background()), "server drain process", nil)

if shouldDrain {
// Perform a graceful drain. We keep retrying forever, in
// case there are many range leases or some unavailability
// preventing progress. If the operator wants to expedite
// the shutdown, they will need to make it ungraceful
// via a 2nd signal.
var (
remaining = uint64(math.MaxUint64)
prevRemaining = uint64(math.MaxUint64)
verbose = false
)

for ; ; prevRemaining = remaining {
var err error
remaining, _, err = s.Drain(drainCtx, verbose)
if err != nil {
log.Ops.Errorf(drainCtx, "graceful drain failed: %v", err)
break
}
if remaining == 0 {
// No more work to do.
break
}

// If range lease transfer stalls or the number of
// remaining leases somehow increases, verbosity is set
// to help with troubleshooting.
if remaining >= prevRemaining {
verbose = true
}

// Avoid a busy wait with high CPU usage if the server replies
// with an incomplete drain too quickly.
time.Sleep(200 * time.Millisecond)
}
// Perform a graceful drain. This function keeps retrying and
// the call might never complete (e.g. due to some
// unavailability preventing progress). This is intentional. If
// the operator wants to expedite the shutdown, they will need
// to make it ungraceful by sending a second signal to the
// process, which will tickle the shortcut in waitForShutdown().
server.CallDrainServerSide(drainCtx, s.Drain)
}

stopper.Stop(drainCtx)
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ go_library(
"server.go",
"server_controller.go",
"server_controller_accessors.go",
"server_controller_channel_orchestrator.go",
"server_controller_http.go",
"server_controller_new_server.go",
"server_controller_orchestration.go",
"server_controller_orchestration_method.go",
"server_controller_sql.go",
"server_http.go",
"server_obs_service.go",
Expand Down
Loading

0 comments on commit e9482d7

Please sign in to comment.