diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 029c690b212f..4dce95d5c50b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -133,6 +133,7 @@ /pkg/ccl/oidcccl/ @cockroachdb/sql-queries /pkg/ccl/partitionccl/ @cockroachdb/sql-schema @cockroachdb/multiregion /pkg/ccl/serverccl/ @cockroachdb/server-prs +/pkg/ccl/serverccl/statusccl @cockroachdb/sql-observability /pkg/ccl/telemetryccl/ @cockroachdb/obs-inf-prs /pkg/ccl/testccl/sqlccl/ @cockroachdb/sql-queries /pkg/ccl/testccl/workload/schemachange/ @cockroachdb/sql-schema diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index cd510f5184e6..9348f66031f1 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -30,6 +30,7 @@ ALL_TESTS = [ "//pkg/ccl/multitenantccl/tenantcostserver:tenantcostserver_test", "//pkg/ccl/oidcccl:oidcccl_test", "//pkg/ccl/partitionccl:partitionccl_test", + "//pkg/ccl/serverccl/statusccl:statusccl_test", "//pkg/ccl/serverccl:serverccl_test", "//pkg/ccl/sqlproxyccl/denylist:denylist_test", "//pkg/ccl/sqlproxyccl/idle:idle_test", diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel index bba8971b9057..ef0ed35485d1 100644 --- a/pkg/ccl/serverccl/BUILD.bazel +++ b/pkg/ccl/serverccl/BUILD.bazel @@ -2,27 +2,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "serverccl", - srcs = [ - "doc.go", - "tenant_test_utils.go", - ], + srcs = ["doc.go"], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/serverccl", visibility = ["//visibility:public"], - deps = [ - "//pkg/base", - "//pkg/roachpb:with-mocks", - "//pkg/security", - "//pkg/server/serverpb", - "//pkg/sql/pgwire", - "//pkg/sql/sqlstats/persistedsqlstats", - "//pkg/sql/tests", - "//pkg/testutils/serverutils", - "//pkg/testutils/sqlutils", - "//pkg/util/httputil", - "//pkg/util/log", - "//pkg/util/protoutil", - "@com_github_stretchr_testify//require", - ], ) go_test( @@ -33,8 +15,6 @@ go_test( "main_test.go", "role_authentication_test.go", "server_sql_test.go", - "tenant_grpc_test.go", - "tenant_status_test.go", "tenant_vars_test.go", ], embed = [":serverccl"], @@ -45,20 +25,14 @@ go_test( "//pkg/ccl/utilccl", "//pkg/ccl/utilccl/licenseccl", "//pkg/roachpb:with-mocks", - "//pkg/rpc", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", "//pkg/server/serverpb", "//pkg/sql", - "//pkg/sql/catalog/catconstants", - "//pkg/sql/catalog/descpb", - "//pkg/sql/idxusage", "//pkg/sql/pgwire/pgcode", - "//pkg/sql/sqlstats", "//pkg/sql/tests", "//pkg/testutils/serverutils", - "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", diff --git a/pkg/ccl/serverccl/statusccl/BUILD.bazel b/pkg/ccl/serverccl/statusccl/BUILD.bazel new file mode 100644 index 000000000000..8efd0f7b9dea --- /dev/null +++ b/pkg/ccl/serverccl/statusccl/BUILD.bazel @@ -0,0 +1,59 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "statusccl", + srcs = ["tenant_test_utils.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/serverccl/statusccl", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/server/serverpb", + "//pkg/sql/pgwire", + "//pkg/sql/sqlstats/persistedsqlstats", + "//pkg/sql/tests", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/util/httputil", + "//pkg/util/log", + "//pkg/util/protoutil", + "@com_github_stretchr_testify//require", + ], +) + +go_test( + name = "statusccl_test", + srcs = [ + "main_test.go", + "tenant_grpc_test.go", + "tenant_status_test.go", + ], + embed = [":statusccl"], + deps = [ + "//pkg/base", + "//pkg/ccl", + "//pkg/ccl/kvccl", + "//pkg/ccl/utilccl", + "//pkg/roachpb:with-mocks", + "//pkg/rpc", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/server/serverpb", + "//pkg/sql/catalog/catconstants", + "//pkg/sql/catalog/descpb", + "//pkg/sql/idxusage", + "//pkg/sql/sqlstats", + "//pkg/sql/tests", + "//pkg/testutils/serverutils", + "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "//pkg/util/timeutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/ccl/serverccl/statusccl/main_test.go b/pkg/ccl/serverccl/statusccl/main_test.go new file mode 100644 index 000000000000..6f7436229e1c --- /dev/null +++ b/pkg/ccl/serverccl/statusccl/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package statusccl + +import ( + "os" + "testing" + + _ "github.com/cockroachdb/cockroach/pkg/ccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/serverccl/tenant_grpc_test.go b/pkg/ccl/serverccl/statusccl/tenant_grpc_test.go similarity index 99% rename from pkg/ccl/serverccl/tenant_grpc_test.go rename to pkg/ccl/serverccl/statusccl/tenant_grpc_test.go index 6417932b08b2..ee2d54189eb1 100644 --- a/pkg/ccl/serverccl/tenant_grpc_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_grpc_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package serverccl +package statusccl import ( "context" diff --git a/pkg/ccl/serverccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go similarity index 66% rename from pkg/ccl/serverccl/tenant_status_test.go rename to pkg/ccl/serverccl/statusccl/tenant_status_test.go index 3aec40459f0c..cc0278799e02 100644 --- a/pkg/ccl/serverccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package serverccl +package statusccl import ( "context" @@ -14,7 +14,6 @@ import ( "encoding/hex" "fmt" "net/url" - "reflect" "sort" "strconv" "strings" @@ -40,6 +39,49 @@ import ( "github.com/stretchr/testify/require" ) +func TestTenantStatusAPI(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // The liveness session might expire before the stress race can finish. + skip.UnderStressRace(t, "expensive tests") + + ctx := context.Background() + + statsIngestionCb, statsIngestionNotifier := idxusage.CreateIndexStatsIngestedCallbackForTest() + knobs := tests.CreateTestingKnobs() + knobs.IndexUsageStatsKnobs = &idxusage.TestingKnobs{ + OnIndexUsageStatsProcessedCallback: statsIngestionCb, + } + + testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */, knobs) + defer testHelper.cleanup(ctx, t) + + t.Run("reset_sql_stats", func(t *testing.T) { + testResetSQLStatsRPCForTenant(ctx, t, testHelper) + }) + + t.Run("reset_index_usage_stats", func(t *testing.T) { + testResetIndexUsageStatsRPCForTenant(ctx, t, testHelper, statsIngestionNotifier) + }) + + t.Run("tenant_contention_event", func(t *testing.T) { + testContentionEventsForTenant(ctx, t, testHelper) + }) + + t.Run("tenant_cancel_session", func(t *testing.T) { + testTenantStatusCancelSession(t, testHelper) + }) + + t.Run("tenant_cancel_query", func(t *testing.T) { + testTenantStatusCancelQuery(ctx, t, testHelper) + }) + + t.Run("index_usage_stats", func(t *testing.T) { + testIndexUsageForTenants(t, testHelper, statsIngestionNotifier) + }) +} + func TestTenantCannotSeeNonTenantStats(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -206,23 +248,15 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) { }) } -func TestResetSQLStatsRPCForTenant(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.UnderStressRace(t, "expensive tests") - - ctx := context.Background() - +func testResetSQLStatsRPCForTenant( + ctx context.Context, t *testing.T, testHelper *tenantTestHelper, +) { stmts := []string{ "SELECT 1", "SELECT 1, 1", "SELECT 1, 1, 1", } - testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */, tests.CreateTestingKnobs()) - defer testHelper.cleanup(ctx, t) - testCluster := testHelper.testCluster() controlCluster := testHelper.controlCluster() @@ -232,25 +266,34 @@ func TestResetSQLStatsRPCForTenant(t *testing.T) { controlCluster.tenantConn(0 /* idx */). Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = false") + defer func() { + // Cleanup + testCluster.tenantConn(0 /* idx */). + Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = true") + controlCluster.tenantConn(0 /* idx */). + Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = true") + + }() + for _, flushed := range []bool{false, true} { t.Run(fmt.Sprintf("flushed=%t", flushed), func(t *testing.T) { // Clears the SQL Stats at the end of each test via builtin. defer func() { - testCluster.tenantConn(0 /* idx */).Exec(t, "SELECT crdb_internal.reset_sql_stats()") - controlCluster.tenantConn(0 /* idx */).Exec(t, "SELECT crdb_internal.reset_sql_stats()") + testCluster.tenantConn(randomServer).Exec(t, "SELECT crdb_internal.reset_sql_stats()") + controlCluster.tenantConn(randomServer).Exec(t, "SELECT crdb_internal.reset_sql_stats()") }() for _, stmt := range stmts { - testCluster.tenantConn(0 /* idx */).Exec(t, stmt) - controlCluster.tenantConn(0 /* idx */).Exec(t, stmt) + testCluster.tenantConn(randomServer).Exec(t, stmt) + controlCluster.tenantConn(randomServer).Exec(t, stmt) } if flushed { - testCluster.tenantSQLStats(0 /* idx */).Flush(ctx) - controlCluster.tenantSQLStats(0 /* idx */).Flush(ctx) + testCluster.tenantSQLStats(randomServer).Flush(ctx) + controlCluster.tenantSQLStats(randomServer).Flush(ctx) } - status := testCluster.tenantStatusSrv(1 /* idx */) + status := testCluster.tenantStatusSrv(randomServer) statsPreReset, err := status.Statements(ctx, &serverpb.StatementsRequest{ Combined: true, @@ -291,7 +334,7 @@ func TestResetSQLStatsRPCForTenant(t *testing.T) { // Ensures that sql stats reset is isolated by tenant boundary. statsFromControlCluster, err := - controlCluster.tenantStatusSrv(1 /* idx */).Statements(ctx, &serverpb.StatementsRequest{ + controlCluster.tenantStatusSrv(randomServer).Statements(ctx, &serverpb.StatementsRequest{ Combined: true, }) require.NoError(t, err) @@ -301,31 +344,44 @@ func TestResetSQLStatsRPCForTenant(t *testing.T) { } } -func TestResetIndexUsageStatsRPCForTenant(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.UnderStressRace(t, "expensive tests") - - ctx := context.Background() - - statsIngestionCb, statsIngestionNotifier := idxusage.CreateIndexStatsIngestedCallbackForTest() - - knobs := tests.CreateTestingKnobs() - knobs.IndexUsageStatsKnobs = &idxusage.TestingKnobs{ - OnIndexUsageStatsProcessedCallback: statsIngestionCb, +func testResetIndexUsageStatsRPCForTenant( + ctx context.Context, + t *testing.T, + testHelper *tenantTestHelper, + ingestedNotifier chan roachpb.IndexUsageKey, +) { + testCases := []struct { + name string + resetFn func(helper *tenantTestHelper) + }{ + { + name: "sql-cli", + resetFn: func(helper *tenantTestHelper) { + // Reset index usage stats using SQL shell built-in. + testingCluster := helper.testCluster() + testingCluster.tenantConn(0).Exec(t, "SELECT crdb_internal.reset_index_usage_stats()") + }, + }, + { + name: "http", + resetFn: func(helper *tenantTestHelper) { + // Reset index usage stats over HTTP on tenant SQL pod 1. + httpPod1 := helper.testCluster().tenantHTTPClient(t, 1) + defer httpPod1.Close() + httpPod1.PostJSON("/_status/resetindexusagestats", &serverpb.ResetIndexUsageStatsRequest{}, &serverpb.ResetIndexUsageStatsResponse{}) + }, + }, } - testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */, knobs) - defer testHelper.cleanup(ctx, t) - - testingCluster := testHelper.testCluster() - controlCluster := testHelper.controlCluster() - var testingTableID, controlTableID string + for _, testCase := range testCases { + testingCluster := testHelper.testCluster() + controlCluster := testHelper.controlCluster() - for i, cluster := range []tenantCluster{testingCluster, controlCluster} { - // Create tables and insert data. - cluster.tenantConn(0).Exec(t, ` + t.Run(testCase.name, func(t *testing.T) { + var testingTableID, controlTableID string + for i, cluster := range []tenantCluster{testingCluster, controlCluster} { + // Create tables and insert data. + cluster.tenantConn(0).Exec(t, ` CREATE TABLE test ( k INT PRIMARY KEY, a INT, @@ -334,42 +390,45 @@ CREATE TABLE test ( ) `) - cluster.tenantConn(0).Exec(t, ` + cluster.tenantConn(0).Exec(t, ` INSERT INTO test VALUES (1, 10, 100), (2, 20, 200), (3, 30, 300) `) - // Record scan on primary index. - cluster.tenantConn(0).Exec(t, "SELECT * FROM test") - - // Record scan on secondary index. - cluster.tenantConn(1).Exec(t, "SELECT * FROM test@test_a_idx") - testTableIDStr := cluster.tenantConn(2).QueryStr(t, "SELECT 'test'::regclass::oid")[0][0] - testTableID, err := strconv.Atoi(testTableIDStr) - require.NoError(t, err) - - // Set table ID outside of loop. - if i == 0 { - testingTableID = testTableIDStr - } else { - controlTableID = testTableIDStr - } + // Record scan on primary index. + cluster.tenantConn(randomServer). + Exec(t, "SELECT * FROM test") + + // Record scan on secondary index. + cluster.tenantConn(randomServer). + Exec(t, "SELECT * FROM test@test_a_idx") + testTableIDStr := cluster.tenantConn(randomServer). + QueryStr(t, "SELECT 'test'::regclass::oid")[0][0] + testTableID, err := strconv.Atoi(testTableIDStr) + require.NoError(t, err) + + // Set table ID outside of loop. + if i == 0 { + testingTableID = testTableIDStr + } else { + controlTableID = testTableIDStr + } - // Wait for the stats to be ingested. - require.NoError(t, - idxusage.WaitForIndexStatsIngestionForTest(statsIngestionNotifier, map[roachpb.IndexUsageKey]struct{}{ - { - TableID: roachpb.TableID(testTableID), - IndexID: 1, - }: {}, - { - TableID: roachpb.TableID(testTableID), - IndexID: 2, - }: {}, - }, 2 /* expectedEventCnt*/, 5*time.Second /* timeout */), - ) - - query := ` + // Wait for the stats to be ingested. + require.NoError(t, + idxusage.WaitForIndexStatsIngestionForTest(ingestedNotifier, map[roachpb.IndexUsageKey]struct{}{ + { + TableID: roachpb.TableID(testTableID), + IndexID: 1, + }: {}, + { + TableID: roachpb.TableID(testTableID), + IndexID: 2, + }: {}, + }, 2 /* expectedEventCnt*/, 5*time.Second /* timeout */), + ) + + query := ` SELECT table_id, index_id, @@ -379,35 +438,36 @@ FROM crdb_internal.index_usage_statistics WHERE table_id = ` + testTableIDStr - // Assert index usage data was inserted. - expected := [][]string{ - {testTableIDStr, "1", "1", "true"}, - {testTableIDStr, "2", "1", "true"}, - } - cluster.tenantConn(2).CheckQueryResults(t, query, expected) - } + // Assert index usage data was inserted. + expected := [][]string{ + {testTableIDStr, "1", "1", "true"}, + {testTableIDStr, "2", "1", "true"}, + } + cluster.tenantConn(randomServer).CheckQueryResults(t, query, expected) + } - // Reset index usage stats. - timePreReset := timeutil.Now() - status := testingCluster.tenantStatusSrv(1 /* idx */) - _, err := status.ResetIndexUsageStats(ctx, &serverpb.ResetIndexUsageStatsRequest{}) - require.NoError(t, err) + // Reset index usage stats. + timePreReset := timeutil.Now() + status := testingCluster.tenantStatusSrv(randomServer) - // Check that last reset time was updated for test cluster. - resp, err := status.IndexUsageStatistics(ctx, &serverpb.IndexUsageStatisticsRequest{}) - require.NoError(t, err) - require.True(t, resp.LastReset.After(timePreReset)) + // Reset index usage stats. + testCase.resetFn(testHelper) - // Ensure tenant data isolation. - // Check that last reset time was not updated for control cluster. - status = controlCluster.tenantStatusSrv(1 /* idx */) - resp, err = status.IndexUsageStatistics(ctx, &serverpb.IndexUsageStatisticsRequest{}) - require.NoError(t, err) - require.Equal(t, resp.LastReset, time.Time{}) + // Check that last reset time was updated for test cluster. + resp, err := status.IndexUsageStatistics(ctx, &serverpb.IndexUsageStatisticsRequest{}) + require.NoError(t, err) + require.True(t, resp.LastReset.After(timePreReset)) - // Query to fetch index usage stats. We do this instead of sending - // an RPC request so that we can filter by table id. - query := ` + // Ensure tenant data isolation. + // Check that last reset time was not updated for control cluster. + status = controlCluster.tenantStatusSrv(randomServer) + resp, err = status.IndexUsageStatistics(ctx, &serverpb.IndexUsageStatisticsRequest{}) + require.NoError(t, err) + require.Equal(t, resp.LastReset, time.Time{}) + + // Query to fetch index usage stats. We do this instead of sending + // an RPC request so that we can filter by table id. + query := ` SELECT table_id, total_reads, @@ -418,22 +478,28 @@ WHERE table_id = $1 ` - // Check that index usage stats were reset. - rows := testingCluster.tenantConn(2).QueryStr(t, query, testingTableID) - require.NotNil(t, rows) - for _, row := range rows { - require.Equal(t, row[1], "0", "expected total reads for table %s to be reset, but got %s", - row[0], row[1]) - require.Equal(t, row[2], "NULL", "expected last read time for table %s to be reset, but got %s", - row[0], row[2]) - } + // Check that index usage stats were reset. + rows := testingCluster.tenantConn(2).QueryStr(t, query, testingTableID) + require.NotNil(t, rows) + for _, row := range rows { + require.Equal(t, row[1], "0", "expected total reads for table %s to be reset, but got %s", + row[0], row[1]) + require.Equal(t, row[2], "NULL", "expected last read time for table %s to be reset, but got %s", + row[0], row[2]) + } - // Ensure tenant data isolation. - rows = controlCluster.tenantConn(2).QueryStr(t, query, controlTableID) - require.NotNil(t, rows) - for _, row := range rows { - require.NotEqual(t, row[1], "0", "expected total reads for table %s to not be reset, but got %s", row[0], row[1]) - require.NotEqual(t, row[2], "NULL", "expected last read time for table %s to not be reset, but got %s", row[0], row[2]) + // Ensure tenant data isolation. + rows = controlCluster.tenantConn(0).QueryStr(t, query, controlTableID) + require.NotNil(t, rows) + for _, row := range rows { + require.NotEqual(t, row[1], "0", "expected total reads for table %s to not be reset, but got %s", row[0], row[1]) + require.NotEqual(t, row[2], "NULL", "expected last read time for table %s to not be reset, but got %s", row[0], row[2]) + } + + // Cleanup. + testingCluster.tenantConn(0).Exec(t, "DROP TABLE IF EXISTS test") + controlCluster.tenantConn(0).Exec(t, "DROP TABLE IF EXISTS test") + }) } } @@ -458,18 +524,9 @@ func ensureExpectedStmtFingerprintExistsInRPCResponse( } } -func TestContentionEventsForTenant(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.UnderStressRace(t, "expensive tests") - - ctx := context.Background() - - testHelper := - newTestTenantHelper(t, 3 /* tenantClusterSize */, tests.CreateTestingKnobs()) - defer testHelper.cleanup(ctx, t) - +func testContentionEventsForTenant( + ctx context.Context, t *testing.T, testHelper *tenantTestHelper, +) { testingCluster := testHelper.testCluster() controlledCluster := testHelper.controlCluster() @@ -533,28 +590,19 @@ SET TRACING=off; } } -func TestIndexUsageForTenants(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.UnderStressRace(t, "expensive tests") - - ctx := context.Background() - - statsIngestionCb, statsIngestionNotifier := idxusage.CreateIndexStatsIngestedCallbackForTest() - - knobs := tests.CreateTestingKnobs() - knobs.IndexUsageStatsKnobs = &idxusage.TestingKnobs{ - OnIndexUsageStatsProcessedCallback: statsIngestionCb, - } - testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */, knobs) - defer testHelper.cleanup(ctx, t) - +func testIndexUsageForTenants( + t *testing.T, testHelper *tenantTestHelper, ingestNotifier chan roachpb.IndexUsageKey, +) { testingCluster := testHelper.testCluster() controlledCluster := testHelper.controlCluster() + testingCluster.tenantConn(0).Exec(t, "USE defaultdb") + testingCluster.tenantConn(1).Exec(t, "USE defaultdb") + testingCluster.tenantConn(2).Exec(t, "USE defaultdb") + testingCluster.tenantConn(0).Exec(t, `CREATE SCHEMA idx_test`) + testingCluster.tenantConn(0).Exec(t, ` -CREATE TABLE test ( +CREATE TABLE idx_test.test ( k INT PRIMARY KEY, a INT, b INT, @@ -562,23 +610,27 @@ CREATE TABLE test ( ) `) + defer func() { + testingCluster.tenantConn(0).Exec(t, "DROP TABLE idx_test.test") + }() + testingCluster.tenantConn(0).Exec(t, ` -INSERT INTO test +INSERT INTO idx_test.test VALUES (1, 10, 100), (2, 20, 200), (3, 30, 300) `) // Record scan on primary index. - testingCluster.tenantConn(0).Exec(t, "SELECT * FROM test") + testingCluster.tenantConn(0).Exec(t, "SELECT * FROM idx_test.test") // Record scan on secondary index. - testingCluster.tenantConn(1).Exec(t, "SELECT * FROM test@test_a_idx") - testTableIDStr := testingCluster.tenantConn(2).QueryStr(t, "SELECT 'test'::regclass::oid")[0][0] + testingCluster.tenantConn(1).Exec(t, "SELECT * FROM idx_test.test@test_a_idx") + testTableIDStr := testingCluster.tenantConn(2).QueryStr(t, "SELECT 'idx_test.test'::regclass::oid")[0][0] testTableID, err := strconv.Atoi(testTableIDStr) require.NoError(t, err) // Wait for the stats to be ingested. require.NoError(t, - idxusage.WaitForIndexStatsIngestionForTest(statsIngestionNotifier, map[roachpb.IndexUsageKey]struct{}{ + idxusage.WaitForIndexStatsIngestionForTest(ingestNotifier, map[roachpb.IndexUsageKey]struct{}{ { TableID: roachpb.TableID(testTableID), IndexID: 1, @@ -610,7 +662,7 @@ WHERE require.Equal(t, expected, actual) // Ensure tenant data isolation. - actual = controlledCluster.tenantConn(2).QueryStr(t, query, testTableID) + actual = controlledCluster.tenantConn(0).QueryStr(t, query, testTableID) expected = [][]string{} require.Equal(t, expected, actual) @@ -625,27 +677,16 @@ func selectClusterSessionIDs(t *testing.T, conn *sqlutils.SQLRunner) []string { return sessionIDs } -func TestTenantStatusCancelSession(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.UnderStressRace(t, "expensive tests") - - ctx := context.Background() - helper := newTestTenantHelper(t, 3, base.TestingKnobs{}) - defer helper.cleanup(ctx, t) - +func testTenantStatusCancelSession(t *testing.T, helper *tenantTestHelper) { // Open a SQL session on tenant SQL pod 0. sqlPod0 := helper.testCluster().tenantConn(0) sqlPod0.Exec(t, "SELECT 1") // See the session over HTTP on tenant SQL pod 1. - httpPod1, err := helper.testCluster().tenantHTTPJSONClient(1) - require.NoError(t, err) + httpPod1 := helper.testCluster().tenantHTTPClient(t, 1) defer httpPod1.Close() listSessionsResp := serverpb.ListSessionsResponse{} - err = httpPod1.GetJSON("/_status/sessions", &listSessionsResp) - require.NoError(t, err) + httpPod1.GetJSON("/_status/sessions", &listSessionsResp) var session serverpb.Session for _, s := range listSessionsResp.Sessions { if s.LastActiveQuery == "SELECT 1" { @@ -656,24 +697,27 @@ func TestTenantStatusCancelSession(t *testing.T) { require.NotNil(t, session.ID, "session not found") // See the session over SQL on tenant SQL pod 0. - require.Contains(t, selectClusterSessionIDs(t, sqlPod0), hex.EncodeToString(session.ID)) + sessionID := hex.EncodeToString(session.ID) + require.Eventually(t, func() bool { + return strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID) + }, 5*time.Second, 100*time.Millisecond) // Cancel the session over HTTP from tenant SQL pod 1. cancelSessionReq := serverpb.CancelSessionRequest{SessionID: session.ID} cancelSessionResp := serverpb.CancelSessionResponse{} - err = httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp) - require.NoError(t, err) + httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp) require.Equal(t, true, cancelSessionResp.Canceled, cancelSessionResp.Error) // No longer see the session over SQL from tenant SQL pod 0. // (The SQL client maintains an internal connection pool and automatically reconnects.) - require.NotContains(t, selectClusterSessionIDs(t, sqlPod0), hex.EncodeToString(session.ID)) + require.Eventually(t, func() bool { + return !strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID) + }, 5*time.Second, 100*time.Millisecond) // Attempt to cancel the session again over HTTP from tenant SQL pod 1, so that we can see the error message. - err = httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp) - require.NoError(t, err) + httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp) require.Equal(t, false, cancelSessionResp.Canceled) - require.Equal(t, fmt.Sprintf("session ID %s not found", hex.EncodeToString(session.ID)), cancelSessionResp.Error) + require.Equal(t, fmt.Sprintf("session ID %s not found", sessionID), cancelSessionResp.Error) } func selectClusterQueryIDs(t *testing.T, conn *sqlutils.SQLRunner) []string { @@ -685,39 +729,28 @@ func selectClusterQueryIDs(t *testing.T, conn *sqlutils.SQLRunner) []string { return queryIDs } -func TestTenantStatusCancelQuery(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.UnderStressRace(t, "expensive tests") - - ctx := context.Background() - helper := newTestTenantHelper(t, 3, base.TestingKnobs{}) - defer helper.cleanup(ctx, t) - +func testTenantStatusCancelQuery(ctx context.Context, t *testing.T, helper *tenantTestHelper) { // Open a SQL session on tenant SQL pod 0 and start a long-running query. sqlPod0 := helper.testCluster().tenantConn(0) - results := make(chan struct{}) - errors := make(chan error) - defer close(results) - defer close(errors) + resultCh := make(chan struct{}) + errorCh := make(chan error) + defer close(resultCh) + defer close(errorCh) go func() { if _, err := sqlPod0.DB.ExecContext(ctx, "SELECT pg_sleep(60)"); err != nil { - errors <- err + errorCh <- err } else { - results <- struct{}{} + resultCh <- struct{}{} } }() // See the query over HTTP on tenant SQL pod 1. - httpPod1, err := helper.testCluster().tenantHTTPJSONClient(1) - require.NoError(t, err) + httpPod1 := helper.testCluster().tenantHTTPClient(t, 1) defer httpPod1.Close() var listSessionsResp serverpb.ListSessionsResponse var query serverpb.ActiveQuery require.Eventually(t, func() bool { - err = httpPod1.GetJSON("/_status/sessions", &listSessionsResp) - require.NoError(t, err) + httpPod1.GetJSON("/_status/sessions", &listSessionsResp) for _, s := range listSessionsResp.Sessions { for _, q := range s.ActiveQueries { if q.Sql == "SELECT pg_sleep(60)" { @@ -730,32 +763,32 @@ func TestTenantStatusCancelQuery(t *testing.T) { }, 10*time.Second, 100*time.Millisecond, "query not found") // See the query over SQL on tenant SQL pod 0. - require.Contains(t, selectClusterQueryIDs(t, sqlPod0), query.ID) + require.Eventually(t, func() bool { + return strings.Contains(strings.Join(selectClusterQueryIDs(t, sqlPod0), ","), query.ID) + }, 10*time.Second, 100*time.Millisecond) // Cancel the query over HTTP on tenant SQL pod 1. cancelQueryReq := serverpb.CancelQueryRequest{QueryID: query.ID} cancelQueryResp := serverpb.CancelQueryResponse{} - err = httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp) - require.NoError(t, err) + httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp) require.Equal(t, true, cancelQueryResp.Canceled, "expected query to be canceled, but encountered unexpected error %s", cancelQueryResp.Error) // No longer see the query over SQL on tenant SQL pod 0. require.Eventually(t, func() bool { - return !strings.Contains(reflect.ValueOf(selectClusterQueryIDs(t, sqlPod0)).String(), query.ID) + return !strings.Contains(strings.Join(selectClusterQueryIDs(t, sqlPod0), ","), query.ID) }, 10*time.Second, 100*time.Millisecond, "expected query %s to no longer be visible in crdb_internal.cluster_queries", query.ID) select { - case <-results: + case <-resultCh: t.Fatalf("Expected long-running query to have been canceled with error.") - case err := <-errors: + case err := <-errorCh: require.Equal(t, "pq: query execution canceled", err.Error()) } // Attempt to cancel the query again over HTTP from tenant SQL pod 1, so that we can see the error message. - err = httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp) - require.NoError(t, err) + httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp) require.Equal(t, false, cancelQueryResp.Canceled) require.Equal(t, fmt.Sprintf("query ID %s not found", query.ID), cancelQueryResp.Error) } diff --git a/pkg/ccl/serverccl/tenant_test_utils.go b/pkg/ccl/serverccl/statusccl/tenant_test_utils.go similarity index 70% rename from pkg/ccl/serverccl/tenant_test_utils.go rename to pkg/ccl/serverccl/statusccl/tenant_test_utils.go index ba882aee8041..871bde2c215f 100644 --- a/pkg/ccl/serverccl/tenant_test_utils.go +++ b/pkg/ccl/serverccl/statusccl/tenant_test_utils.go @@ -6,11 +6,12 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package serverccl +package statusccl import ( "context" gosql "database/sql" + "math/rand" "net/http" "testing" @@ -29,6 +30,13 @@ import ( "github.com/stretchr/testify/require" ) +// serverIdx is the index of the node within a test cluster. A special value +// `randomServer` can be used to let the test helper to randomly choose to +// a server from the test cluster. +type serverIdx int + +const randomServer serverIdx = -1 + type testTenant struct { tenant serverutils.TestTenantInterface tenantConn *gosql.DB @@ -85,7 +93,7 @@ func newTestTenantHelper( t.Helper() params, _ := tests.CreateTestServerParams() - testCluster := serverutils.StartNewTestCluster(t, 3 /* numNodes */, base.TestClusterArgs{ + testCluster := serverutils.StartNewTestCluster(t, 1 /* numNodes */, base.TestClusterArgs{ ServerArgs: params, }) server := testCluster.Server(0) @@ -99,10 +107,12 @@ func newTestTenantHelper( security.EmbeddedTenantIDs()[0], knobs, ), + // Spin up a small tenant cluster under a different tenant ID to test + // tenant isolation. tenantControlCluster: newTenantCluster( t, server, - tenantClusterSize, + 1, /* tenantClusterSize */ security.EmbeddedTenantIDs()[1], knobs, ), @@ -146,24 +156,22 @@ func newTenantCluster( return cluster } -func (c tenantCluster) tenantConn(idx int) *sqlutils.SQLRunner { - return c[idx].tenantDB +func (c tenantCluster) tenantConn(idx serverIdx) *sqlutils.SQLRunner { + return c.tenant(idx).tenantDB } -func (c tenantCluster) tenantHTTPJSONClient(idx int) (*httpJSONClient, error) { - client, err := c[idx].tenant.RPCContext().GetHTTPClient() - if err != nil { - return nil, err - } - return &httpJSONClient{client: client, baseURL: "https://" + c[idx].tenant.HTTPAddr()}, nil +func (c tenantCluster) tenantHTTPClient(t *testing.T, idx serverIdx) *httpClient { + client, err := c.tenant(idx).tenant.RPCContext().GetHTTPClient() + require.NoError(t, err) + return &httpClient{t: t, client: client, baseURL: "https://" + c[idx].tenant.HTTPAddr()} } -func (c tenantCluster) tenantSQLStats(idx int) *persistedsqlstats.PersistedSQLStats { - return c[idx].tenantSQLStats +func (c tenantCluster) tenantSQLStats(idx serverIdx) *persistedsqlstats.PersistedSQLStats { + return c.tenant(idx).tenantSQLStats } -func (c tenantCluster) tenantStatusSrv(idx int) serverpb.SQLStatusServer { - return c[idx].tenantStatus +func (c tenantCluster) tenantStatusSrv(idx serverIdx) serverpb.SQLStatusServer { + return c.tenant(idx).tenantStatus } func (c tenantCluster) cleanup(t *testing.T) { @@ -172,21 +180,32 @@ func (c tenantCluster) cleanup(t *testing.T) { } } -type httpJSONClient struct { +// tenant selects a tenant node from the tenant cluster. If randomServer +// is passed in, then a random node is selected. +func (c tenantCluster) tenant(idx serverIdx) *testTenant { + if idx == randomServer { + return c[rand.Intn(len(c))] + } + + return c[idx] +} + +type httpClient struct { + t *testing.T client http.Client baseURL string } -func (c *httpJSONClient) GetJSON(path string, response protoutil.Message) error { - return httputil.GetJSON(c.client, c.baseURL+path, response) +func (c *httpClient) GetJSON(path string, response protoutil.Message) { + err := httputil.GetJSON(c.client, c.baseURL+path, response) + require.NoError(c.t, err) } -func (c *httpJSONClient) PostJSON( - path string, request protoutil.Message, response protoutil.Message, -) error { - return httputil.PostJSON(c.client, c.baseURL+path, request, response) +func (c *httpClient) PostJSON(path string, request protoutil.Message, response protoutil.Message) { + err := httputil.PostJSON(c.client, c.baseURL+path, request, response) + require.NoError(c.t, err) } -func (c *httpJSONClient) Close() { +func (c *httpClient) Close() { c.client.CloseIdleConnections() }