Skip to content

Commit

Permalink
Merge #129296
Browse files Browse the repository at this point in the history
129296: sql: deflake TestConcurrentSchemaChanges r=fqazi a=fqazi

Previously, TestConcurrentSchemaChanges was intermittently failing because the connection pool was not correctly being used. Some of operations like selecting if the declarative or legacy schema changer should be used were incorrectly being run on the main connection. To address this, this patch uses the dedicated connections per-thread.

Fixes: #129220
Fixes #129271
Fixes #129132

Release note: None

Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Aug 22, 2024
2 parents c8c495d + 9151ae2 commit c124f07
Showing 1 changed file with 46 additions and 46 deletions.
92 changes: 46 additions & 46 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,10 +620,10 @@ func TestConcurrentSchemaChanges(t *testing.T) {
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
s, sqlDB, _ := serverutils.StartServer(t, params)
s, setupConn, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
dbName, scName, tblName := "testdb", "testsc", "t"
useLegacyOrDeclarative := func() error {
useLegacyOrDeclarative := func(sqlDB *gosql.DB) error {
decl := rand.Intn(2) == 0
if !decl {
_, err := sqlDB.Exec("SET use_declarative_schema_changer='off';")
Expand All @@ -633,49 +633,49 @@ func TestConcurrentSchemaChanges(t *testing.T) {
return err
}

createSchema := func() error {
createSchema := func(conn *gosql.DB) error {
return testutils.SucceedsSoonError(func() error {
_, err := sqlDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName))
_, err := conn.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName))
_, err = conn.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName))
if err != nil {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
_, err = conn.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
if err != nil {
return err
}
return nil
})
}
require.NoError(t, createSchema())
require.NoError(t, createSchema(setupConn))

// repeatWorkWithInterval repeats `work` indefinitely every `workInterval` until
// `ctx` is cancelled.
repeatWorkWithInterval := func(
workerName string, workInterval time.Duration, work func(sqlDB *gosql.DB) error,
workerName string, workInterval time.Duration, work func(workConn *gosql.DB) error,
) func(context.Context) error {
return func(workerCtx context.Context) error {
sqlDB := s.SQLConn(t)
sqlDB.SetMaxOpenConns(1)
workConn := s.SQLConn(t)
workConn.SetMaxOpenConns(1)
for {
jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32())
select {
case <-workerCtx.Done():
t.Logf("%v is signaled to finish work", workerName)
return nil
case <-time.After(jitteredInterval):
if err := work(sqlDB); err != nil {
if err := work(workConn); err != nil {
t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error())
return err
}
Expand All @@ -686,23 +686,23 @@ func TestConcurrentSchemaChanges(t *testing.T) {

var nextObjectID atomic.Int64
// A goroutine that repeatedly renames database `testdb` randomly.
g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
drop := rand.Intn(2) == 0
if drop {
if _, err := sqlDB.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil {
if _, err := workerConn.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil {
return err
}
t.Logf("DROP DATABASE %v", dbName)
return createSchema()
return createSchema(workerConn)
}
newDBName := fmt.Sprintf("testdb_%v", nextObjectID.Add(1))
if newDBName == dbName {
return nil
}
if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil {
if _, err := workerConn.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil {
return err
}
dbName = newDBName
Expand All @@ -711,8 +711,8 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that renames schema `testdb.testsc` randomly.
g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
drop := rand.Intn(2) == 0
Expand All @@ -722,17 +722,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}
var err error
if !drop {
_, err = sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
_, err = workerConn.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
} else {
_, err = sqlDB.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName))
_, err = workerConn.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName))
}
if err == nil {
if !drop {
scName = newSCName
t.Logf("RENAME SCHEMA TO %v", newSCName)
} else {
t.Logf("DROP SCHEMA TO %v", scName)
return createSchema()
return createSchema(workerConn)
}
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema) {
err = nil // mute those errors as they're expected
Expand All @@ -742,25 +742,25 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that renames table `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
newTblName := fmt.Sprintf("t_%v", nextObjectID.Add(1))
drop := rand.Intn(2) == 0
var err error
if !drop {
_, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
_, err = workerConn.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
} else {
_, err = sqlDB.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName))
_, err = workerConn.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName))
}
if err == nil {
if !drop {
tblName = newTblName
t.Logf("RENAME TABLE TO %v", newTblName)
} else {
t.Logf("DROP TABLE %v", newTblName)
return createSchema()
return createSchema(workerConn)
}
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedObject, pgcode.UndefinedTable) {
err = nil
Expand All @@ -770,14 +770,14 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that adds columns to `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
dbName, scName, tblName := dbName, scName, tblName
newColName := fmt.Sprintf("col_%v", nextObjectID.Add(1))

_, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v",
_, err := workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v",
dbName, scName, tblName, newColName, rand.Intn(100)))
if err == nil {
t.Logf("ADD COLUMN %v TO %v.%v.%v", newColName, dbName, scName, tblName)
Expand All @@ -790,18 +790,18 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that drops columns from `testdb.testsc.t` randomly.
g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
// Randomly pick a non-PK column to drop.
dbName, scName, tblName := dbName, scName, tblName
colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName)
colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName)
if err != nil || colName == "" {
return err
}

_, err = sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;",
_, err = workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;",
dbName, scName, tblName, colName))
if err == nil {
t.Logf("DROP COLUMN %v FROM %v.%v.%v", colName, dbName, scName, tblName)
Expand All @@ -814,17 +814,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that creates secondary index on a randomly selected column.
g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(sqlDB *gosql.DB) error {
g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(workerConn *gosql.DB) error {
newIndexName := fmt.Sprintf("idx_%v", nextObjectID.Add(1))

// Randomly pick a non-PK column to create an index on.
dbName, scName, tblName := dbName, scName, tblName
colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName)
colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName)
if err != nil || colName == "" {
return err
}

_, err = sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);",
_, err = workerConn.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);",
newIndexName, dbName, scName, tblName, colName))
if err == nil {
t.Logf("CREATE INDEX %v ON %v.%v.%v(%v)", newIndexName, dbName, scName, tblName, colName)
Expand All @@ -841,17 +841,17 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}))

// A goroutine that drops a secondary index randomly.
g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(sqlDB *gosql.DB) error {
if err := useLegacyOrDeclarative(); err != nil {
g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(workerConn *gosql.DB) error {
if err := useLegacyOrDeclarative(workerConn); err != nil {
return err
}
// Randomly pick a public, secondary index to drop.
dbName, scName, tblName := dbName, scName, tblName
indexName, err := getASecondaryIndex(sqlDB, dbName, scName, tblName)
indexName, err := getASecondaryIndex(workerConn, dbName, scName, tblName)
if err != nil || indexName == "" {
return err
}
_, err = sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
_, err = workerConn.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
if err == nil {
t.Logf("DROP INDEX %v FROM %v.%v.%v", indexName, dbName, scName, tblName)
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema,
Expand All @@ -870,8 +870,8 @@ func TestConcurrentSchemaChanges(t *testing.T) {
}

// getANonPrimaryKeyColumn returns a non-primary-key column from table `dbName.scName.tblName`.
func getANonPrimaryKeyColumn(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := sqlDB.Query(fmt.Sprintf(`
func getANonPrimaryKeyColumn(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := workerConn.Query(fmt.Sprintf(`
SELECT column_name
FROM [show columns from %s.%s.%s]
WHERE column_name != 'col'
Expand All @@ -894,8 +894,8 @@ ORDER BY random(); -- shuffle column output
}

// getASecondaryIndex returns a secondary index from table `dbName.scName.tblName`.
func getASecondaryIndex(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := sqlDB.Query(fmt.Sprintf(`
func getASecondaryIndex(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) {
colNameRow, err := workerConn.Query(fmt.Sprintf(`
SELECT index_name
FROM [show indexes from %s.%s.%s]
WHERE index_name NOT LIKE '%%_pkey'
Expand Down

0 comments on commit c124f07

Please sign in to comment.