Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: randomly run sinkless tests on tenant #65056

Merged
merged 2 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,9 @@ type TestTenantArgs struct {
// ExternalIODirConfig is used to initialize the same-named
// field on the server.Config struct.
ExternalIODirConfig ExternalIODirConfig

// If set, this will be appended to the Postgres URL by functions that
// automatically open a connection to the server. That's equivalent to running
// SET DATABASE=foo, which works even if the database doesn't (yet) exist.
UseDatabase string
}
60 changes: 29 additions & 31 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,10 @@ func TestChangefeedTenants(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)()

kvServer, kvSQLdb, _ := serverutils.StartServer(t, base.TestServerArgs{
ExternalIODirConfig: base.ExternalIODirConfig{
DisableOutbound: true,
},
kvServer, kvSQLdb, cleanup := startTestServer(t, func(args *base.TestServerArgs) {
args.ExternalIODirConfig.DisableOutbound = true
})
defer kvServer.Stopper().Stop(ctx)
defer cleanup()

tenantArgs := base.TestTenantArgs{
// crdb_internal.create_tenant called by StartTenant
Expand All @@ -234,27 +229,16 @@ func TestChangefeedTenants(t *testing.T) {
ExternalIODirConfig: base.ExternalIODirConfig{
DisableOutbound: true,
},
UseDatabase: `d`,
}

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
// TODO(ssd): Cleanup this shared setup code once the refactor
// in #64693 is setttled.
tenantSQL.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tenantSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
tenantSQL.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`)

// Database `d` is hardcoded in a number of places. Create it
// and create a new connection to that database.
tenantSQL.Exec(t, `CREATE DATABASE d`)
tenantSQL = sqlutils.MakeSQLRunner(
serverutils.OpenDBConn(t,
tenantServer.SQLAddr(), `d`, false /* insecure */, kvServer.Stopper()))
tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)
tenantSQL.Exec(t, serverSetupStatements)

tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)
t.Run("changefeed on non-tenant table fails", func(t *testing.T) {
kvSQL := sqlutils.MakeSQLRunner(kvSQLdb)
kvSQL.Exec(t, `CREATE DATABASE d`)
kvSQL.Exec(t, `CREATE TABLE d.foo (pk INT PRIMARY KEY)`)

tenantSQL.ExpectErr(t, `table "foo" does not exist`,
Expand Down Expand Up @@ -1035,7 +1019,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
})
}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in fetchDescVersionModificationTime
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
log.Flush()
Expand Down Expand Up @@ -1088,7 +1074,9 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {

}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in fetchDescVerionModifationTime
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
log.Flush()
Expand Down Expand Up @@ -1209,7 +1197,7 @@ func TestChangefeedInterleaved(t *testing.T) {

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.interleaved_tables.enabled = true`)
sqlDB.Exec(t, `CREATE TABLE grandparent (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO grandparent VALUES (0, 'grandparent-0')`)
grandparent := feed(t, f, `CREATE CHANGEFEED FOR grandparent`)
Expand Down Expand Up @@ -1413,7 +1401,10 @@ func TestChangefeedFailOnTableOffline(t *testing.T) {
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
})
}
t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of:
// changefeed_test.go:1409: error executing 'IMPORT INTO
// for_import CSV DATA ($1)': pq: fake protectedts.Provide
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
Expand Down Expand Up @@ -1448,7 +1439,13 @@ func TestChangefeedFailOnRBRChange(t *testing.T) {
Value: testServerRegion,
})
}
t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn))

// Tenants skiped because of:
//
// error executing 'ALTER DATABASE d PRIMARY REGION
// "us-east-1"': pq: get_live_cluster_regions: unimplemented:
// operation is unsupported in multi-tenancy mode
t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTestWithServerArgs(withTestServerRegion, testFn))
t.Run(`cloudstorage`, cloudStorageTestWithServerArg(withTestServerRegion, testFn))
t.Run(`kafka`, kafkaTestWithServerArgs(withTestServerRegion, testFn))
Expand Down Expand Up @@ -1990,8 +1987,8 @@ func TestChangefeedMonitoring(t *testing.T) {
return nil
})
}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, func(t *testing.T) {
skip.WithIssue(t, 38443)
enterpriseTest(testFn)
Expand Down Expand Up @@ -2241,8 +2238,9 @@ func TestChangefeedSchemaTTL(t *testing.T) {
t.Errorf(`expected "GC threshold" error got: %+v`, err)
}
}

t.Run("sinkless", sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in forceTableGC
t.Run("sinkless", sinklessTest(testFn, feedTestNoTenants))
t.Run("enterprise", enterpriseTest(testFn))
t.Run("cloudstorage", cloudStorageTest(testFn))
t.Run("kafka", kafkaTest(testFn))
Expand Down
121 changes: 111 additions & 10 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
gosql "database/sql"
gojson "encoding/json"
"fmt"
"math/rand"
"net/url"
"reflect"
"sort"
Expand Down Expand Up @@ -245,6 +246,14 @@ func expectResolvedTimestampAvro(
type cdcTestFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)
type updateArgsFn func(args *base.TestServerArgs)

var serverSetupStatements = `
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING sql.defaults.vectorize=on;
CREATE DATABASE d;
`

func startTestServer(
t testing.TB, argsFn updateArgsFn,
) (serverutils.TestServerInterface, *gosql.DB, func()) {
Expand Down Expand Up @@ -276,22 +285,97 @@ func startTestServer(
}
}()

_, err = db.ExecContext(ctx, `
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING sql.defaults.vectorize=on;
CREATE DATABASE d;
`)
_, err = db.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)

if region := serverArgsRegion(args); region != "" {
_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region))
require.NoError(t, err)
}

return s, db, cleanup
}

func sinklessTestWithServerArgs(
type feedTestOptions struct {
noTenants bool
}

type feedTestOption func(opts *feedTestOptions)

// feedTestNoTenants is a feedTestOption that will prohibit this tests
// from randomly running on a tenant.
var feedTestNoTenants = func(opts *feedTestOptions) { opts.noTenants = true }

// testServerShim is a kludge to get a few more tests working in
// tenant-mode.
//
// Currently, our TestFeedFactory has a Server() method that returns a
// TestServerInterface. The TestTenantInterface returned by
// StartTenant isn't a TestServerInterface.
//
// TODO(ssd): Clean this up. Perhaps we can add a SQLServer() method
// to TestFeedFactory that returns just the bits that are shared.
type testServerShim struct {
serverutils.TestServerInterface
sqlServer serverutils.TestTenantInterface
}

func (t *testServerShim) ServingSQLAddr() string {
return t.sqlServer.SQLAddr()
}

func sinklessTenantTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn,
) func(*testing.T) {
return func(t *testing.T) {
// We need to open a new log scope because StartTenant
// calls log.SetNodeIDs which can only be called once
// per log scope. If we don't open a log scope here,
// then any test function that wants to use this twice
// would fail.
defer log.Scope(t).Close(t)
ctx := context.Background()
kvServer, _, cleanup := startTestServer(t, func(args *base.TestServerArgs) {
args.ExternalIODirConfig.DisableOutbound = true
if argsFn != nil {
argsFn(args)
}
})
defer cleanup()

tenantID := serverutils.TestTenantID()
tenantArgs := base.TestTenantArgs{
// crdb_internal.create_tenant called by StartTenant
TenantID: tenantID,
// Non-enterprise changefeeds are currently only
// disabled by setting DisableOutbound true
// everywhere.
ExternalIODirConfig: base.ExternalIODirConfig{
DisableOutbound: true,
},
UseDatabase: `d`,
}

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)

// Re-run setup on the tenant as well
_, err := tenantDB.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)

sink, cleanup := sqlutils.PGUrl(t, tenantServer.SQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanup()

server := &testServerShim{kvServer, tenantServer}
f := makeSinklessFeedFactory(server, sink)

// Log so that it is clear if a failed test happened
// to run on a tenant.
t.Logf("Running sinkless test using tenant %s", tenantID)
testFn(t, tenantDB, f)
}
}

func sinklessNoTenantTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn,
) func(*testing.T) {
return func(t *testing.T) {
Expand All @@ -305,8 +389,25 @@ func sinklessTestWithServerArgs(
}
}

func sinklessTest(testFn cdcTestFn) func(*testing.T) {
return sinklessTestWithServerArgs(nil, testFn)
func sinklessTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn, testOpts ...feedTestOption,
) func(*testing.T) {
// percentTenant is the percentange of tests that will be run against
// a SQL-node in a multi-tenant server. 1 for all tests to be run on a
// tenant.
const percentTenant = 0.25
options := &feedTestOptions{}
for _, o := range testOpts {
o(options)
}
if !options.noTenants && rand.Float32() < percentTenant {
return sinklessTenantTestWithServerArgs(argsFn, testFn)
}
return sinklessNoTenantTestWithServerArgs(argsFn, testFn)
}

func sinklessTest(testFn cdcTestFn, testOpts ...feedTestOption) func(*testing.T) {
return sinklessTestWithServerArgs(nil, testFn, testOpts...)
}

func enterpriseTest(testFn cdcTestFn) func(*testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/nemeses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ func TestChangefeedNemeses(t *testing.T) {
t.Error(failure)
}
}
t.Run(`sinkless`, sinklessTest(testFn))
// Tenant tests disabled because ALTER TABLE .. SPLIT is not
// support in multi-tenancy mode:
//
// nemeses_test.go:39: pq: unimplemented: operation is
// unsupported in multi-tenancy mode
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
log.Flush()
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/validations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ func TestCatchupScanOrdering(t *testing.T) {
}
})
}
t.Run(`sinkless`, sinklessTest(testFn))
// Tenant tests skipped because of:
// validations_test.go:40: executing ALTER TABLE bank SPLIT AT
// VALUES (5): pq: unimplemented: operation is unsupported in
// multi-tenancy mode
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func StartTenant(
}

goDB := OpenDBConn(
t, tenant.SQLAddr(), "", false /* insecure */, stopper)
t, tenant.SQLAddr(), params.UseDatabase, false /* insecure */, stopper)
return tenant, goDB
}

Expand Down