diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index b51746dbe4aa..844e722a1bfd 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -6945,296 +6945,6 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { }) } -// TestDropColumnAfterMutations tests the impact of a drop column -// after an existing mutation on the column. -func TestDropColumnAfterMutations(t *testing.T) { - defer leaktest.AfterTest(t)() - skip.WithIssue(t, 76843, "flaky test") - defer log.Scope(t).Close(t) - ctx := context.Background() - var jobControlMu syncutil.Mutex - var delayJobList []string - var delayJobChannels []chan struct{} - delayNotify := make(chan struct{}) - jobIDs := make([]jobspb.JobID, 2) - - proceedBeforeBackfill := make(chan error) - params, _ := tests.CreateTestServerParams() - - var s serverutils.TestServerInterface - params.Knobs = base.TestingKnobs{ - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - RunBeforeResume: func(jobID jobspb.JobID) error { - lockHeld := true - jobControlMu.Lock() - scJob, err := s.JobRegistry().(*jobs.Registry).LoadJob(ctx, jobID) - if err != nil { - return err - } - pl := scJob.Payload() - // Check if we are blocking the correct job. - for idx, s := range delayJobList { - if strings.Contains(pl.Description, s) { - jobIDs[idx] = jobID - delayNotify <- struct{}{} - channel := delayJobChannels[idx] - jobControlMu.Unlock() - lockHeld = false - <-channel - break - } - } - if lockHeld { - jobControlMu.Unlock() - } - return nil - }, - RunAfterBackfill: func(jobID jobspb.JobID) error { - return <-proceedBeforeBackfill - }, - }, - } - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - tdb := sqlutils.MakeSQLRunner(sqlDB) - var schemaChangeWaitGroup sync.WaitGroup - - tdb.Exec(t, `CREATE TABLE t (i INT8 PRIMARY KEY, j INT8);`) - tdb.Exec(t, `INSERT INTO t VALUES (1, 1);`) - - // Test 1: with concurrent drop and mutations. - t.Run("basic-concurrent-drop-mutations", func(t *testing.T) { - jobControlMu.Lock() - delayJobList = []string{"ALTER TABLE defaultdb.public.t ADD COLUMN k INT8 NOT NULL UNIQUE DEFAULT 42", - "ALTER TABLE defaultdb.public.t DROP COLUMN j"} - delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} - jobControlMu.Unlock() - - schemaChangeWaitGroup.Add(1) - go func() { - defer schemaChangeWaitGroup.Done() - _, err := tdb.DB.ExecContext(ctx, - ` -BEGIN; -ALTER TABLE t ALTER COLUMN j SET NOT NULL; -ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL UNIQUE; -COMMIT; -`) - if err != nil { - t.Error(err) - } - }() - <-delayNotify - schemaChangeWaitGroup.Add(1) - go func() { - defer schemaChangeWaitGroup.Done() - _, err := tdb.DB.ExecContext(ctx, - ` -SET sql_safe_updates = false; -BEGIN; -ALTER TABLE t DROP COLUMN j; -ALTER TABLE t ALTER COLUMN k SET NOT NULL; -COMMIT; -`) - if err != nil { - t.Error(err) - } - }() - <-delayNotify - - // Allow jobs to proceed once both are concurrent. - delayJobChannels[0] <- struct{}{} - // Allow both backfill jobs to proceed. - proceedBeforeBackfill <- nil - // Allow the second job to proceed. - delayJobChannels[1] <- struct{}{} - // Second job will also do backfill next. - proceedBeforeBackfill <- nil - - schemaChangeWaitGroup.Wait() - close(delayJobChannels[0]) - close(delayJobChannels[1]) - }) - - // Test 2: with concurrent drop and mutations, where - // the drop column will be failed intentionally. - t.Run("failed-concurrent-drop-mutations", func(t *testing.T) { - jobControlMu.Lock() - delayJobList = []string{"ALTER TABLE defaultdb.public.t ALTER COLUMN j SET NOT NULL", - "ALTER TABLE defaultdb.public.t DROP COLUMN j"} - delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} - jobControlMu.Unlock() - - tdb.Exec(t, `DROP TABLE t;`) - tdb.Exec(t, `CREATE TABLE t (i INT8 PRIMARY KEY, j INT8);`) - tdb.Exec(t, `INSERT INTO t VALUES (1, 1);`) - go func() { - // This transaction will not complete. Therefore, we don't check the returned error. - _, _ = tdb.DB.ExecContext(context.Background(), - ` - SET application_name='failed-concurrent-drop-mutations'; - BEGIN; - ALTER TABLE t ALTER COLUMN j SET NOT NULL; - ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL UNIQUE; - COMMIT; - `) - }() - - // Wait for the alter to get submitted first. - <-delayNotify - - go func() { - // This transaction will not complete. Therefore, we don't check the returned error. - _, _ = tdb.DB.ExecContext(context.Background(), - ` - SET application_name='failed-concurrent-drop-mutations'; - SET sql_safe_updates = false; - BEGIN; - ALTER TABLE t DROP COLUMN j; - ALTER TABLE t ALTER COLUMN k SET NOT NULL; - ALTER TABLE t ALTER COLUMN k SET DEFAULT 421; - ALTER TABLE t ADD COLUMN o INT8 DEFAULT 42 NOT NULL UNIQUE; - COMMIT; - `) - }() - <-delayNotify - - // Allow the first operation relying on - // the dropped column to resume. - delayJobChannels[0] <- struct{}{} - // Allow internal backfill processing to resume - proceedBeforeBackfill <- nil - // Allow the second job to proceed - delayJobChannels[1] <- struct{}{} - // Second job will also do backfill next - proceedBeforeBackfill <- errors.Newf("Bogus error for drop column transaction") - - // We expect the first job to be stuck in running state. - // At this point, first job is now waiting for second to complete. However, first job - // is in an infinite-loop due to retries while reverting. Therefore, we don't wait - // for the jobs to complete. Instead, we validate their current status before completing - // the test. - - // Second job should be in reverting state and retrying. - tdb.CheckQueryResultsRetry(t, - fmt.Sprintf("SELECT num_runs > 3 FROM system.jobs WHERE id = %d AND status = '%s'", jobIDs[1], jobs.StatusReverting), - [][]string{{"true"}}, - ) - // First job should be in running state. - tdb.CheckQueryResults(t, fmt.Sprintf("SELECT status from system.jobs WHERE id = %d", jobIDs[0]), [][]string{{string(jobs.StatusRunning)}}) - // Both jobs should be stuck in COMMIT, waiting for jobs to complete. - tdb.CheckQueryResults(t, - "SELECT count(*) FROM [SHOW SESSIONS] WHERE last_active_query LIKE '%COMMIT%' AND session_id != (SELECT * FROM [SHOW session_id]) "+ - "AND application_name='failed-concurrent-drop-mutations'", - [][]string{{"2"}}, - ) - - close(delayJobChannels[0]) - close(delayJobChannels[1]) - }) - - // Test 3: with concurrent drop and mutations where an insert will - // cause the backfill operation to fail. - t.Run("concurrent-drop-mutations-insert-fail", func(t *testing.T) { - tdb.Exec(t, `SET use_declarative_schema_changer = 'off'`) - - jobControlMu.Lock() - delayJobList = []string{"ALTER TABLE defaultdb.public.t ALTER COLUMN j SET NOT NULL", - "ALTER TABLE defaultdb.public.t DROP COLUMN j"} - delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} - jobControlMu.Unlock() - - tdb.Exec(t, `DROP TABLE t;`) - tdb.Exec(t, `CREATE TABLE t (i INT8 PRIMARY KEY, j INT8);`) - tdb.Exec(t, `INSERT INTO t VALUES (1, 1);`) - - go func() { - // Two possibilities exist based on timing, either the following transaction - // will fail during backfill or the dependent one with the drop will fail. - - // This transaction will not complete. Therefore, we don't check the returned error. - _, _ = tdb.DB.ExecContext(context.Background(), - ` - SET application_name='concurrent-drop-mutations-insert-fail'; - BEGIN; - ALTER TABLE t ALTER COLUMN j SET NOT NULL; - ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL; - COMMIT; - `) - }() - <-delayNotify - - go func() { - // This transaction will not complete. Therefore, we don't check the returned error. - _, _ = tdb.DB.ExecContext(context.Background(), - ` - SET application_name='concurrent-drop-mutations-insert-fail'; - SET sql_safe_updates = false; - BEGIN; - ALTER TABLE t DROP COLUMN j; - ALTER TABLE t ALTER COLUMN k SET NOT NULL; - ALTER TABLE t ALTER COLUMN k SET DEFAULT 421; - ALTER TABLE t ADD COLUMN o INT8 DEFAULT 42 NOT NULL UNIQUE; - INSERT INTO t VALUES (2); - COMMIT; - `) - }() - <-delayNotify - - // Allow jobs to proceed once both are concurrent. - // Allow the first operation relying on - // the dropped column to resume. - delayJobChannels[0] <- struct{}{} - // Allow internal backfill processing to resume - proceedBeforeBackfill <- nil - // Allow the second job to proceed - delayJobChannels[1] <- struct{}{} - - var revertingJobID jobspb.JobID - testutils.SucceedsSoon(t, func() error { - // In this test, depending on the concurrent execution schedule, one of the jobs - // will be stuck in running state while the other job will be stuck in reverting state. - // Here we check that this statement is valid. Moreover, we also get the ID of the - // reverting job to ensure that it is retrying. Furthermore, we prevent the running - // job to backfill to validate this test's correctness assumptions. - firstJobID := jobIDs[0] - secondJobID := jobIDs[1] - res := tdb.QueryStr(t, "SELECT status from system.jobs where id in ($1, $2)", firstJobID, secondJobID) - require.Len(t, res, 2) - firstJobStatus := jobs.Status(res[0][0]) - secondJobStatus := jobs.Status(res[1][0]) - if firstJobStatus == jobs.StatusRunning && secondJobStatus == jobs.StatusReverting { - revertingJobID = secondJobID - return nil - } else if firstJobStatus == jobs.StatusReverting && secondJobStatus == jobs.StatusRunning { - revertingJobID = firstJobID - return nil - } - return errors.New("one job should be running while the other job should be reverting") - }) - // Ensure that the reverting job is retrying. - tdb.CheckQueryResultsRetry(t, - fmt.Sprintf("SELECT num_runs > 3 FROM system.jobs WHERE id = %d", revertingJobID), - [][]string{{"true"}}, - ) - // Both jobs should be stuck in COMMIT, waiting for jobs to complete. - tdb.CheckQueryResults(t, - "SELECT count(*) FROM [SHOW SESSIONS] WHERE last_active_query LIKE '%COMMIT%' AND session_id != (SELECT * FROM [SHOW session_id]) "+ - "AND application_name='concurrent-drop-mutations-insert-fail'", - [][]string{{"2"}}, - ) - - close(delayJobChannels[0]) - close(delayJobChannels[1]) - }) - - close(delayNotify) - close(proceedBeforeBackfill) -} - // TestCheckConstraintDropAndColumn tests for Issue #61749 which uncovered // that checks would be incorrectly activated if a drop column occurred, even // if they weren't fully validated.