Skip to content

Commit

Permalink
Merge pull request #129737 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.2.1-rc-129296

release-24.2.1-rc: sql: deflake TestConcurrentSchemaChanges
  • Loading branch information
rafiss authored Aug 27, 2024
2 parents ae8318c + d5cdc3c commit 5fad41a
Showing 1 changed file with 46 additions and 46 deletions.
92 changes: 46 additions & 46 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,10 +620,10 @@ func TestConcurrentSchemaChanges(t *testing.T) {
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
s, sqlDB, _ := serverutils.StartServer(t, params)
s, setupConn, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
dbName, scName, tblName := "testdb", "testsc", "t"
useLegacyOrDeclarative := func() error {
useLegacyOrDeclarative := func(sqlDB *gosql.DB) error {
decl := rand.Intn(2) == 0
if !decl {
_, err := sqlDB.Exec("SET use_declarative_schema_changer='off';")
Expand All @@ -633,49 +633,49 @@ func TestConcurrentSchemaChanges(t *testing.T) {
return err
}

createSchema := func() error {
createSchema := func(conn *gosql.DB) error {
return testutils.SucceedsSoonError(func() error {
_, err := sqlDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName))
_, err := conn.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName))
_, err = conn.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
if err != nil {
return err
}
return nil
})
}
require.NoError(t, createSchema())
require.NoError(t, createSchema(setupConn))

// repeatWorkWithInterval repeats `work` indefinitely every `workInterval` until
// `ctx` is cancelled.
repeatWorkWithInterval := func(
workerName string, workInterval time.Duration, work func(sqlDB *gosql.DB) error,
workerName string, workInterval time.Duration, work func(workConn *gosql.DB) error,
) func(context.Context) error {
return func(workerCtx context.Context) error {
sqlDB := s.SQLConn(t)
sqlDB.SetMaxOpenConns(1)
workConn := s.SQLConn(t)
workConn.SetMaxOpenConns(1)
for {
jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32())
select {
case <-workerCtx.Done():
t.Logf("%v is signaled to finish work", workerName)
return nil
case <-time.After(jitteredInterval):
if err := work(sqlDB); err != nil {
if err := work(workConn); err != nil {
t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error())
return err
}
Expand All @@ -686,23 +686,23 @@ func TestConcurrentSchemaChanges(t *testing.T) {

var nextObjectID atomic.Int64
// A goroutine that repeatedly renames database `testdb` randomly.
g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
drop := rand.Intn(2) == 0
if drop {
if _, err := sqlDB.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil {
if _, err := workerConn.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil {
return err
}
t.Logf("DROP DATABASE %v", dbName)
return createSchema()
return createSchema(workerConn)
}
newDBName := fmt.Sprintf("testdb_%v", nextObjectID.Add(1))
if newDBName == dbName {
return nil
}
if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil {
if _, err := workerConn.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil {
return err
}
dbName = newDBName
Expand All @@ -711,8 +711,8 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that renames schema `testdb.testsc` randomly.
g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
drop := rand.Intn(2) == 0
Expand All @@ -722,17 +722,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}
var err error
if !drop {
_, err = sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
_, err = workerConn.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
} else {
_, err = sqlDB.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName))
_, err = workerConn.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName))
}
if err == nil {
if !drop {
scName = newSCName
t.Logf("RENAME SCHEMA TO %v", newSCName)
} else {
t.Logf("DROP SCHEMA TO %v", scName)
return createSchema()
return createSchema(workerConn)
}
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema) {
err = nil // mute those errors as they're expected
Expand All @@ -742,25 +742,25 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that renames table `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
newTblName := fmt.Sprintf("t_%v", nextObjectID.Add(1))
drop := rand.Intn(2) == 0
var err error
if !drop {
_, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
_, err = workerConn.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
} else {
_, err = sqlDB.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName))
_, err = workerConn.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName))
}
if err == nil {
if !drop {
tblName = newTblName
t.Logf("RENAME TABLE TO %v", newTblName)
} else {
t.Logf("DROP TABLE %v", newTblName)
return createSchema()
return createSchema(workerConn)
}
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedObject, pgcode.UndefinedTable) {
err = nil
Expand All @@ -770,14 +770,14 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that adds columns to `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
dbName, scName, tblName := dbName, scName, tblName
newColName := fmt.Sprintf("col_%v", nextObjectID.Add(1))

_, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v",
_, err := workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v",
dbName, scName, tblName, newColName, rand.Intn(100)))
if err == nil {
t.Logf("ADD COLUMN %v TO %v.%v.%v", newColName, dbName, scName, tblName)
Expand All @@ -790,18 +790,18 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that drops columns from `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
// Randomly pick a non-PK column to drop.
dbName, scName, tblName := dbName, scName, tblName
colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName)
colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName)
if err != nil || colName == "" {
return err
}

_, err = sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;",
_, err = workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;",
dbName, scName, tblName, colName))
if err == nil {
t.Logf("DROP COLUMN %v FROM %v.%v.%v", colName, dbName, scName, tblName)
Expand All @@ -814,17 +814,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that creates secondary index on a randomly selected column.
g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(sqlDB *gosql.DB) error {
g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(workerConn *gosql.DB) error {
newIndexName := fmt.Sprintf("idx_%v", nextObjectID.Add(1))

// Randomly pick a non-PK column to create an index on.
dbName, scName, tblName := dbName, scName, tblName
colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName)
colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName)
if err != nil || colName == "" {
return err
}

_, err = sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);",
_, err = workerConn.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);",
newIndexName, dbName, scName, tblName, colName))
if err == nil {
t.Logf("CREATE INDEX %v ON %v.%v.%v(%v)", newIndexName, dbName, scName, tblName, colName)
Expand All @@ -841,17 +841,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that drops a secondary index randomly.
g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
// Randomly pick a public, secondary index to drop.
dbName, scName, tblName := dbName, scName, tblName
indexName, err := getASecondaryIndex(sqlDB, dbName, scName, tblName)
indexName, err := getASecondaryIndex(workerConn, dbName, scName, tblName)
if err != nil || indexName == "" {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
_, err = workerConn.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
if err == nil {
t.Logf("DROP INDEX %v FROM %v.%v.%v", indexName, dbName, scName, tblName)
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema,
Expand All @@ -870,8 +870,8 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}

// getANonPrimaryKeyColumn returns a non-primary-key column from table `dbName.scName.tblName`.
func getANonPrimaryKeyColumn(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := sqlDB.Query(fmt.Sprintf(`
func getANonPrimaryKeyColumn(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := workerConn.Query(fmt.Sprintf(`
SELECT column_name
FROM [show columns from %s.%s.%s]
WHERE column_name != 'col'
Expand All @@ -894,8 +894,8 @@ ORDER BY random(); -- shuffle column output
}

// getASecondaryIndex returns a secondary index from table `dbName.scName.tblName`.
func getASecondaryIndex(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := sqlDB.Query(fmt.Sprintf(`
func getASecondaryIndex(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := workerConn.Query(fmt.Sprintf(`
SELECT index_name
FROM [show indexes from %s.%s.%s]
WHERE index_name NOT LIKE '%%_pkey'
Expand Down

0 comments on commit 5fad41a

Please sign in to comment.