Skip to content

Commit

Permalink
changefeedccl: randomly run sinkless tests on tenant
Browse files Browse the repository at this point in the history
Now, sinkless tests will run against a tenant's SQL server 25% of the
time, unless they opt-out via an option.

The main two reasons for tests needing to opt-out of running on
tenants are

1) Unsupported SQL statements in mutli-tenant mode (such as REGION
   related statements or SPLIT)

2) Needing complex interactions with the TestServerInterface. All such
   skips are currently marked with a TODO.

Release note: None
  • Loading branch information
stevendanna committed May 21, 2021
1 parent 0553b25 commit 71c977b
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 21 deletions.
32 changes: 23 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
})
}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in fetchDescVersionModificationTime
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
log.Flush()
Expand Down Expand Up @@ -1088,7 +1090,9 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {

}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in fetchDescVerionModifationTime
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
log.Flush()
Expand Down Expand Up @@ -1209,7 +1213,7 @@ func TestChangefeedInterleaved(t *testing.T) {

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.interleaved_tables.enabled = true`)
sqlDB.Exec(t, `CREATE TABLE grandparent (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO grandparent VALUES (0, 'grandparent-0')`)
grandparent := feed(t, f, `CREATE CHANGEFEED FOR grandparent`)
Expand Down Expand Up @@ -1413,7 +1417,10 @@ func TestChangefeedFailOnTableOffline(t *testing.T) {
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
})
}
t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of:
// changefeed_test.go:1409: error executing 'IMPORT INTO
// for_import CSV DATA ($1)': pq: fake protectedts.Provide
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
Expand Down Expand Up @@ -1448,7 +1455,13 @@ func TestChangefeedFailOnRBRChange(t *testing.T) {
Value: testServerRegion,
})
}
t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn))

// Tenants skiped because of:
//
// error executing 'ALTER DATABASE d PRIMARY REGION
// "us-east-1"': pq: get_live_cluster_regions: unimplemented:
// operation is unsupported in multi-tenancy mode
t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTestWithServerArgs(withTestServerRegion, testFn))
t.Run(`cloudstorage`, cloudStorageTestWithServerArg(withTestServerRegion, testFn))
t.Run(`kafka`, kafkaTestWithServerArgs(withTestServerRegion, testFn))
Expand Down Expand Up @@ -1990,8 +2003,8 @@ func TestChangefeedMonitoring(t *testing.T) {
return nil
})
}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, func(t *testing.T) {
skip.WithIssue(t, 38443)
enterpriseTest(testFn)
Expand Down Expand Up @@ -2241,8 +2254,9 @@ func TestChangefeedSchemaTTL(t *testing.T) {
t.Errorf(`expected "GC threshold" error got: %+v`, err)
}
}

t.Run("sinkless", sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in forceTableGC
t.Run("sinkless", sinklessTest(testFn, feedTestNoTenants))
t.Run("enterprise", enterpriseTest(testFn))
t.Run("cloudstorage", cloudStorageTest(testFn))
t.Run("kafka", kafkaTest(testFn))
Expand Down
124 changes: 114 additions & 10 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
gosql "database/sql"
gojson "encoding/json"
"fmt"
"math/rand"
"net/url"
"reflect"
"sort"
Expand Down Expand Up @@ -245,6 +246,14 @@ func expectResolvedTimestampAvro(
type cdcTestFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)
type updateArgsFn func(args *base.TestServerArgs)

var serverSetupStatements = `
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING sql.defaults.vectorize=on;
CREATE DATABASE d;
`

func startTestServer(
t testing.TB, argsFn updateArgsFn,
) (serverutils.TestServerInterface, *gosql.DB, func()) {
Expand Down Expand Up @@ -276,22 +285,100 @@ func startTestServer(
}
}()

_, err = db.ExecContext(ctx, `
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING sql.defaults.vectorize=on;
CREATE DATABASE d;
`)
_, err = db.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)

if region := serverArgsRegion(args); region != "" {
_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region))
require.NoError(t, err)
}

return s, db, cleanup
}

func sinklessTestWithServerArgs(
type feedTestOptions struct {
noTenants bool
}

type feedTestOption func(opts *feedTestOptions)

// feedTestNoTenants is a feedTestOption that will prohibit this tests
// from randomly running on a tenant.
var feedTestNoTenants = func(opts *feedTestOptions) { opts.noTenants = true }

// testServerShim is a kludge to get a few more tests working in
// tenant-mode.
//
// Currently, our TestFeedFactory has a Server() method that returns a
// TestServerInterface. The TestTenantInterface returned by
// StartTenant isn't a TestServerInterface.
//
// TODO(ssd): Clean this up. Perhaps we can add a SQLServer() method
// to TestFeedFactory that returns just the bits that are shared.
type testServerShim struct {
serverutils.TestServerInterface
sqlServer serverutils.TestTenantInterface
}

func (t *testServerShim) ServingSQLAddr() string {
return t.sqlServer.SQLAddr()
}

func sinklessTenantTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn,
) func(*testing.T) {
return func(t *testing.T) {
// We need to open a new log scope because StartTenant
// calls log.SetNodeIDs which can only be called once
// per log scope. If we don't open a log scope here,
// then any test function that wants to use this twice
// would fail.
defer log.Scope(t).Close(t)
ctx := context.Background()
kvServer, _, cleanup := startTestServer(t, func(args *base.TestServerArgs) {
args.ExternalIODirConfig.DisableOutbound = true
if argsFn != nil {
argsFn(args)
}
})
defer cleanup()

tenantID := serverutils.TestTenantID()
tenantArgs := base.TestTenantArgs{
// crdb_internal.create_tenant called by StartTenant
TenantID: tenantID,
// Non-enterprise changefeeds are currently only
// disabled by setting DisableOutbound true
// everywhere.
ExternalIODirConfig: base.ExternalIODirConfig{
DisableOutbound: true,
},
}

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)

// Re-run setup on the tenant as well
_, err := tenantDB.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)

// Database `d` is hardcoded in a number of
// places. Create a new connection to that database.
tenantDB = serverutils.OpenDBConn(t, tenantServer.SQLAddr(), `d`, false /* insecure */, kvServer.Stopper())

sink, cleanup := sqlutils.PGUrl(t, tenantServer.SQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanup()

server := &testServerShim{kvServer, tenantServer}
f := makeSinklessFeedFactory(server, sink)

// Log so that it is clear if a failed test happened
// to run on a tenant.
t.Logf("Running sinkless test using tenant %s", tenantID)
testFn(t, tenantDB, f)
}
}

func sinklessNoTenantTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn,
) func(*testing.T) {
return func(t *testing.T) {
Expand All @@ -305,8 +392,25 @@ func sinklessTestWithServerArgs(
}
}

func sinklessTest(testFn cdcTestFn) func(*testing.T) {
return sinklessTestWithServerArgs(nil, testFn)
func sinklessTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn, testOpts ...feedTestOption,
) func(*testing.T) {
// percentTenant is the percentange of tests that will be run against
// a SQL-node in a multi-tenant server. 1 for all tests to be run on a
// tenant.
const percentTenant = 0.25
options := &feedTestOptions{}
for _, o := range testOpts {
o(options)
}
if !options.noTenants && rand.Float32() < percentTenant {
return sinklessTenantTestWithServerArgs(argsFn, testFn)
}
return sinklessNoTenantTestWithServerArgs(argsFn, testFn)
}

func sinklessTest(testFn cdcTestFn, testOpts ...feedTestOption) func(*testing.T) {
return sinklessTestWithServerArgs(nil, testFn, testOpts...)
}

func enterpriseTest(testFn cdcTestFn) func(*testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/nemeses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ func TestChangefeedNemeses(t *testing.T) {
t.Error(failure)
}
}
t.Run(`sinkless`, sinklessTest(testFn))
// Tenant tests disabled because ALTER TABLE .. SPLIT is not
// support in multi-tenancy mode:
//
// nemeses_test.go:39: pq: unimplemented: operation is
// unsupported in multi-tenancy mode
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
log.Flush()
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/validations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ func TestCatchupScanOrdering(t *testing.T) {
}
})
}
t.Run(`sinkless`, sinklessTest(testFn))
// Tenant tests skipped because of:
// validations_test.go:40: executing ALTER TABLE bank SPLIT AT
// VALUES (5): pq: unimplemented: operation is unsupported in
// multi-tenancy mode
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
}

Expand Down

0 comments on commit 71c977b

Please sign in to comment.