diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 9c5baea50bb5..4f0b4e2dcd63 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -263,4 +263,9 @@ type TestTenantArgs struct { // ExternalIODirConfig is used to initialize the same-named // field on the server.Config struct. ExternalIODirConfig ExternalIODirConfig + + // If set, this will be appended to the Postgres URL by functions that + // automatically open a connection to the server. That's equivalent to running + // SET DATABASE=foo, which works even if the database doesn't (yet) exist. + UseDatabase string } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index fa0f6d311659..e92055285f85 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -215,15 +215,10 @@ func TestChangefeedTenants(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() - - kvServer, kvSQLdb, _ := serverutils.StartServer(t, base.TestServerArgs{ - ExternalIODirConfig: base.ExternalIODirConfig{ - DisableOutbound: true, - }, + kvServer, kvSQLdb, cleanup := startTestServer(t, func(args *base.TestServerArgs) { + args.ExternalIODirConfig.DisableOutbound = true }) - defer kvServer.Stopper().Stop(ctx) + defer cleanup() tenantArgs := base.TestTenantArgs{ // crdb_internal.create_tenant called by StartTenant @@ -234,27 +229,16 @@ func TestChangefeedTenants(t *testing.T) { ExternalIODirConfig: base.ExternalIODirConfig{ DisableOutbound: true, }, + UseDatabase: `d`, } tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs) tenantSQL := sqlutils.MakeSQLRunner(tenantDB) - // TODO(ssd): Cleanup this shared setup code once the refactor - // in #64693 is setttled. - tenantSQL.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) - tenantSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) - tenantSQL.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) - - // Database `d` is hardcoded in a number of places. Create it - // and create a new connection to that database. - tenantSQL.Exec(t, `CREATE DATABASE d`) - tenantSQL = sqlutils.MakeSQLRunner( - serverutils.OpenDBConn(t, - tenantServer.SQLAddr(), `d`, false /* insecure */, kvServer.Stopper())) - tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`) + tenantSQL.Exec(t, serverSetupStatements) + tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`) t.Run("changefeed on non-tenant table fails", func(t *testing.T) { kvSQL := sqlutils.MakeSQLRunner(kvSQLdb) - kvSQL.Exec(t, `CREATE DATABASE d`) kvSQL.Exec(t, `CREATE TABLE d.foo (pk INT PRIMARY KEY)`) tenantSQL.ExpectErr(t, `table "foo" does not exist`, @@ -1035,7 +1019,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { }) } - t.Run(`sinkless`, sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of f.Server() use + // in fetchDescVersionModificationTime + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) log.Flush() @@ -1088,7 +1074,9 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) { } - t.Run(`sinkless`, sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of f.Server() use + // in fetchDescVerionModifationTime + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) log.Flush() @@ -1209,7 +1197,7 @@ func TestChangefeedInterleaved(t *testing.T) { testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(db) - + sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.interleaved_tables.enabled = true`) sqlDB.Exec(t, `CREATE TABLE grandparent (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO grandparent VALUES (0, 'grandparent-0')`) grandparent := feed(t, f, `CREATE CHANGEFEED FOR grandparent`) @@ -1413,7 +1401,10 @@ func TestChangefeedFailOnTableOffline(t *testing.T) { regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`)) }) } - t.Run(`sinkless`, sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of: + // changefeed_test.go:1409: error executing 'IMPORT INTO + // for_import CSV DATA ($1)': pq: fake protectedts.Provide + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) @@ -1448,7 +1439,13 @@ func TestChangefeedFailOnRBRChange(t *testing.T) { Value: testServerRegion, }) } - t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn)) + + // Tenants skiped because of: + // + // error executing 'ALTER DATABASE d PRIMARY REGION + // "us-east-1"': pq: get_live_cluster_regions: unimplemented: + // operation is unsupported in multi-tenancy mode + t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTestWithServerArgs(withTestServerRegion, testFn)) t.Run(`cloudstorage`, cloudStorageTestWithServerArg(withTestServerRegion, testFn)) t.Run(`kafka`, kafkaTestWithServerArgs(withTestServerRegion, testFn)) @@ -1990,8 +1987,8 @@ func TestChangefeedMonitoring(t *testing.T) { return nil }) } - - t.Run(`sinkless`, sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of f.Server() use + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, func(t *testing.T) { skip.WithIssue(t, 38443) enterpriseTest(testFn) @@ -2241,8 +2238,9 @@ func TestChangefeedSchemaTTL(t *testing.T) { t.Errorf(`expected "GC threshold" error got: %+v`, err) } } - - t.Run("sinkless", sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of f.Server() use + // in forceTableGC + t.Run("sinkless", sinklessTest(testFn, feedTestNoTenants)) t.Run("enterprise", enterpriseTest(testFn)) t.Run("cloudstorage", cloudStorageTest(testFn)) t.Run("kafka", kafkaTest(testFn)) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index b597c6304f35..85c3ba1bbb60 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -13,6 +13,7 @@ import ( gosql "database/sql" gojson "encoding/json" "fmt" + "math/rand" "net/url" "reflect" "sort" @@ -245,6 +246,14 @@ func expectResolvedTimestampAvro( type cdcTestFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory) type updateArgsFn func(args *base.TestServerArgs) +var serverSetupStatements = ` +SET CLUSTER SETTING kv.rangefeed.enabled = true; +SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'; +SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'; +SET CLUSTER SETTING sql.defaults.vectorize=on; +CREATE DATABASE d; +` + func startTestServer( t testing.TB, argsFn updateArgsFn, ) (serverutils.TestServerInterface, *gosql.DB, func()) { @@ -276,22 +285,97 @@ func startTestServer( } }() - _, err = db.ExecContext(ctx, ` -SET CLUSTER SETTING kv.rangefeed.enabled = true; -SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'; -SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'; -SET CLUSTER SETTING sql.defaults.vectorize=on; -CREATE DATABASE d; -`) + _, err = db.ExecContext(ctx, serverSetupStatements) + require.NoError(t, err) if region := serverArgsRegion(args); region != "" { _, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region)) + require.NoError(t, err) } return s, db, cleanup } -func sinklessTestWithServerArgs( +type feedTestOptions struct { + noTenants bool +} + +type feedTestOption func(opts *feedTestOptions) + +// feedTestNoTenants is a feedTestOption that will prohibit this tests +// from randomly running on a tenant. +var feedTestNoTenants = func(opts *feedTestOptions) { opts.noTenants = true } + +// testServerShim is a kludge to get a few more tests working in +// tenant-mode. +// +// Currently, our TestFeedFactory has a Server() method that returns a +// TestServerInterface. The TestTenantInterface returned by +// StartTenant isn't a TestServerInterface. +// +// TODO(ssd): Clean this up. Perhaps we can add a SQLServer() method +// to TestFeedFactory that returns just the bits that are shared. +type testServerShim struct { + serverutils.TestServerInterface + sqlServer serverutils.TestTenantInterface +} + +func (t *testServerShim) ServingSQLAddr() string { + return t.sqlServer.SQLAddr() +} + +func sinklessTenantTestWithServerArgs( + argsFn func(args *base.TestServerArgs), testFn cdcTestFn, +) func(*testing.T) { + return func(t *testing.T) { + // We need to open a new log scope because StartTenant + // calls log.SetNodeIDs which can only be called once + // per log scope. If we don't open a log scope here, + // then any test function that wants to use this twice + // would fail. + defer log.Scope(t).Close(t) + ctx := context.Background() + kvServer, _, cleanup := startTestServer(t, func(args *base.TestServerArgs) { + args.ExternalIODirConfig.DisableOutbound = true + if argsFn != nil { + argsFn(args) + } + }) + defer cleanup() + + tenantID := serverutils.TestTenantID() + tenantArgs := base.TestTenantArgs{ + // crdb_internal.create_tenant called by StartTenant + TenantID: tenantID, + // Non-enterprise changefeeds are currently only + // disabled by setting DisableOutbound true + // everywhere. + ExternalIODirConfig: base.ExternalIODirConfig{ + DisableOutbound: true, + }, + UseDatabase: `d`, + } + + tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs) + + // Re-run setup on the tenant as well + _, err := tenantDB.ExecContext(ctx, serverSetupStatements) + require.NoError(t, err) + + sink, cleanup := sqlutils.PGUrl(t, tenantServer.SQLAddr(), t.Name(), url.User(security.RootUser)) + defer cleanup() + + server := &testServerShim{kvServer, tenantServer} + f := makeSinklessFeedFactory(server, sink) + + // Log so that it is clear if a failed test happened + // to run on a tenant. + t.Logf("Running sinkless test using tenant %s", tenantID) + testFn(t, tenantDB, f) + } +} + +func sinklessNoTenantTestWithServerArgs( argsFn func(args *base.TestServerArgs), testFn cdcTestFn, ) func(*testing.T) { return func(t *testing.T) { @@ -305,8 +389,25 @@ func sinklessTestWithServerArgs( } } -func sinklessTest(testFn cdcTestFn) func(*testing.T) { - return sinklessTestWithServerArgs(nil, testFn) +func sinklessTestWithServerArgs( + argsFn func(args *base.TestServerArgs), testFn cdcTestFn, testOpts ...feedTestOption, +) func(*testing.T) { + // percentTenant is the percentange of tests that will be run against + // a SQL-node in a multi-tenant server. 1 for all tests to be run on a + // tenant. + const percentTenant = 0.25 + options := &feedTestOptions{} + for _, o := range testOpts { + o(options) + } + if !options.noTenants && rand.Float32() < percentTenant { + return sinklessTenantTestWithServerArgs(argsFn, testFn) + } + return sinklessNoTenantTestWithServerArgs(argsFn, testFn) +} + +func sinklessTest(testFn cdcTestFn, testOpts ...feedTestOption) func(*testing.T) { + return sinklessTestWithServerArgs(nil, testFn, testOpts...) } func enterpriseTest(testFn cdcTestFn) func(*testing.T) { diff --git a/pkg/ccl/changefeedccl/nemeses_test.go b/pkg/ccl/changefeedccl/nemeses_test.go index 0fe47e36976c..f970013cc8bb 100644 --- a/pkg/ccl/changefeedccl/nemeses_test.go +++ b/pkg/ccl/changefeedccl/nemeses_test.go @@ -42,7 +42,12 @@ func TestChangefeedNemeses(t *testing.T) { t.Error(failure) } } - t.Run(`sinkless`, sinklessTest(testFn)) + // Tenant tests disabled because ALTER TABLE .. SPLIT is not + // support in multi-tenancy mode: + // + // nemeses_test.go:39: pq: unimplemented: operation is + // unsupported in multi-tenancy mode + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) log.Flush() diff --git a/pkg/ccl/changefeedccl/validations_test.go b/pkg/ccl/changefeedccl/validations_test.go index 720f061ebc35..b992f8c73210 100644 --- a/pkg/ccl/changefeedccl/validations_test.go +++ b/pkg/ccl/changefeedccl/validations_test.go @@ -98,7 +98,11 @@ func TestCatchupScanOrdering(t *testing.T) { } }) } - t.Run(`sinkless`, sinklessTest(testFn)) + // Tenant tests skipped because of: + // validations_test.go:40: executing ALTER TABLE bank SPLIT AT + // VALUES (5): pq: unimplemented: operation is unsupported in + // multi-tenancy mode + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) } diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 016c6748354f..d377082336d2 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -351,7 +351,7 @@ func StartTenant( } goDB := OpenDBConn( - t, tenant.SQLAddr(), "", false /* insecure */, stopper) + t, tenant.SQLAddr(), params.UseDatabase, false /* insecure */, stopper) return tenant, goDB }