Skip to content

Commit

Permalink
server: wait for all KV nodes to observe tenant stop request
Browse files Browse the repository at this point in the history
In order to make it possible for a caller to know that no new SQL
requests will be served by a tenant after a STOP command, this PR adds
a new STOPPING state. While nodes are in the STOPPING state, KV
requests will be rejected. We then wait for all nodes to observe the
STOPPING state before transitioning the cluster to NONE.

Epic: none

Release note: None
  • Loading branch information
stevendanna committed Feb 8, 2024
1 parent 049d54d commit e9c00be
Show file tree
Hide file tree
Showing 16 changed files with 403 additions and 29 deletions.
76 changes: 76 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,82 @@ Tier represents one level of the locality hierarchy.



## TenantServiceStatus

`GET /_status/tenant_service_status`

TenantServiceStatus returns the current service and data state of
the given tenant as known to the server orchestrator, which may
differ from the database state.

Support status: [reserved](#support-status)

#### Request Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| node_id | [string](#cockroach.server.serverpb.TenantServiceStatusRequest-string) | | | [reserved](#support-status) |
| tenant_id | [uint64](#cockroach.server.serverpb.TenantServiceStatusRequest-uint64) | | | [reserved](#support-status) |







#### Response Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| status_by_node_id | [TenantServiceStatusResponse.StatusByNodeIdEntry](#cockroach.server.serverpb.TenantServiceStatusResponse-cockroach.server.serverpb.TenantServiceStatusResponse.StatusByNodeIdEntry) | repeated | | [reserved](#support-status) |
| errors_by_node_id | [TenantServiceStatusResponse.ErrorsByNodeIdEntry](#cockroach.server.serverpb.TenantServiceStatusResponse-cockroach.server.serverpb.TenantServiceStatusResponse.ErrorsByNodeIdEntry) | repeated | | [reserved](#support-status) |






<a name="cockroach.server.serverpb.TenantServiceStatusResponse-cockroach.server.serverpb.TenantServiceStatusResponse.StatusByNodeIdEntry"></a>
#### TenantServiceStatusResponse.StatusByNodeIdEntry



| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| key | [int32](#cockroach.server.serverpb.TenantServiceStatusResponse-int32) | | | |
| value | [cockroach.multitenant.SQLInfo](#cockroach.server.serverpb.TenantServiceStatusResponse-cockroach.multitenant.SQLInfo) | | | |





<a name="cockroach.server.serverpb.TenantServiceStatusResponse-cockroach.server.serverpb.TenantServiceStatusResponse.ErrorsByNodeIdEntry"></a>
#### TenantServiceStatusResponse.ErrorsByNodeIdEntry



| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| key | [int32](#cockroach.server.serverpb.TenantServiceStatusResponse-int32) | | | |
| value | [string](#cockroach.server.serverpb.TenantServiceStatusResponse-string) | | | |






## TenantRanges

`GET /_status/tenant_ranges`
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/backupccl/backup_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ func TestBackupTenantImportingTable(t *testing.T) {
}
// Destroy the tenant, then restore it.
tSrv.AppStopper().Stop(ctx)
if _, err := sqlDB.DB.ExecContext(ctx, "ALTER TENANT [10] STOP SERVICE; DROP TENANT [10] IMMEDIATE"); err != nil {
if _, err := sqlDB.DB.ExecContext(ctx, "ALTER TENANT [10] STOP SERVICE"); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.DB.ExecContext(ctx, "DROP TENANT [10] IMMEDIATE"); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.DB.ExecContext(ctx, "RESTORE TENANT 10 FROM $1", dst); err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/ccl/backupccl/testdata/backup-restore/restore-tenants
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ SELECT crdb_internal.create_tenant(6);

# Drop one of them.
exec-sql
ALTER TENANT [5] STOP SERVICE; DROP TENANT [5]
ALTER TENANT [5] STOP SERVICE;
----

exec-sql
DROP TENANT [5]
----

exec-sql
Expand Down Expand Up @@ -139,7 +143,11 @@ SELECT id,name,data_state,service_mode,active,json_extract_path_text(crdb_intern

# Check that another service mode is also preserved.
exec-sql
ALTER TENANT newname STOP SERVICE; ALTER TENANT newname START SERVICE SHARED
ALTER TENANT newname STOP SERVICE;
----

exec-sql
ALTER TENANT newname START SERVICE SHARED;
----

query-sql
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/logictestccl/testdata/logic_test/tenant_usage
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ SELECT crdb_internal.update_tenant_resource_limits(5, 1000, 100, 0, now(), 0)
# Note this marks the tenant as dropped. The GC will not delete the tenant
# until after the ttl expires.
statement ok
ALTER TENANT [5] STOP SERVICE;
ALTER TENANT [5] STOP SERVICE

statement ok
DROP TENANT [5]

query error tenant "5" is not active
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/serverccl/server_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func TestSharedProcessTenantNodeLocalAccess(t *testing.T) {
ctx := context.Background()
nodeCount := 3

skip.UnderDuress(t, "slow test")

dirs := make([]string, nodeCount)
dirCleanups := make([]func(), nodeCount)
for i := 0; i < nodeCount; i++ {
Expand Down Expand Up @@ -85,8 +87,8 @@ func TestSharedProcessTenantNodeLocalAccess(t *testing.T) {

db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db.Exec(t, `CREATE TENANT application;
ALTER TENANT application GRANT CAPABILITY can_use_nodelocal_storage;
ALTER TENANT application START SERVICE SHARED`)
ALTER TENANT application GRANT CAPABILITY can_use_nodelocal_storage`)
db.Exec(t, `ALTER TENANT application START SERVICE SHARED`)

var tenantID uint64
db.QueryRow(t, "SELECT id FROM [SHOW TENANT application]").Scan(&tenantID)
Expand Down Expand Up @@ -388,7 +390,7 @@ func TestServerControllerMultiNodeTenantStartup(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
skip.UnderDeadlock(t, "slow under deadlock")
skip.UnderDuress(t, "slow test")
t.Logf("starting test cluster")
numNodes := 3
tc := serverutils.StartCluster(t, numNodes, base.TestClusterArgs{
Expand All @@ -405,7 +407,9 @@ func TestServerControllerMultiNodeTenantStartup(t *testing.T) {

t.Logf("starting tenant servers")
db := tc.ServerConn(0)
_, err := db.Exec("CREATE TENANT hello; ALTER TENANT hello START SERVICE SHARED")
_, err := db.Exec("CREATE TENANT hello")
require.NoError(t, err)
_, err = db.Exec("ALTER TENANT hello START SERVICE SHARED")
require.NoError(t, err)

// Pick a random node, try to run some SQL inside that tenant.
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/serverccl/shared_process_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ func TestSharedProcessTenantNoSpanLimit(t *testing.T) {
defer tc.Stopper().Stop(ctx)

db := tc.ServerConn(0)
_, err := db.Exec("CREATE TENANT hello; ALTER TENANT hello START SERVICE SHARED")
_, err := db.Exec("CREATE TENANT hello")
require.NoError(t, err)

_, err = db.Exec("ALTER TENANT hello START SERVICE SHARED")
require.NoError(t, err)

_, err = db.Exec("SET CLUSTER SETTING spanconfig.virtual_cluster.max_spans = 1000")
Expand Down
7 changes: 4 additions & 3 deletions pkg/configprofiles/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,14 @@ var virtClusterInitTasks = []autoconfigpb.Task{
var virtClusterWithAppServiceInitTasks = append(
virtClusterInitTasks[:len(virtClusterInitTasks):len(virtClusterInitTasks)],
makeTask("create an application virtual cluster",
nil, /* nonTxnSQL */
/* txnSQL */ []string{
/* noTxnSQL */ []string{
// Create the app tenant record.
"CREATE VIRTUAL CLUSTER application",
"CREATE VIRTUAL CLUSTER IF NOT EXISTS application",
// Run the service for the application tenant.
"ALTER VIRTUAL CLUSTER application START SERVICE SHARED",
},
nil, /* txnSQL */

),
makeTask("activate application virtual cluster",
/* nonTxnSQL */ []string{
Expand Down
9 changes: 8 additions & 1 deletion pkg/multitenant/mtinfopb/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ const (
// This mode causes KV nodes to spontaneously start the SQL service
// for the tenant.
ServiceModeShared TenantServiceMode = 2

// ServiceModeStopping says that the service was previusly in
// ServiceModeShared but is in the process of stopping.
ServiceModeStopping TenantServiceMode = 3

// MaxServiceMode is a sentinel value.
MaxServiceMode TenantServiceMode = ServiceModeShared
MaxServiceMode TenantServiceMode = ServiceModeStopping
)

// String implements fmt.Stringer.
Expand All @@ -49,6 +54,8 @@ func (s TenantServiceMode) String() string {
return "external"
case ServiceModeShared:
return "shared"
case ServiceModeStopping:
return "stopping"
default:
return fmt.Sprintf("unimplemented-%d", int(s))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (a *Authorizer) HasCapabilityForBatch(
entry, mode := a.getMode(ctx, tenID)
switch mode {
case authorizerModeOn:
if entry.ServiceMode == mtinfopb.ServiceModeNone {
if entry.ServiceMode == mtinfopb.ServiceModeNone || entry.ServiceMode == mtinfopb.ServiceModeStopping {
return errors.Newf("operation not allowed when in service mode %q", entry.ServiceMode)
}
return a.capCheckForBatch(ctx, tenID, ba, entry)
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/server_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func TestServerController(t *testing.T) {
require.Nil(t, d)
require.Error(t, err, `no tenant found with name "somename"`)

_, err = db.Exec("CREATE TENANT hello; ALTER TENANT hello START SERVICE SHARED")
_, err = db.Exec("CREATE TENANT hello")
require.NoError(t, err)
_, err = db.Exec("ALTER TENANT hello START SERVICE SHARED")
require.NoError(t, err)

_, _, err = sc.getServer(ctx, "hello")
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/serverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ proto_library(
"//pkg/kv/kvserver/kvserverpb:kvserverpb_proto",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb_proto",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_proto",
"//pkg/multitenant/mtinfopb:mtinfopb_proto",
"//pkg/roachpb:roachpb_proto",
"//pkg/server/diagnostics/diagnosticspb:diagnosticspb_proto",
"//pkg/server/status/statuspb:statuspb_proto",
Expand Down Expand Up @@ -67,6 +68,7 @@ go_proto_library(
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
"//pkg/multitenant/mtinfopb",
"//pkg/roachpb",
"//pkg/server/diagnostics/diagnosticspb",
"//pkg/server/status/statuspb",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/serverpb/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type SQLStatusServer interface {
Logs(context.Context, *LogsRequest) (*LogEntriesResponse, error)
NodesUI(context.Context, *NodesRequest) (*NodesResponseExternal, error)
RequestJobProfilerExecutionDetails(context.Context, *RequestJobProfilerExecutionDetailsRequest) (*RequestJobProfilerExecutionDetailsResponse, error)
TenantServiceStatus(context.Context, *TenantServiceStatusRequest) (*TenantServiceStatusResponse, error)
}

// OptionalNodesStatusServer is a StatusServer that is only optionally present
Expand Down
29 changes: 29 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import "build/info.proto";
import "errorspb/errors.proto";
import "gossip/gossip.proto";
import "jobs/jobspb/jobs.proto";
import "multitenant/mtinfopb/info.proto";
import "roachpb/data.proto";
import "roachpb/index_usage_stats.proto";
import "roachpb/span_config.proto";
Expand Down Expand Up @@ -613,6 +614,25 @@ message DownloadSpanResponse {
];
}

message TenantServiceStatusRequest {
string node_id = 1 [(gogoproto.customname) = "NodeID"];
uint64 tenant_id = 2 [(gogoproto.customname) = "TenantID"];
}

message TenantServiceStatusResponse {
map<int32, cockroach.multitenant.SQLInfo> status_by_node_id = 1 [
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID",
(gogoproto.customname) = "StatusByNodeID",
(gogoproto.nullable) = false
];

map<int32, string> errors_by_node_id = 2 [
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID",
(gogoproto.customname) = "ErrorsByNodeID",
(gogoproto.nullable) = false
];
}

message TraceEvent {
google.protobuf.Timestamp time = 1
[ (gogoproto.nullable) = false, (gogoproto.stdtime) = true ];
Expand Down Expand Up @@ -2198,6 +2218,15 @@ service Status {
};
}

// TenantServiceStatus returns the current service and data state of
// the given tenant as known to the server orchestrator, which may
// differ from the database state.
rpc TenantServiceStatus(TenantServiceStatusRequest) returns (TenantServiceStatusResponse) {
option (google.api.http) = {
get : "/_status/tenant_service_status"
};
}

// TenantRanges requests internal details about all range replicas within
// the tenant's keyspace at the time the request is processed.
rpc TenantRanges(TenantRangesRequest) returns (TenantRangesResponse) {
Expand Down
Loading

0 comments on commit e9c00be

Please sign in to comment.