diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 1e38a3fb3872..dc5e4a9ca118 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -299,18 +299,14 @@ func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error { return nil } -// Run starts previously unstarted jobs from a list of scheduled -// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs. -func (r *Registry) Run( +// WaitForJobs waits for a given list of jobs to reach some sort +// of terminal state. +func (r *Registry) WaitForJobs( ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID, ) error { if len(jobs) == 0 { return nil } - log.Infof(ctx, "scheduled jobs %+v", jobs) - if err := r.NotifyToAdoptJobs(ctx); err != nil { - return err - } buf := bytes.Buffer{} for i, id := range jobs { if i > 0 { @@ -372,6 +368,25 @@ func (r *Registry) Run( return nil } +// Run starts previously unstarted jobs from a list of scheduled +// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs. +func (r *Registry) Run( + ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID, +) error { + if len(jobs) == 0 { + return nil + } + log.Infof(ctx, "scheduled jobs %+v", jobs) + if err := r.NotifyToAdoptJobs(ctx); err != nil { + return err + } + err := r.WaitForJobs(ctx, ex, jobs) + if err != nil { + return err + } + return nil +} + // NewJob creates a new Job. func (r *Registry) NewJob(record Record, jobID jobspb.JobID) *Job { job := &Job{ diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 1acb81c43d1d..159f461f0017 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -206,10 +206,16 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error { log.Infof(ctx, "running backfill for %q, v=%d", tableDesc.Name, tableDesc.Version) needColumnBackfill := false - for _, m := range tableDesc.Mutations { + for mutationIdx, m := range tableDesc.Mutations { if m.MutationID != sc.mutationID { break } + // If the current mutation is discarded, then + // skip over processing. + if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, mutationIdx+1); discarded { + continue + } + switch m.Direction { case descpb.DescriptorMutation_ADD: switch t := m.Descriptor_.(type) { @@ -1865,6 +1871,11 @@ func runSchemaChangesInTxn( // mutations that need to be processed. for i := 0; i < len(tableDesc.Mutations); i++ { m := tableDesc.Mutations[i] + // Skip mutations that get canceled by later operations + if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, i+1); discarded { + continue + } + immutDesc := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildImmutableTable() switch m.Direction { case descpb.DescriptorMutation_ADD: diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table index fa3271970a1d..e73f1eb3bbfa 100644 --- a/pkg/sql/logictest/testdata/logic_test/alter_table +++ b/pkg/sql/logictest/testdata/logic_test/alter_table @@ -1689,6 +1689,26 @@ SELECT count(descriptor_id) ---- 0 +# Validation for #47719 where a mutation is canceled out by a drop. +statement ok +create table tColDrop (a int); + +statement ok +begin; +alter table tColDrop alter column a set not null; +alter table tColDrop drop column a; +commit; + +statement ok +drop table tColDrop; + +statement ok +begin; +create table tColDrop (a int); +alter table tColDrop alter column a set not null; +alter table tColDrop drop column a; +commit; + # Validate that the schema_change_successful metric query T SELECT feature_name FROM crdb_internal.feature_usage diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 99aa7e89dd73..e5cf2de5872f 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -1045,9 +1045,11 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // Jobs (for GC, etc.) that need to be started immediately after the table // descriptor updates are published. var didUpdate bool + var depMutationJobs []jobspb.JobID modified, err := sc.txnWithModified(ctx, func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ) error { + var err error scTable, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn) if err != nil { return err @@ -1292,9 +1294,17 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // The table descriptor is unchanged, return without writing anything. return nil } + committedMutations := scTable.Mutations[:i] // Trim the executed mutations from the descriptor. scTable.Mutations = scTable.Mutations[i:] + // Check any jobs that we need to depend on for the current + // job to be successful. + depMutationJobs, err = sc.getDependentMutationsJobs(ctx, scTable, committedMutations) + if err != nil { + return err + } + for i, g := range scTable.MutationJobs { if g.MutationID == sc.mutationID { // Trim the executed mutation group from the descriptor. @@ -1385,6 +1395,14 @@ func (sc *SchemaChanger) done(ctx context.Context) error { if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil { return err } + + // If any operations was skipped because a mutation was made + // redundant due to a column getting dropped later on then we should + // wait for those jobs to complete before returning our result back. + if err := sc.jobRegistry.WaitForJobs(ctx, sc.execCfg.InternalExecutor, depMutationJobs); err != nil { + return errors.Wrap(err, "A dependent transaction failed for this schema change") + } + return nil } @@ -1564,6 +1582,12 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError return errors.AssertionFailedf("mutation already rolled back: %v", mutation) } + // Ignore mutations that would be skipped, nothing + // to reverse here. + if discarded, _ := isCurrentMutationDiscarded(scTable, mutation, i+1); discarded { + continue + } + log.Warningf(ctx, "reverse schema change mutation: %+v", mutation) scTable.Mutations[i], columns = sc.reverseMutation(mutation, false /*notStarted*/, columns) @@ -2094,7 +2118,6 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er return err } } - execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID) error { sc := SchemaChanger{ descID: descID, @@ -2519,3 +2542,64 @@ func DeleteTableDescAndZoneConfig( return txn.Run(ctx, b) }) } + +// getDependentMutationJobs gets the dependent jobs that need to complete for +// the current schema change to be considered successful. For example, if column +// A will have a unique index, but it later gets dropped. Then for this mutation +// to be successful the drop column job has to be successful too. +func (sc *SchemaChanger) getDependentMutationsJobs( + ctx context.Context, tableDesc *tabledesc.Mutable, mutations []descpb.DescriptorMutation, +) ([]jobspb.JobID, error) { + dependentJobs := make([]jobspb.JobID, 0, len(tableDesc.MutationJobs)) + for _, m := range mutations { + // Find all the mutations that we depend + discarded, dependentID := isCurrentMutationDiscarded(tableDesc, m, 0) + if discarded { + jobID, err := getJobIDForMutationWithDescriptor(ctx, tableDesc, dependentID) + if err != nil { + return nil, err + } + dependentJobs = append(dependentJobs, jobID) + } + } + return dependentJobs, nil +} + +// isCurrentMutationDiscarded returns if the current column mutation is made irrelevant +// by a later operation. The nextMutationIdx provides the index at which to check for +// later mutation. +func isCurrentMutationDiscarded( + tableDesc *tabledesc.Mutable, currentMutation descpb.DescriptorMutation, nextMutationIdx int, +) (bool, descpb.MutationID) { + if nextMutationIdx+1 > len(tableDesc.Mutations) { + return false, descpb.InvalidMutationID + } + + colToCheck := make([]descpb.ColumnID, 0, 1) + // Both NOT NULL related updates and check constraint updates + // involving this column will get canceled out by a drop column. + if constraint := currentMutation.GetConstraint(); constraint != nil { + if constraint.ConstraintType == descpb.ConstraintToUpdate_NOT_NULL { + colToCheck = append(colToCheck, constraint.NotNullColumn) + } else if constraint.ConstraintType == descpb.ConstraintToUpdate_CHECK { + colToCheck = constraint.Check.ColumnIDs + } + } + + for _, m := range tableDesc.Mutations[nextMutationIdx:] { + colDesc := m.GetColumn() + if m.Direction == descpb.DescriptorMutation_DROP && + colDesc != nil && + !m.Rollback { + // Column was dropped later on, so this operation + // should be a no-op. + for _, col := range colToCheck { + if colDesc.ID == col { + return true, m.MutationID + } + } + } + } + // Not discarded by any later operation. + return false, descpb.InvalidMutationID +} diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 7ca52572d771..ca6996de4746 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -7030,3 +7030,271 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { } }) } + +// TestDropColumnAfterMutations tests the imapct of a drop column +// after an existing a mutation on the column +func TestDropColumnAfterMutations(t *testing.T) { + defer leaktest.AfterTest(t)() + defer sqltestutils.SetTestJobsAdoptInterval()() + ctx := context.Background() + var jobControlMu syncutil.Mutex + var delayJobList []string + var delayJobChannels []chan struct{} + delayNotify := make(chan struct{}) + + proceedBeforeBackfill := make(chan error) + params, _ := tests.CreateTestServerParams() + + var s serverutils.TestServerInterface + params.Knobs = base.TestingKnobs{ + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + RunBeforeResume: func(jobID jobspb.JobID) error { + lockHeld := true + jobControlMu.Lock() + scJob, err := s.JobRegistry().(*jobs.Registry).LoadJob(ctx, jobID) + if err != nil { + return err + } + pl := scJob.Payload() + // Check if we are blocking the correct job + for idx, s := range delayJobList { + if strings.Contains(pl.Description, s) { + delayNotify <- struct{}{} + channel := delayJobChannels[idx] + jobControlMu.Unlock() + lockHeld = false + <-channel + } + } + if lockHeld { + jobControlMu.Unlock() + } + return nil + }, + RunAfterBackfill: func(jobID jobspb.JobID) error { + return <-proceedBeforeBackfill + }, + }, + } + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + conn1 := sqlutils.MakeSQLRunner(sqlDB) + conn2 := sqlutils.MakeSQLRunner(sqlDB) + var schemaChangeWaitGroup sync.WaitGroup + + conn1.Exec(t, ` +CREATE TABLE t (i INT8 PRIMARY KEY, j INT8); +INSERT INTO t VALUES (1, 1); +`) + + // Test 1: with concurrent drop and mutations + t.Run("basic-concurrent-drop-mutations", func(t *testing.T) { + jobControlMu.Lock() + delayJobList = []string{"ALTER TABLE defaultdb.public.t ADD COLUMN k INT8 NOT NULL UNIQUE DEFAULT 42", + "ALTER TABLE defaultdb.public.t DROP COLUMN j"} + delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} + jobControlMu.Unlock() + + schemaChangeWaitGroup.Add(1) + go func() { + defer schemaChangeWaitGroup.Done() + _, err := conn2.DB.ExecContext(ctx, + ` +BEGIN; +ALTER TABLE t ALTER COLUMN j SET NOT NULL; +ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL UNIQUE; +COMMIT; +`) + if err != nil { + t.Error(err) + } + }() + <-delayNotify + schemaChangeWaitGroup.Add(1) + go func() { + defer schemaChangeWaitGroup.Done() + _, err := conn2.DB.ExecContext(ctx, + ` +SET sql_safe_updates = false; +BEGIN; +ALTER TABLE t DROP COLUMN j; +ALTER TABLE t ALTER COLUMN k SET NOT NULL; +COMMIT; +`) + if err != nil { + t.Error(err) + } + }() + <-delayNotify + + // Allow jobs to proceed once both are concurrent. + delayJobChannels[0] <- struct{}{} + // Allow both back fill jobs to proceed + proceedBeforeBackfill <- nil + // Allow the second job to proceed + delayJobChannels[1] <- struct{}{} + // Second job will also do back fill next + proceedBeforeBackfill <- nil + + schemaChangeWaitGroup.Wait() + close(delayJobChannels[0]) + close(delayJobChannels[1]) + }) + + // Test 2: with concurrent drop and mutations, where + // the drop column will be failed intentionally + t.Run("failed-concurrent-drop-mutations", func(t *testing.T) { + jobControlMu.Lock() + delayJobList = []string{"ALTER TABLE defaultdb.public.t ALTER COLUMN j SET NOT NULL", + "ALTER TABLE defaultdb.public.t DROP COLUMN j"} + delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} + jobControlMu.Unlock() + + conn2.Exec(t, ` + DROP TABLE t; + CREATE TABLE t (i INT8 PRIMARY KEY, j INT8); + INSERT INTO t VALUES (1, 1); + `) + schemaChangeWaitGroup.Add(1) + go func() { + defer schemaChangeWaitGroup.Done() + _, err := conn2.DB.ExecContext(context.Background(), + ` + BEGIN; + ALTER TABLE t ALTER COLUMN j SET NOT NULL; + ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL UNIQUE; + COMMIT; + `) + failureError := "pq: transaction committed but schema change aborted with error: (XXUUU): A dependent transaction failed for this schema change: Bogus error for drop column transaction" + if err != nil && + !strings.Contains(err.Error(), failureError) { + t.Error(err.Error()) + } else if err == nil { + t.Error("Expected error was not hit") + } + }() + + // Wait for the alter to get submitted first + <-delayNotify + + schemaChangeWaitGroup.Add(1) + go func() { + defer schemaChangeWaitGroup.Done() + _, err := conn1.DB.ExecContext(context.Background(), + ` + SET sql_safe_updates = false; + BEGIN; + ALTER TABLE t DROP COLUMN j; + ALTER TABLE t ALTER COLUMN k SET NOT NULL; + ALTER TABLE t ALTER COLUMN k SET DEFAULT 421; + ALTER TABLE t ADD COLUMN o INT8 DEFAULT 42 NOT NULL UNIQUE; + COMMIT; + `) + failureError := "pq: transaction committed but schema change aborted with error: (XXUUU): Bogus error for drop column transaction" + if err != nil && + !strings.Contains(err.Error(), failureError) { + t.Error(err.Error()) + } else if err == nil { + t.Error("Expected error was not hit") + } + }() + <-delayNotify + + // Allow the first operation relying on + // the dropped column to resume. + delayJobChannels[0] <- struct{}{} + // Allow internal backfill processing to resume + proceedBeforeBackfill <- nil + // Allow the second job to proceed + delayJobChannels[1] <- struct{}{} + // Second job will also do backfill next + proceedBeforeBackfill <- errors.Newf("Bogus error for drop column transaction") + // Rollback attempt after failure + proceedBeforeBackfill <- nil + + schemaChangeWaitGroup.Wait() + close(delayJobChannels[0]) + close(delayJobChannels[1]) + }) + + // Test 3: with concurrent drop and mutations where an insert will + // cause the backfill operation to fail. + t.Run("concurrent-drop-mutations-insert-fail", func(t *testing.T) { + jobControlMu.Lock() + delayJobList = []string{"ALTER TABLE defaultdb.public.t ALTER COLUMN j SET NOT NULL", + "ALTER TABLE defaultdb.public.t DROP COLUMN j"} + delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} + jobControlMu.Unlock() + + conn2.Exec(t, ` + DROP TABLE t; + CREATE TABLE t (i INT8 PRIMARY KEY, j INT8); + INSERT INTO t VALUES (1, 1); + `) + + schemaChangeWaitGroup.Add(1) + go func() { + defer schemaChangeWaitGroup.Done() + _, err := conn2.DB.ExecContext(context.Background(), + ` + BEGIN; + ALTER TABLE t ALTER COLUMN j SET NOT NULL; + ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL; + COMMIT; + `) + // Two possibilities exist based on timing, either the current transaction + // will fail during backfill or the dependent one with the drop will fail. + failureError := "pq: transaction committed but schema change aborted with error: (23505): A dependent transaction failed for this schema change: failed to ingest index entries during backfill: duplicate key value violates unique constraint \"t_o_key\"" + if err != nil && + !strings.Contains(err.Error(), failureError) { + t.Error(err.Error()) + } else if err == nil { + t.Error("Expected error was not hit") + } + }() + <-delayNotify + + schemaChangeWaitGroup.Add(1) + go func() { + defer schemaChangeWaitGroup.Done() + _, err := conn1.DB.ExecContext(context.Background(), + ` + SET sql_safe_updates = false; + BEGIN; + ALTER TABLE t DROP COLUMN j; + ALTER TABLE t ALTER COLUMN k SET NOT NULL; + ALTER TABLE t ALTER COLUMN k SET DEFAULT 421; + ALTER TABLE t ADD COLUMN o INT8 DEFAULT 42 NOT NULL UNIQUE; + INSERT INTO t VALUES (2); + COMMIT; + `) + failureError := "pq: transaction committed but schema change aborted with error: (23505): failed to ingest index entries during backfill: duplicate key value violates unique constraint \"t_o_key\"" + if err != nil && + !strings.Contains(err.Error(), failureError) { + t.Error(err.Error()) + } else if err == nil { + t.Error("Expected error was not hit") + } + }() + <-delayNotify + + // Allow jobs to proceed once both are concurrent. + // Allow the first operation relying on + // the dropped column to resume. + delayJobChannels[0] <- struct{}{} + // Allow internal backfill processing to resume + proceedBeforeBackfill <- nil + // Allow the second job to proceed + delayJobChannels[1] <- struct{}{} + // Second job will also do backfill next + proceedBeforeBackfill <- nil + schemaChangeWaitGroup.Wait() + + close(delayJobChannels[0]) + close(delayJobChannels[1]) + }) + + close(delayNotify) + close(proceedBeforeBackfill) +}