Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.2.1-rc: sql: deflake TestConcurrentSchemaChanges #129737

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,10 +620,10 @@ func TestConcurrentSchemaChanges(t *testing.T) {
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
s, sqlDB, _ := serverutils.StartServer(t, params)
s, setupConn, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
dbName, scName, tblName := "testdb", "testsc", "t"
useLegacyOrDeclarative := func() error {
useLegacyOrDeclarative := func(sqlDB *gosql.DB) error {
decl := rand.Intn(2) == 0
if !decl {
_, err := sqlDB.Exec("SET use_declarative_schema_changer='off';")
Expand All @@ -633,49 +633,49 @@ func TestConcurrentSchemaChanges(t *testing.T) {
return err
}

createSchema := func() error {
createSchema := func(conn *gosql.DB) error {
return testutils.SucceedsSoonError(func() error {
_, err := sqlDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName))
_, err := conn.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName))
_, err = conn.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
if err != nil {
return err
}
return nil
})
}
require.NoError(t, createSchema())
require.NoError(t, createSchema(setupConn))

// repeatWorkWithInterval repeats `work` indefinitely every `workInterval` until
// `ctx` is cancelled.
repeatWorkWithInterval := func(
workerName string, workInterval time.Duration, work func(sqlDB *gosql.DB) error,
workerName string, workInterval time.Duration, work func(workConn *gosql.DB) error,
) func(context.Context) error {
return func(workerCtx context.Context) error {
sqlDB := s.SQLConn(t)
sqlDB.SetMaxOpenConns(1)
workConn := s.SQLConn(t)
workConn.SetMaxOpenConns(1)
for {
jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32())
select {
case <-workerCtx.Done():
t.Logf("%v is signaled to finish work", workerName)
return nil
case <-time.After(jitteredInterval):
if err := work(sqlDB); err != nil {
if err := work(workConn); err != nil {
t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error())
return err
}
Expand All @@ -686,23 +686,23 @@ func TestConcurrentSchemaChanges(t *testing.T) {

var nextObjectID atomic.Int64
// A goroutine that repeatedly renames database `testdb` randomly.
g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
drop := rand.Intn(2) == 0
if drop {
if _, err := sqlDB.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil {
if _, err := workerConn.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil {
return err
}
t.Logf("DROP DATABASE %v", dbName)
return createSchema()
return createSchema(workerConn)
}
newDBName := fmt.Sprintf("testdb_%v", nextObjectID.Add(1))
if newDBName == dbName {
return nil
}
if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil {
if _, err := workerConn.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil {
return err
}
dbName = newDBName
Expand All @@ -711,8 +711,8 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that renames schema `testdb.testsc` randomly.
g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
drop := rand.Intn(2) == 0
Expand All @@ -722,17 +722,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}
var err error
if !drop {
_, err = sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
_, err = workerConn.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
} else {
_, err = sqlDB.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName))
_, err = workerConn.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName))
}
if err == nil {
if !drop {
scName = newSCName
t.Logf("RENAME SCHEMA TO %v", newSCName)
} else {
t.Logf("DROP SCHEMA TO %v", scName)
return createSchema()
return createSchema(workerConn)
}
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema) {
err = nil // mute those errors as they're expected
Expand All @@ -742,25 +742,25 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that renames table `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
newTblName := fmt.Sprintf("t_%v", nextObjectID.Add(1))
drop := rand.Intn(2) == 0
var err error
if !drop {
_, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
_, err = workerConn.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
} else {
_, err = sqlDB.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName))
_, err = workerConn.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName))
}
if err == nil {
if !drop {
tblName = newTblName
t.Logf("RENAME TABLE TO %v", newTblName)
} else {
t.Logf("DROP TABLE %v", newTblName)
return createSchema()
return createSchema(workerConn)
}
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedObject, pgcode.UndefinedTable) {
err = nil
Expand All @@ -770,14 +770,14 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that adds columns to `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
dbName, scName, tblName := dbName, scName, tblName
newColName := fmt.Sprintf("col_%v", nextObjectID.Add(1))

_, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v",
_, err := workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v",
dbName, scName, tblName, newColName, rand.Intn(100)))
if err == nil {
t.Logf("ADD COLUMN %v TO %v.%v.%v", newColName, dbName, scName, tblName)
Expand All @@ -790,18 +790,18 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that drops columns from `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
// Randomly pick a non-PK column to drop.
dbName, scName, tblName := dbName, scName, tblName
colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName)
colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName)
if err != nil || colName == "" {
return err
}

_, err = sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;",
_, err = workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;",
dbName, scName, tblName, colName))
if err == nil {
t.Logf("DROP COLUMN %v FROM %v.%v.%v", colName, dbName, scName, tblName)
Expand All @@ -814,17 +814,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that creates secondary index on a randomly selected column.
g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(sqlDB *gosql.DB) error {
g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(workerConn *gosql.DB) error {
newIndexName := fmt.Sprintf("idx_%v", nextObjectID.Add(1))

// Randomly pick a non-PK column to create an index on.
dbName, scName, tblName := dbName, scName, tblName
colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName)
colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName)
if err != nil || colName == "" {
return err
}

_, err = sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);",
_, err = workerConn.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);",
newIndexName, dbName, scName, tblName, colName))
if err == nil {
t.Logf("CREATE INDEX %v ON %v.%v.%v(%v)", newIndexName, dbName, scName, tblName, colName)
Expand All @@ -841,17 +841,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that drops a secondary index randomly.
g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
// Randomly pick a public, secondary index to drop.
dbName, scName, tblName := dbName, scName, tblName
indexName, err := getASecondaryIndex(sqlDB, dbName, scName, tblName)
indexName, err := getASecondaryIndex(workerConn, dbName, scName, tblName)
if err != nil || indexName == "" {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
_, err = workerConn.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
if err == nil {
t.Logf("DROP INDEX %v FROM %v.%v.%v", indexName, dbName, scName, tblName)
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema,
Expand All @@ -870,8 +870,8 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}

// getANonPrimaryKeyColumn returns a non-primary-key column from table `dbName.scName.tblName`.
func getANonPrimaryKeyColumn(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := sqlDB.Query(fmt.Sprintf(`
func getANonPrimaryKeyColumn(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := workerConn.Query(fmt.Sprintf(`
SELECT column_name
FROM [show columns from %s.%s.%s]
WHERE column_name != 'col'
Expand All @@ -894,8 +894,8 @@ ORDER BY random(); -- shuffle column output
}

// getASecondaryIndex returns a secondary index from table `dbName.scName.tblName`.
func getASecondaryIndex(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := sqlDB.Query(fmt.Sprintf(`
func getASecondaryIndex(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := workerConn.Query(fmt.Sprintf(`
SELECT index_name
FROM [show indexes from %s.%s.%s]
WHERE index_name NOT LIKE '%%_pkey'
Expand Down