From d5cdc3c750c38eca49341fbe57e33092eea3db15 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Tue, 20 Aug 2024 03:27:33 +0000 Subject: [PATCH] sql: deflake TestConcurrentSchemaChanges Previously, TestConcurrentSchemaChanges was intermittently failing because the connection pool was not correctly being used. Some of operations like selecting if the declarative or legacy schema changer should be used were incorrectly being run on the main connection. To address this, this patch uses the dedicated connections per-thread. Fixes: #129220 Release note: None --- pkg/sql/schemachanger/schemachanger_test.go | 92 ++++++++++----------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index beeb687883b1..58485c337304 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -620,10 +620,10 @@ func TestConcurrentSchemaChanges(t *testing.T) { // Decrease the adopt loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } - s, sqlDB, _ := serverutils.StartServer(t, params) + s, setupConn, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) dbName, scName, tblName := "testdb", "testsc", "t" - useLegacyOrDeclarative := func() error { + useLegacyOrDeclarative := func(sqlDB *gosql.DB) error { decl := rand.Intn(2) == 0 if !decl { _, err := sqlDB.Exec("SET use_declarative_schema_changer='off';") @@ -633,41 +633,41 @@ func TestConcurrentSchemaChanges(t *testing.T) { return err } - createSchema := func() error { + createSchema := func(conn *gosql.DB) error { return testutils.SucceedsSoonError(func() error { - _, err := sqlDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName)) + _, err := conn.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v;", dbName)) if err != nil { return err } - _, err = sqlDB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName)) + _, err = conn.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %v.%v;", dbName, scName)) if err != nil { return err } - _, err = sqlDB.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName)) + _, err = conn.Exec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v.%v.%v(col INT PRIMARY KEY);", dbName, scName, tblName)) if err != nil { return err } - _, err = sqlDB.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName)) + _, err = conn.Exec(fmt.Sprintf("DELETE FROM %v.%v.%v;", dbName, scName, tblName)) if err != nil { return err } - _, err = sqlDB.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName)) + _, err = conn.Exec(fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName)) if err != nil { return err } return nil }) } - require.NoError(t, createSchema()) + require.NoError(t, createSchema(setupConn)) // repeatWorkWithInterval repeats `work` indefinitely every `workInterval` until // `ctx` is cancelled. repeatWorkWithInterval := func( - workerName string, workInterval time.Duration, work func(sqlDB *gosql.DB) error, + workerName string, workInterval time.Duration, work func(workConn *gosql.DB) error, ) func(context.Context) error { return func(workerCtx context.Context) error { - sqlDB := s.SQLConn(t) - sqlDB.SetMaxOpenConns(1) + workConn := s.SQLConn(t) + workConn.SetMaxOpenConns(1) for { jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32()) select { @@ -675,7 +675,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { t.Logf("%v is signaled to finish work", workerName) return nil case <-time.After(jitteredInterval): - if err := work(sqlDB); err != nil { + if err := work(workConn); err != nil { t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error()) return err } @@ -686,23 +686,23 @@ func TestConcurrentSchemaChanges(t *testing.T) { var nextObjectID atomic.Int64 // A goroutine that repeatedly renames database `testdb` randomly. - g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("rename-db-worker", renameDBInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } drop := rand.Intn(2) == 0 if drop { - if _, err := sqlDB.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil { + if _, err := workerConn.Exec(fmt.Sprintf("DROP DATABASE %v CASCADE", dbName)); err != nil { return err } t.Logf("DROP DATABASE %v", dbName) - return createSchema() + return createSchema(workerConn) } newDBName := fmt.Sprintf("testdb_%v", nextObjectID.Add(1)) if newDBName == dbName { return nil } - if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil { + if _, err := workerConn.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil { return err } dbName = newDBName @@ -711,8 +711,8 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that renames schema `testdb.testsc` randomly. - g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } drop := rand.Intn(2) == 0 @@ -722,9 +722,9 @@ func TestConcurrentSchemaChanges(t *testing.T) { } var err error if !drop { - _, err = sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName)) + _, err = workerConn.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName)) } else { - _, err = sqlDB.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName)) + _, err = workerConn.Exec(fmt.Sprintf("DROP SCHEMA %v.%v CASCADE", dbName, scName)) } if err == nil { if !drop { @@ -732,7 +732,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { t.Logf("RENAME SCHEMA TO %v", newSCName) } else { t.Logf("DROP SCHEMA TO %v", scName) - return createSchema() + return createSchema(workerConn) } } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema) { err = nil // mute those errors as they're expected @@ -742,17 +742,17 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that renames table `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } newTblName := fmt.Sprintf("t_%v", nextObjectID.Add(1)) drop := rand.Intn(2) == 0 var err error if !drop { - _, err = sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName)) + _, err = workerConn.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName)) } else { - _, err = sqlDB.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName)) + _, err = workerConn.Exec(fmt.Sprintf(`DROP TABLE %v.%v.%v`, dbName, scName, tblName)) } if err == nil { if !drop { @@ -760,7 +760,7 @@ func TestConcurrentSchemaChanges(t *testing.T) { t.Logf("RENAME TABLE TO %v", newTblName) } else { t.Logf("DROP TABLE %v", newTblName) - return createSchema() + return createSchema(workerConn) } } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedObject, pgcode.UndefinedTable) { err = nil @@ -770,14 +770,14 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that adds columns to `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("add-column-worker", addColInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } dbName, scName, tblName := dbName, scName, tblName newColName := fmt.Sprintf("col_%v", nextObjectID.Add(1)) - _, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v", + _, err := workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v", dbName, scName, tblName, newColName, rand.Intn(100))) if err == nil { t.Logf("ADD COLUMN %v TO %v.%v.%v", newColName, dbName, scName, tblName) @@ -790,18 +790,18 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that drops columns from `testdb.testsc.t` randomly. - g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("drop-column-worker", dropColInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } // Randomly pick a non-PK column to drop. dbName, scName, tblName := dbName, scName, tblName - colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName) + colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName) if err != nil || colName == "" { return err } - _, err = sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;", + _, err = workerConn.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;", dbName, scName, tblName, colName)) if err == nil { t.Logf("DROP COLUMN %v FROM %v.%v.%v", colName, dbName, scName, tblName) @@ -814,17 +814,17 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that creates secondary index on a randomly selected column. - g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(sqlDB *gosql.DB) error { + g.GoCtx(repeatWorkWithInterval("create-index-worker", createIdxInterval, func(workerConn *gosql.DB) error { newIndexName := fmt.Sprintf("idx_%v", nextObjectID.Add(1)) // Randomly pick a non-PK column to create an index on. dbName, scName, tblName := dbName, scName, tblName - colName, err := getANonPrimaryKeyColumn(sqlDB, dbName, scName, tblName) + colName, err := getANonPrimaryKeyColumn(workerConn, dbName, scName, tblName) if err != nil || colName == "" { return err } - _, err = sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);", + _, err = workerConn.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);", newIndexName, dbName, scName, tblName, colName)) if err == nil { t.Logf("CREATE INDEX %v ON %v.%v.%v(%v)", newIndexName, dbName, scName, tblName, colName) @@ -841,17 +841,17 @@ func TestConcurrentSchemaChanges(t *testing.T) { })) // A goroutine that drops a secondary index randomly. - g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(sqlDB *gosql.DB) error { - if err := useLegacyOrDeclarative(); err != nil { + g.GoCtx(repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func(workerConn *gosql.DB) error { + if err := useLegacyOrDeclarative(workerConn); err != nil { return err } // Randomly pick a public, secondary index to drop. dbName, scName, tblName := dbName, scName, tblName - indexName, err := getASecondaryIndex(sqlDB, dbName, scName, tblName) + indexName, err := getASecondaryIndex(workerConn, dbName, scName, tblName) if err != nil || indexName == "" { return err } - _, err = sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName)) + _, err = workerConn.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName)) if err == nil { t.Logf("DROP INDEX %v FROM %v.%v.%v", indexName, dbName, scName, tblName) } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, @@ -870,8 +870,8 @@ func TestConcurrentSchemaChanges(t *testing.T) { } // getANonPrimaryKeyColumn returns a non-primary-key column from table `dbName.scName.tblName`. -func getANonPrimaryKeyColumn(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) { - colNameRow, err := sqlDB.Query(fmt.Sprintf(` +func getANonPrimaryKeyColumn(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) { + colNameRow, err := workerConn.Query(fmt.Sprintf(` SELECT column_name FROM [show columns from %s.%s.%s] WHERE column_name != 'col' @@ -894,8 +894,8 @@ ORDER BY random(); -- shuffle column output } // getASecondaryIndex returns a secondary index from table `dbName.scName.tblName`. -func getASecondaryIndex(sqlDB *gosql.DB, dbName, scName, tblName string) (string, error) { - colNameRow, err := sqlDB.Query(fmt.Sprintf(` +func getASecondaryIndex(workerConn *gosql.DB, dbName, scName, tblName string) (string, error) { + colNameRow, err := workerConn.Query(fmt.Sprintf(` SELECT index_name FROM [show indexes from %s.%s.%s] WHERE index_name NOT LIKE '%%_pkey'