Skip to content

Commit

Permalink
Merge #65056
Browse files Browse the repository at this point in the history
65056: changefeedccl: randomly run sinkless tests on tenant r=miretskiy a=stevendanna

Now, sinkless tests will run against a tenant's SQL server 25% of the
time, unless they opt-out via an option.

The main two reasons for tests needing to opt-out of running on
tenants are

1) Unsupported SQL statements in mutli-tenant mode (such as REGION
   related statements or SPLIT)

2) Needing complex interactions with the TestServerInterface. All such
   skips are currently marked with a TODO.

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed May 21, 2021
2 parents 303f766 + 600b64b commit 41756ef
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 44 deletions.
5 changes: 5 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,9 @@ type TestTenantArgs struct {
// ExternalIODirConfig is used to initialize the same-named
// field on the server.Config struct.
ExternalIODirConfig ExternalIODirConfig

// If set, this will be appended to the Postgres URL by functions that
// automatically open a connection to the server. That's equivalent to running
// SET DATABASE=foo, which works even if the database doesn't (yet) exist.
UseDatabase string
}
60 changes: 29 additions & 31 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,10 @@ func TestChangefeedTenants(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)()

kvServer, kvSQLdb, _ := serverutils.StartServer(t, base.TestServerArgs{
ExternalIODirConfig: base.ExternalIODirConfig{
DisableOutbound: true,
},
kvServer, kvSQLdb, cleanup := startTestServer(t, func(args *base.TestServerArgs) {
args.ExternalIODirConfig.DisableOutbound = true
})
defer kvServer.Stopper().Stop(ctx)
defer cleanup()

tenantArgs := base.TestTenantArgs{
// crdb_internal.create_tenant called by StartTenant
Expand All @@ -234,27 +229,16 @@ func TestChangefeedTenants(t *testing.T) {
ExternalIODirConfig: base.ExternalIODirConfig{
DisableOutbound: true,
},
UseDatabase: `d`,
}

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
// TODO(ssd): Cleanup this shared setup code once the refactor
// in #64693 is setttled.
tenantSQL.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tenantSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
tenantSQL.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`)

// Database `d` is hardcoded in a number of places. Create it
// and create a new connection to that database.
tenantSQL.Exec(t, `CREATE DATABASE d`)
tenantSQL = sqlutils.MakeSQLRunner(
serverutils.OpenDBConn(t,
tenantServer.SQLAddr(), `d`, false /* insecure */, kvServer.Stopper()))
tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)
tenantSQL.Exec(t, serverSetupStatements)

tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)
t.Run("changefeed on non-tenant table fails", func(t *testing.T) {
kvSQL := sqlutils.MakeSQLRunner(kvSQLdb)
kvSQL.Exec(t, `CREATE DATABASE d`)
kvSQL.Exec(t, `CREATE TABLE d.foo (pk INT PRIMARY KEY)`)

tenantSQL.ExpectErr(t, `table "foo" does not exist`,
Expand Down Expand Up @@ -1035,7 +1019,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
})
}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in fetchDescVersionModificationTime
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
log.Flush()
Expand Down Expand Up @@ -1088,7 +1074,9 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {

}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in fetchDescVerionModifationTime
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
log.Flush()
Expand Down Expand Up @@ -1209,7 +1197,7 @@ func TestChangefeedInterleaved(t *testing.T) {

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.interleaved_tables.enabled = true`)
sqlDB.Exec(t, `CREATE TABLE grandparent (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO grandparent VALUES (0, 'grandparent-0')`)
grandparent := feed(t, f, `CREATE CHANGEFEED FOR grandparent`)
Expand Down Expand Up @@ -1413,7 +1401,10 @@ func TestChangefeedFailOnTableOffline(t *testing.T) {
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
})
}
t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of:
// changefeed_test.go:1409: error executing 'IMPORT INTO
// for_import CSV DATA ($1)': pq: fake protectedts.Provide
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
Expand Down Expand Up @@ -1448,7 +1439,13 @@ func TestChangefeedFailOnRBRChange(t *testing.T) {
Value: testServerRegion,
})
}
t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn))

// Tenants skiped because of:
//
// error executing 'ALTER DATABASE d PRIMARY REGION
// "us-east-1"': pq: get_live_cluster_regions: unimplemented:
// operation is unsupported in multi-tenancy mode
t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTestWithServerArgs(withTestServerRegion, testFn))
t.Run(`cloudstorage`, cloudStorageTestWithServerArg(withTestServerRegion, testFn))
t.Run(`kafka`, kafkaTestWithServerArgs(withTestServerRegion, testFn))
Expand Down Expand Up @@ -1990,8 +1987,8 @@ func TestChangefeedMonitoring(t *testing.T) {
return nil
})
}

t.Run(`sinkless`, sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, func(t *testing.T) {
skip.WithIssue(t, 38443)
enterpriseTest(testFn)
Expand Down Expand Up @@ -2241,8 +2238,9 @@ func TestChangefeedSchemaTTL(t *testing.T) {
t.Errorf(`expected "GC threshold" error got: %+v`, err)
}
}

t.Run("sinkless", sinklessTest(testFn))
// TODO(ssd): tenant tests skipped because of f.Server() use
// in forceTableGC
t.Run("sinkless", sinklessTest(testFn, feedTestNoTenants))
t.Run("enterprise", enterpriseTest(testFn))
t.Run("cloudstorage", cloudStorageTest(testFn))
t.Run("kafka", kafkaTest(testFn))
Expand Down
121 changes: 111 additions & 10 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
gosql "database/sql"
gojson "encoding/json"
"fmt"
"math/rand"
"net/url"
"reflect"
"sort"
Expand Down Expand Up @@ -245,6 +246,14 @@ func expectResolvedTimestampAvro(
type cdcTestFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)
type updateArgsFn func(args *base.TestServerArgs)

var serverSetupStatements = `
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING sql.defaults.vectorize=on;
CREATE DATABASE d;
`

func startTestServer(
t testing.TB, argsFn updateArgsFn,
) (serverutils.TestServerInterface, *gosql.DB, func()) {
Expand Down Expand Up @@ -276,22 +285,97 @@ func startTestServer(
}
}()

_, err = db.ExecContext(ctx, `
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING sql.defaults.vectorize=on;
CREATE DATABASE d;
`)
_, err = db.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)

if region := serverArgsRegion(args); region != "" {
_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region))
require.NoError(t, err)
}

return s, db, cleanup
}

func sinklessTestWithServerArgs(
type feedTestOptions struct {
noTenants bool
}

type feedTestOption func(opts *feedTestOptions)

// feedTestNoTenants is a feedTestOption that will prohibit this tests
// from randomly running on a tenant.
var feedTestNoTenants = func(opts *feedTestOptions) { opts.noTenants = true }

// testServerShim is a kludge to get a few more tests working in
// tenant-mode.
//
// Currently, our TestFeedFactory has a Server() method that returns a
// TestServerInterface. The TestTenantInterface returned by
// StartTenant isn't a TestServerInterface.
//
// TODO(ssd): Clean this up. Perhaps we can add a SQLServer() method
// to TestFeedFactory that returns just the bits that are shared.
type testServerShim struct {
serverutils.TestServerInterface
sqlServer serverutils.TestTenantInterface
}

func (t *testServerShim) ServingSQLAddr() string {
return t.sqlServer.SQLAddr()
}

func sinklessTenantTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn,
) func(*testing.T) {
return func(t *testing.T) {
// We need to open a new log scope because StartTenant
// calls log.SetNodeIDs which can only be called once
// per log scope. If we don't open a log scope here,
// then any test function that wants to use this twice
// would fail.
defer log.Scope(t).Close(t)
ctx := context.Background()
kvServer, _, cleanup := startTestServer(t, func(args *base.TestServerArgs) {
args.ExternalIODirConfig.DisableOutbound = true
if argsFn != nil {
argsFn(args)
}
})
defer cleanup()

tenantID := serverutils.TestTenantID()
tenantArgs := base.TestTenantArgs{
// crdb_internal.create_tenant called by StartTenant
TenantID: tenantID,
// Non-enterprise changefeeds are currently only
// disabled by setting DisableOutbound true
// everywhere.
ExternalIODirConfig: base.ExternalIODirConfig{
DisableOutbound: true,
},
UseDatabase: `d`,
}

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)

// Re-run setup on the tenant as well
_, err := tenantDB.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)

sink, cleanup := sqlutils.PGUrl(t, tenantServer.SQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanup()

server := &testServerShim{kvServer, tenantServer}
f := makeSinklessFeedFactory(server, sink)

// Log so that it is clear if a failed test happened
// to run on a tenant.
t.Logf("Running sinkless test using tenant %s", tenantID)
testFn(t, tenantDB, f)
}
}

func sinklessNoTenantTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn,
) func(*testing.T) {
return func(t *testing.T) {
Expand All @@ -305,8 +389,25 @@ func sinklessTestWithServerArgs(
}
}

func sinklessTest(testFn cdcTestFn) func(*testing.T) {
return sinklessTestWithServerArgs(nil, testFn)
func sinklessTestWithServerArgs(
argsFn func(args *base.TestServerArgs), testFn cdcTestFn, testOpts ...feedTestOption,
) func(*testing.T) {
// percentTenant is the percentange of tests that will be run against
// a SQL-node in a multi-tenant server. 1 for all tests to be run on a
// tenant.
const percentTenant = 0.25
options := &feedTestOptions{}
for _, o := range testOpts {
o(options)
}
if !options.noTenants && rand.Float32() < percentTenant {
return sinklessTenantTestWithServerArgs(argsFn, testFn)
}
return sinklessNoTenantTestWithServerArgs(argsFn, testFn)
}

func sinklessTest(testFn cdcTestFn, testOpts ...feedTestOption) func(*testing.T) {
return sinklessTestWithServerArgs(nil, testFn, testOpts...)
}

func enterpriseTest(testFn cdcTestFn) func(*testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/nemeses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ func TestChangefeedNemeses(t *testing.T) {
t.Error(failure)
}
}
t.Run(`sinkless`, sinklessTest(testFn))
// Tenant tests disabled because ALTER TABLE .. SPLIT is not
// support in multi-tenancy mode:
//
// nemeses_test.go:39: pq: unimplemented: operation is
// unsupported in multi-tenancy mode
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
log.Flush()
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/validations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ func TestCatchupScanOrdering(t *testing.T) {
}
})
}
t.Run(`sinkless`, sinklessTest(testFn))
// Tenant tests skipped because of:
// validations_test.go:40: executing ALTER TABLE bank SPLIT AT
// VALUES (5): pq: unimplemented: operation is unsupported in
// multi-tenancy mode
t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants))
t.Run(`enterprise`, enterpriseTest(testFn))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func StartTenant(
}

goDB := OpenDBConn(
t, tenant.SQLAddr(), "", false /* insecure */, stopper)
t, tenant.SQLAddr(), params.UseDatabase, false /* insecure */, stopper)
return tenant, goDB
}

Expand Down

0 comments on commit 41756ef

Please sign in to comment.