diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index beeb687883b1..58485c337304 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -620,10 +620,10 @@ func TestConcurrentSchemaChanges(t *testing.T) { // Decrease the adopt loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } - s, sqlDB, _ := serverutils.StartServer(t, params) + s, setupConn, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) dbName, scName, tblName := "testdb", "testsc", "t" - useLegacyOrDeclarative := func() error { + useLegacyOrDeclarative := func(sqlDB *gosql.DB) error { decl := rand.Intn(2) == 0 if !decl { _, err := sqlDB.Exec("SET use_declarative_schema_changer='off';") @@ -633,41 +633,41 @@ func TestConcurrentSchemaChanges(t *testing.T) { return err } - createSchema := func() error { + createSchema := func(conn *gosql.DB) error { return testutils.SucceedsSoonError(func() error { - _, err := sqlDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName)) + _, err := conn.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName)) if err != nil { return err } - _, err = sqlDB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName)) + _, err = conn.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName)) if err != nil { return err } - _, err = sqlDB.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName)) + _, err = conn.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName)) if err != nil { return err } - _, err = sqlDB.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName)) + _, err = conn.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName)) if err != nil { return err } - _, err = sqlDB.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName)) + _, err = conn.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName)) if err != nil { return err } return nil }) } - require.NoError(t, createSchema()) + require.NoError(t, createSchema(setupConn)) // repeatWorkWithInterval repeats `work` indefinitely every `workInterval` until // `ctx` is cancelled. repeatWorkWithInterval := func( - workerName string, workInterval time.Duration, work func(sqlDB *gosql.DB) error, + workerName string, workInterval time.Duration, work func(workConn *gosql.DB) error, ) func(context.Context) error { return func(workerCtx context.Context) error { - sqlDB := s.SQLConn(t) - sqlDB.SetMaxOpenConns(1) + workConn := s.SQLConn(t) + workConn.SetMaxOpenConns(1) for { jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32()) select { @@ -675,7 +675,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { t.Logf("%v is signaled to finish work", workerName) return nil case <-time.After(jitteredInterval): - if err := work(sqlDB); err != nil { + if err := work(workConn); err != nil { t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error()) return err } @@ -686,23 +686,23 @@ func TestConcurrentSchemaChanges(t *testing.T) { var nextObjectID atomic.Int64 // A goroutine that repeatedly renames database `testdb` randomly. - g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } drop := rand.Intn(2) == 0 if drop { - if _, err := sqlDB.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil { + if _, err := workerConn.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil { return err } t.Logf("DROP DATABASE %v", dbName) - return createSchema() + return createSchema(workerConn) } newDBName := fmt.Sprintf("testdb_%v", nextObjectID.Add(1)) if newDBName == dbName { return nil } - if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil { + if _, err := workerConn.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil { return err } dbName = newDBName @@ -711,8 +711,8 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that renames schema `testdb.testsc` randomly. - g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } drop := rand.Intn(2) == 0 @@ -722,9 +722,9 @@ func TestConcurrentSchemaChanges(t *testing.T) { } var err error if !drop { - _, err = sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName)) + _, err = workerConn.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName)) } else { - _, err = sqlDB.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName)) + _, err = workerConn.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName)) } if err == nil { if !drop { @@ -732,7 +732,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { t.Logf("RENAME SCHEMA TO %v", newSCName) } else { t.Logf("DROP SCHEMA TO %v", scName) - return createSchema() + return createSchema(workerConn) } } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema) { err = nil // mute those errors as they're expected @@ -742,17 +742,17 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that renames table `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } newTblName := fmt.Sprintf("t_%v", nextObjectID.Add(1)) drop := rand.Intn(2) == 0 var err error if !drop { - _, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName)) + _, err = workerConn.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName)) } else { - _, err = sqlDB.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName)) + _, err = workerConn.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName)) } if err == nil { if !drop { @@ -760,7 +760,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { t.Logf("RENAME TABLE TO %v", newTblName) } else { t.Logf("DROP TABLE %v", newTblName) - return createSchema() + return createSchema(workerConn) } } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedObject, pgcode.UndefinedTable) { err = nil @@ -770,14 +770,14 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that adds columns to `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } dbName, scName, tblName := dbName, scName, tblName newColName := fmt.Sprintf("col_%v", nextObjectID.Add(1)) - _, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v", + _, err := workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v", dbName, scName, tblName, newColName, rand.Intn(100))) if err == nil { t.Logf("ADD COLUMN %v TO %v.%v.%v", newColName, dbName, scName, tblName) @@ -790,18 +790,18 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that drops columns from `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } // Randomly pick a non-PK column to drop. dbName, scName, tblName := dbName, scName, tblName - colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName) + colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName) if err != nil || colName == "" { return err } - _, err = sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;", + _, err = workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;", dbName, scName, tblName, colName)) if err == nil { t.Logf("DROP COLUMN %v FROM %v.%v.%v", colName, dbName, scName, tblName) @@ -814,17 +814,17 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that creates secondary index on a randomly selected column. - g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(sqlDB *gosql.DB) error { + g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(workerConn *gosql.DB) error { newIndexName := fmt.Sprintf("idx_%v", nextObjectID.Add(1)) // Randomly pick a non-PK column to create an index on. dbName, scName, tblName := dbName, scName, tblName - colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName) + colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName) if err != nil || colName == "" { return err } - _, err = sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);", + _, err = workerConn.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);", newIndexName, dbName, scName, tblName, colName)) if err == nil { t.Logf("CREATE INDEX %v ON %v.%v.%v(%v)", newIndexName, dbName, scName, tblName, colName) @@ -841,17 +841,17 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that drops a secondary index randomly. - g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } // Randomly pick a public, secondary index to drop. dbName, scName, tblName := dbName, scName, tblName - indexName, err := getASecondaryIndex(sqlDB, dbName, scName, tblName) + indexName, err := getASecondaryIndex(workerConn, dbName, scName, tblName) if err != nil || indexName == "" { return err } - _, err = sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName)) + _, err = workerConn.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName)) if err == nil { t.Logf("DROP INDEX %v FROM %v.%v.%v", indexName, dbName, scName, tblName) } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, @@ -870,8 +870,8 @@ func TestConcurrentSchemaChanges(t *testing.T) { } // getANonPrimaryKeyColumn returns a non-primary-key column from table `dbName.scName.tblName`. -func getANonPrimaryKeyColumn(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) { - colNameRow, err := sqlDB.Query(fmt.Sprintf(` +func getANonPrimaryKeyColumn(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) { + colNameRow, err := workerConn.Query(fmt.Sprintf(` SELECT column_name FROM [show columns from %s.%s.%s] WHERE column_name != 'col' @@ -894,8 +894,8 @@ ORDER BY random(); -- shuffle column output } // getASecondaryIndex returns a secondary index from table `dbName.scName.tblName`. -func getASecondaryIndex(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) { - colNameRow, err := sqlDB.Query(fmt.Sprintf(` +func getASecondaryIndex(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) { + colNameRow, err := workerConn.Query(fmt.Sprintf(` SELECT index_name FROM [show indexes from %s.%s.%s] WHERE index_name NOT LIKE '%%_pkey'