From 71c977b36b01a9c7ca8cf8871b5fc645bb855b73 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 12 May 2021 13:33:52 +0100 Subject: [PATCH] changefeedccl: randomly run sinkless tests on tenant Now, sinkless tests will run against a tenant's SQL server 25% of the time, unless they opt-out via an option. The main two reasons for tests needing to opt-out of running on tenants are 1) Unsupported SQL statements in mutli-tenant mode (such as REGION related statements or SPLIT) 2) Needing complex interactions with the TestServerInterface. All such skips are currently marked with a TODO. Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 32 ++++-- pkg/ccl/changefeedccl/helpers_test.go | 124 ++++++++++++++++++++-- pkg/ccl/changefeedccl/nemeses_test.go | 7 +- pkg/ccl/changefeedccl/validations_test.go | 6 +- 4 files changed, 148 insertions(+), 21 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index fa0f6d311659..fd9cd271b506 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1035,7 +1035,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { }) } - t.Run(`sinkless`, sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of f.Server() use + // in fetchDescVersionModificationTime + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) log.Flush() @@ -1088,7 +1090,9 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) { } - t.Run(`sinkless`, sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of f.Server() use + // in fetchDescVerionModifationTime + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) log.Flush() @@ -1209,7 +1213,7 @@ func TestChangefeedInterleaved(t *testing.T) { testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(db) - + sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.interleaved_tables.enabled = true`) sqlDB.Exec(t, `CREATE TABLE grandparent (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO grandparent VALUES (0, 'grandparent-0')`) grandparent := feed(t, f, `CREATE CHANGEFEED FOR grandparent`) @@ -1413,7 +1417,10 @@ func TestChangefeedFailOnTableOffline(t *testing.T) { regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`)) }) } - t.Run(`sinkless`, sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of: + // changefeed_test.go:1409: error executing 'IMPORT INTO + // for_import CSV DATA ($1)': pq: fake protectedts.Provide + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) @@ -1448,7 +1455,13 @@ func TestChangefeedFailOnRBRChange(t *testing.T) { Value: testServerRegion, }) } - t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn)) + + // Tenants skiped because of: + // + // error executing 'ALTER DATABASE d PRIMARY REGION + // "us-east-1"': pq: get_live_cluster_regions: unimplemented: + // operation is unsupported in multi-tenancy mode + t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTestWithServerArgs(withTestServerRegion, testFn)) t.Run(`cloudstorage`, cloudStorageTestWithServerArg(withTestServerRegion, testFn)) t.Run(`kafka`, kafkaTestWithServerArgs(withTestServerRegion, testFn)) @@ -1990,8 +2003,8 @@ func TestChangefeedMonitoring(t *testing.T) { return nil }) } - - t.Run(`sinkless`, sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of f.Server() use + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, func(t *testing.T) { skip.WithIssue(t, 38443) enterpriseTest(testFn) @@ -2241,8 +2254,9 @@ func TestChangefeedSchemaTTL(t *testing.T) { t.Errorf(`expected "GC threshold" error got: %+v`, err) } } - - t.Run("sinkless", sinklessTest(testFn)) + // TODO(ssd): tenant tests skipped because of f.Server() use + // in forceTableGC + t.Run("sinkless", sinklessTest(testFn, feedTestNoTenants)) t.Run("enterprise", enterpriseTest(testFn)) t.Run("cloudstorage", cloudStorageTest(testFn)) t.Run("kafka", kafkaTest(testFn)) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index b597c6304f35..3db7169f036a 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -13,6 +13,7 @@ import ( gosql "database/sql" gojson "encoding/json" "fmt" + "math/rand" "net/url" "reflect" "sort" @@ -245,6 +246,14 @@ func expectResolvedTimestampAvro( type cdcTestFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory) type updateArgsFn func(args *base.TestServerArgs) +var serverSetupStatements = ` +SET CLUSTER SETTING kv.rangefeed.enabled = true; +SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'; +SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'; +SET CLUSTER SETTING sql.defaults.vectorize=on; +CREATE DATABASE d; +` + func startTestServer( t testing.TB, argsFn updateArgsFn, ) (serverutils.TestServerInterface, *gosql.DB, func()) { @@ -276,22 +285,100 @@ func startTestServer( } }() - _, err = db.ExecContext(ctx, ` -SET CLUSTER SETTING kv.rangefeed.enabled = true; -SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'; -SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'; -SET CLUSTER SETTING sql.defaults.vectorize=on; -CREATE DATABASE d; -`) + _, err = db.ExecContext(ctx, serverSetupStatements) + require.NoError(t, err) if region := serverArgsRegion(args); region != "" { _, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region)) + require.NoError(t, err) } return s, db, cleanup } -func sinklessTestWithServerArgs( +type feedTestOptions struct { + noTenants bool +} + +type feedTestOption func(opts *feedTestOptions) + +// feedTestNoTenants is a feedTestOption that will prohibit this tests +// from randomly running on a tenant. +var feedTestNoTenants = func(opts *feedTestOptions) { opts.noTenants = true } + +// testServerShim is a kludge to get a few more tests working in +// tenant-mode. +// +// Currently, our TestFeedFactory has a Server() method that returns a +// TestServerInterface. The TestTenantInterface returned by +// StartTenant isn't a TestServerInterface. +// +// TODO(ssd): Clean this up. Perhaps we can add a SQLServer() method +// to TestFeedFactory that returns just the bits that are shared. +type testServerShim struct { + serverutils.TestServerInterface + sqlServer serverutils.TestTenantInterface +} + +func (t *testServerShim) ServingSQLAddr() string { + return t.sqlServer.SQLAddr() +} + +func sinklessTenantTestWithServerArgs( + argsFn func(args *base.TestServerArgs), testFn cdcTestFn, +) func(*testing.T) { + return func(t *testing.T) { + // We need to open a new log scope because StartTenant + // calls log.SetNodeIDs which can only be called once + // per log scope. If we don't open a log scope here, + // then any test function that wants to use this twice + // would fail. + defer log.Scope(t).Close(t) + ctx := context.Background() + kvServer, _, cleanup := startTestServer(t, func(args *base.TestServerArgs) { + args.ExternalIODirConfig.DisableOutbound = true + if argsFn != nil { + argsFn(args) + } + }) + defer cleanup() + + tenantID := serverutils.TestTenantID() + tenantArgs := base.TestTenantArgs{ + // crdb_internal.create_tenant called by StartTenant + TenantID: tenantID, + // Non-enterprise changefeeds are currently only + // disabled by setting DisableOutbound true + // everywhere. + ExternalIODirConfig: base.ExternalIODirConfig{ + DisableOutbound: true, + }, + } + + tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs) + + // Re-run setup on the tenant as well + _, err := tenantDB.ExecContext(ctx, serverSetupStatements) + require.NoError(t, err) + + // Database `d` is hardcoded in a number of + // places. Create a new connection to that database. + tenantDB = serverutils.OpenDBConn(t, tenantServer.SQLAddr(), `d`, false /* insecure */, kvServer.Stopper()) + + sink, cleanup := sqlutils.PGUrl(t, tenantServer.SQLAddr(), t.Name(), url.User(security.RootUser)) + defer cleanup() + + server := &testServerShim{kvServer, tenantServer} + f := makeSinklessFeedFactory(server, sink) + + // Log so that it is clear if a failed test happened + // to run on a tenant. + t.Logf("Running sinkless test using tenant %s", tenantID) + testFn(t, tenantDB, f) + } +} + +func sinklessNoTenantTestWithServerArgs( argsFn func(args *base.TestServerArgs), testFn cdcTestFn, ) func(*testing.T) { return func(t *testing.T) { @@ -305,8 +392,25 @@ func sinklessTestWithServerArgs( } } -func sinklessTest(testFn cdcTestFn) func(*testing.T) { - return sinklessTestWithServerArgs(nil, testFn) +func sinklessTestWithServerArgs( + argsFn func(args *base.TestServerArgs), testFn cdcTestFn, testOpts ...feedTestOption, +) func(*testing.T) { + // percentTenant is the percentange of tests that will be run against + // a SQL-node in a multi-tenant server. 1 for all tests to be run on a + // tenant. + const percentTenant = 0.25 + options := &feedTestOptions{} + for _, o := range testOpts { + o(options) + } + if !options.noTenants && rand.Float32() < percentTenant { + return sinklessTenantTestWithServerArgs(argsFn, testFn) + } + return sinklessNoTenantTestWithServerArgs(argsFn, testFn) +} + +func sinklessTest(testFn cdcTestFn, testOpts ...feedTestOption) func(*testing.T) { + return sinklessTestWithServerArgs(nil, testFn, testOpts...) } func enterpriseTest(testFn cdcTestFn) func(*testing.T) { diff --git a/pkg/ccl/changefeedccl/nemeses_test.go b/pkg/ccl/changefeedccl/nemeses_test.go index 0fe47e36976c..f970013cc8bb 100644 --- a/pkg/ccl/changefeedccl/nemeses_test.go +++ b/pkg/ccl/changefeedccl/nemeses_test.go @@ -42,7 +42,12 @@ func TestChangefeedNemeses(t *testing.T) { t.Error(failure) } } - t.Run(`sinkless`, sinklessTest(testFn)) + // Tenant tests disabled because ALTER TABLE .. SPLIT is not + // support in multi-tenancy mode: + // + // nemeses_test.go:39: pq: unimplemented: operation is + // unsupported in multi-tenancy mode + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) log.Flush() diff --git a/pkg/ccl/changefeedccl/validations_test.go b/pkg/ccl/changefeedccl/validations_test.go index 720f061ebc35..b992f8c73210 100644 --- a/pkg/ccl/changefeedccl/validations_test.go +++ b/pkg/ccl/changefeedccl/validations_test.go @@ -98,7 +98,11 @@ func TestCatchupScanOrdering(t *testing.T) { } }) } - t.Run(`sinkless`, sinklessTest(testFn)) + // Tenant tests skipped because of: + // validations_test.go:40: executing ALTER TABLE bank SPLIT AT + // VALUES (5): pq: unimplemented: operation is unsupported in + // multi-tenancy mode + t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) }