From 011bbfb80d951870905e83d5bedc4e208e366040 Mon Sep 17 00:00:00 2001
From: Faizan Qazi
Date: Tue, 16 Mar 2021 10:15:58 -0400
Subject: [PATCH 1/2] sql: update crdb_internal.interleaved to be more usable

Previously, the crdb_internal.interleaved table showed all interleaved
indexes, but it was impossible to tell which parent table an index was
interleaved into. This was inadequate because users could not determine
the parent table. To address this, this patch modifies
crdb_internal.interleaved to replace the parent_index column with
parent_database_name, parent_schema_name, and parent_table_name columns,
since the parent index could only ever be the primary key.

Release note (sql change): Updated crdb_internal.interleaved, replacing
the parent_index column with the parent_database_name,
parent_schema_name, and parent_table_name columns.
---
 pkg/sql/crdb_internal.go                      | 21 +++++-----
 .../testdata/logic_test/create_statements     |  8 +++-
 .../logictest/testdata/logic_test/interleaved | 42 ++++++++++++-------
 3 files changed, 44 insertions(+), 27 deletions(-)

diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index c683f27b8cdf..c14ab1e6eb57 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -4144,7 +4144,11 @@ CREATE TABLE crdb_internal.interleaved (
 		STRING NOT NULL,
 	index_name
 		STRING NOT NULL,
-	parent_index
+	parent_database_name
+		STRING NOT NULL,
+	parent_schema_name
+		STRING NOT NULL,
+	parent_table_name
 		STRING NOT NULL
 );`,
 	populate: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, addRow func(...tree.Datum) error) error {
@@ -4158,30 +4162,27 @@ CREATE TABLE crdb_internal.interleaved (
 				if index.NumInterleaveAncestors() == 0 {
 					continue
 				}
-
 				ancestor := index.GetInterleaveAncestor(index.NumInterleaveAncestors() - 1)
 				parentTable, err := lookupFn.getTableByID(ancestor.TableID)
 				if err != nil {
 					return err
 				}
-				parentIndex, err := parentTable.FindIndexWithID(ancestor.IndexID)
-				if err != nil {
-					return err
-				}
 				parentSchemaName, err := lookupFn.getSchemaNameByID(parentTable.GetParentSchemaID())
 				if err != nil {
 					return err
 				}
-				database, err := lookupFn.getDatabaseByID(parentTable.GetParentID())
+				parentDatabase, err := lookupFn.getDatabaseByID(parentTable.GetParentID())
 				if err != nil {
 					return err
 				}
-				if err := addRow(tree.NewDString(database.GetName()),
-					tree.NewDString(parentSchemaName),
+				if err := addRow(tree.NewDString(db.GetName()),
+					tree.NewDString(schemaName),
 					tree.NewDString(table.GetName()),
 					tree.NewDString(index.GetName()),
-					tree.NewDString(parentIndex.GetName())); err != nil {
+					tree.NewDString(parentDatabase.GetName()),
+					tree.NewDString(parentSchemaName),
+					tree.NewDString(parentTable.GetName())); err != nil {
 					return err
 				}
 			}
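As an editorial aside: with the new columns, mapping every interleaved child
index to its parent table becomes a single query. A minimal standalone sketch
(not part of the patch; the DSN is a placeholder for a local single-node
cluster, and lib/pq stands in for any postgres-wire driver):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumption: any postgres-wire driver works
)

func main() {
	// Placeholder DSN; adjust for your cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	rows, err := db.Query(`SELECT table_name, index_name, parent_database_name,
       parent_schema_name, parent_table_name
  FROM crdb_internal.interleaved`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var tbl, idx, pdb, psch, ptbl string
		if err := rows.Scan(&tbl, &idx, &pdb, &psch, &ptbl); err != nil {
			log.Fatal(err)
		}
		// Each child index now maps directly to its parent table.
		fmt.Printf("%s@%s -> %s.%s.%s\n", tbl, idx, pdb, psch, ptbl)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
```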
diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements
index dc3393b19fc4..1ef2f02b0131 100644
--- a/pkg/sql/logictest/testdata/logic_test/create_statements
+++ b/pkg/sql/logictest/testdata/logic_test/create_statements
@@ -386,13 +386,17 @@ CREATE TABLE crdb_internal.interleaved (
   schema_name STRING NOT NULL,
   table_name STRING NOT NULL,
   index_name STRING NOT NULL,
-  parent_index STRING NOT NULL
+  parent_database_name STRING NOT NULL,
+  parent_schema_name STRING NOT NULL,
+  parent_table_name STRING NOT NULL
 ) CREATE TABLE crdb_internal.interleaved (
   database_name STRING NOT NULL,
   schema_name STRING NOT NULL,
   table_name STRING NOT NULL,
   index_name STRING NOT NULL,
-  parent_index STRING NOT NULL
+  parent_database_name STRING NOT NULL,
+  parent_schema_name STRING NOT NULL,
+  parent_table_name STRING NOT NULL
 ) {} {}
 CREATE TABLE crdb_internal.invalid_objects (
   id INT8 NULL,
diff --git a/pkg/sql/logictest/testdata/logic_test/interleaved b/pkg/sql/logictest/testdata/logic_test/interleaved
index fc8ae12508f8..6bc3c739e1d7 100644
--- a/pkg/sql/logictest/testdata/logic_test/interleaved
+++ b/pkg/sql/logictest/testdata/logic_test/interleaved
@@ -497,22 +497,34 @@ CREATE INDEX NIINDX3 ON CHILD(z);
 statement ok
 CREATE INDEX IINDX ON CHILD(rowid,x,y,z) INTERLEAVE IN PARENT PARENT(rowid);
 
+# Child in a different schema than its parent.
+statement ok
+CREATE TABLE parentDS (k STRING PRIMARY KEY);
+
+statement ok
+CREATE SCHEMA child_schema;
+
+statement ok
+CREATE TABLE child_schema.child (ck INT8 PRIMARY KEY, k STRING NOT NULL UNIQUE);
+
+statement ok
+CREATE INDEX interl ON child_schema.child (k) INTERLEAVE IN PARENT parentDS (k);
 
-query TTTTT
+query TTTTTTT
 select * from "".crdb_internal.interleaved;
 ----
-test   public  p1_1                      p1_id                     primary
-test   public  all_interleaves           primary                   primary
-test   public  all_interleaves           all_interleaves_c_d_idx   primary
-test   public  all_interleaves           all_interleaves_d_c_key   primary
-test   public  orders                    primary                   primary
-other  public  interdb                   primary                   primary
-test   public  c20067                    primary                   primary
-test   public  documents                 primary                   primary
-test   public  big_interleave_parent     primary                   primary
-test   public  big_interleave_child      primary                   primary
-test   public  interleave_create_notice  primary                   primary
-test   public  interleave_create_notice  interleave_index          primary
-test   public  interleave_pk_notice      primary                   primary
-test   public  child                     iindx                     primary
+test  public        p1_1                      p1_id                     test   public  p1_1
+test  public        all_interleaves           primary                   test   public  p1_1
+test  public        all_interleaves           all_interleaves_c_d_idx   test   public  p1_1
+test  public        all_interleaves           all_interleaves_d_c_key   test   public  p1_1
+test  public        orders                    primary                   test   public  customers
+test  public        interdb                   primary                   other  public  foo
+test  public        c20067                    primary                   test   public  p20067
+test  public        documents                 primary                   test   public  users
+test  public        big_interleave_parent     primary                   test   public  big_interleave_grandparent
+test  public        big_interleave_child      primary                   test   public  big_interleave_parent
+test  public        interleave_create_notice  primary                   test   public  interleave_parent
+test  public        interleave_create_notice  interleave_index          test   public  interleave_parent
+test  public        interleave_pk_notice      primary                   test   public  interleave_parent
+test  public        child                     iindx                     test   public  parent
+test  child_schema  child                     interl                    test   public  parentds

From a9aa92c9a6021f040e2044f236fd8b9a208fcec3 Mon Sep 17 00:00:00 2001
From: Faizan Qazi
Date: Thu, 11 Mar 2021 10:41:08 -0500
Subject: [PATCH 2/2] sql: incorrect behaviour setting NOT NULL on column and
 dropping it

Fixes: #47719

Previously, if a column was mutated and then dropped in a single
transaction, we would still try to validate the dropped column.
More generally, if a drop operation occurred in a different
transaction, we ran the risk of doing something similar. This was
bad, since it could lead to unexpected behaviour for users.

To address this, this patch introduces logic to scan later mutations
to check whether a drop column occurs. If one does occur, any
operations made irrelevant by the drop are skipped. Additionally, for
correctness, the schema change waits for the drop to complete before
returning.

Release note (bug fix): A constraint such as NOT NULL or CHECK on a
column made irrelevant by a column drop in a later concurrent
transaction could lead to incorrect errors / behaviour.
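To make the failure mode concrete, here is a minimal standalone sketch of the
sequence this patch fixes (mirroring the logic tests added below; not part of
the patch itself, and the DSN is a placeholder):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumption: any postgres-wire driver works
)

func main() {
	// Placeholder DSN for a local single-node cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Before this patch, the NOT NULL validation scheduled by the first
	// ALTER could still run against column a even though the same
	// transaction drops it, surfacing spurious errors; with the fix the
	// mutation is discarded and the transaction commits cleanly.
	if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS t (i INT PRIMARY KEY, a INT);
BEGIN;
ALTER TABLE t ALTER COLUMN a SET NOT NULL;
ALTER TABLE t DROP COLUMN a;
COMMIT;
`); err != nil {
		log.Fatal(err)
	}
	log.Println("NOT NULL + DROP COLUMN in one txn committed successfully")
}
```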
---
 pkg/jobs/registry.go                          |  29 +-
 pkg/sql/backfill.go                           |  13 +-
 .../logictest/testdata/logic_test/alter_table |  20 ++
 pkg/sql/schema_changer.go                     |  86 +++++-
 pkg/sql/schema_changer_test.go                | 268 ++++++++++++++++++
 5 files changed, 407 insertions(+), 9 deletions(-)

diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 1e38a3fb3872..dc5e4a9ca118 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -299,18 +299,14 @@ func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error {
 	return nil
 }
 
-// Run starts previously unstarted jobs from a list of scheduled
-// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
-func (r *Registry) Run(
+// WaitForJobs waits for a given list of jobs to reach some sort
+// of terminal state.
+func (r *Registry) WaitForJobs(
 	ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
 ) error {
 	if len(jobs) == 0 {
 		return nil
 	}
-	log.Infof(ctx, "scheduled jobs %+v", jobs)
-	if err := r.NotifyToAdoptJobs(ctx); err != nil {
-		return err
-	}
 	buf := bytes.Buffer{}
 	for i, id := range jobs {
 		if i > 0 {
@@ -372,6 +368,21 @@
 	return nil
 }
 
+// Run starts previously unstarted jobs from a list of scheduled
+// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
+func (r *Registry) Run(
+	ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
+) error {
+	if len(jobs) == 0 {
+		return nil
+	}
+	log.Infof(ctx, "scheduled jobs %+v", jobs)
+	if err := r.NotifyToAdoptJobs(ctx); err != nil {
+		return err
+	}
+	return r.WaitForJobs(ctx, ex, jobs)
+}
+
 // NewJob creates a new Job.
 func (r *Registry) NewJob(record Record, jobID jobspb.JobID) *Job {
 	job := &Job{
diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index 1acb81c43d1d..159f461f0017 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -206,10 +206,16 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error {
 	log.Infof(ctx, "running backfill for %q, v=%d", tableDesc.Name, tableDesc.Version)
 
 	needColumnBackfill := false
-	for _, m := range tableDesc.Mutations {
+	for mutationIdx, m := range tableDesc.Mutations {
 		if m.MutationID != sc.mutationID {
 			break
 		}
+		// If the current mutation is discarded by a later one,
+		// skip any processing for it.
+		if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, mutationIdx+1); discarded {
+			continue
+		}
+
 		switch m.Direction {
 		case descpb.DescriptorMutation_ADD:
 			switch t := m.Descriptor_.(type) {
@@ -1865,6 +1871,11 @@ func runSchemaChangesInTxn(
 	// mutations that need to be processed.
 	for i := 0; i < len(tableDesc.Mutations); i++ {
 		m := tableDesc.Mutations[i]
+		// Skip mutations that get canceled out by later operations.
+		if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, i+1); discarded {
+			continue
+		}
+
 		immutDesc := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildImmutableTable()
 		switch m.Direction {
 		case descpb.DescriptorMutation_ADD:
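The Run/WaitForJobs split means a caller that has already scheduled and
notified jobs can block on their completion without re-triggering adoption. A
hedged sketch of that call pattern (the wrapper function is hypothetical; the
registry APIs are the ones introduced above):

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
	"github.com/cockroachdb/errors"
)

// waitOnDependentJobs blocks until the given jobs reach a terminal state.
// Unlike Registry.Run, it performs no logging and no adoption notification,
// which is what a caller wants when the jobs were already started elsewhere
// (as the schema changer does below for dependent drop-column jobs).
func waitOnDependentJobs(
	ctx context.Context, r *jobs.Registry, ex sqlutil.InternalExecutor, ids []jobspb.JobID,
) error {
	if err := r.WaitForJobs(ctx, ex, ids); err != nil {
		return errors.Wrap(err, "waiting for dependent jobs")
	}
	return nil
}
```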
diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table
index fa3271970a1d..e73f1eb3bbfa 100644
--- a/pkg/sql/logictest/testdata/logic_test/alter_table
+++ b/pkg/sql/logictest/testdata/logic_test/alter_table
@@ -1689,6 +1689,26 @@ SELECT count(descriptor_id)
 ----
 0
 
+# Validation for #47719, where a mutation is canceled out by a drop.
+statement ok
+create table tColDrop (a int);
+
+statement ok
+begin;
+alter table tColDrop alter column a set not null;
+alter table tColDrop drop column a;
+commit;
+
+statement ok
+drop table tColDrop;
+
+statement ok
+begin;
+create table tColDrop (a int);
+alter table tColDrop alter column a set not null;
+alter table tColDrop drop column a;
+commit;
+
 # Validate that the schema_change_successful metric
 query T
 SELECT feature_name FROM crdb_internal.feature_usage
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index 99aa7e89dd73..e5cf2de5872f 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -1045,9 +1045,11 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
 	// Jobs (for GC, etc.) that need to be started immediately after the table
 	// descriptor updates are published.
 	var didUpdate bool
+	var depMutationJobs []jobspb.JobID
 	modified, err := sc.txnWithModified(ctx, func(
 		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
 	) error {
+		var err error
 		scTable, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn)
 		if err != nil {
 			return err
@@ -1292,9 +1294,17 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
 		// The table descriptor is unchanged, return without writing anything.
 		return nil
 	}
+	committedMutations := scTable.Mutations[:i]
 	// Trim the executed mutations from the descriptor.
 	scTable.Mutations = scTable.Mutations[i:]
 
+	// Collect any jobs that the current job needs to depend on
+	// for it to be considered successful.
+	depMutationJobs, err = sc.getDependentMutationsJobs(ctx, scTable, committedMutations)
+	if err != nil {
+		return err
+	}
+
 	for i, g := range scTable.MutationJobs {
 		if g.MutationID == sc.mutationID {
 			// Trim the executed mutation group from the descriptor.
@@ -1385,6 +1395,14 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
 	if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil {
 		return err
 	}
+
+	// If any operations were skipped because a mutation was made
+	// redundant by a column getting dropped later on, we should
+	// wait for those drop jobs to complete before returning.
+	if err := sc.jobRegistry.WaitForJobs(ctx, sc.execCfg.InternalExecutor, depMutationJobs); err != nil {
+		return errors.Wrap(err, "A dependent transaction failed for this schema change")
+	}
+
 	return nil
 }
 
@@ -1564,6 +1582,12 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError
 			return errors.AssertionFailedf("mutation already rolled back: %v", mutation)
 		}
 
+		// Ignore mutations that would be skipped; there is nothing
+		// to reverse here.
+		if discarded, _ := isCurrentMutationDiscarded(scTable, mutation, i+1); discarded {
+			continue
+		}
+
 		log.Warningf(ctx, "reverse schema change mutation: %+v", mutation)
 		scTable.Mutations[i], columns = sc.reverseMutation(mutation, false /*notStarted*/, columns)
 
@@ -2094,7 +2118,6 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
 			return err
 		}
 	}
-
 	execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID) error {
 		sc := SchemaChanger{
 			descID: descID,
@@ -2519,3 +2542,64 @@ func DeleteTableDescAndZoneConfig(
 		return txn.Run(ctx, b)
 	})
 }
+
+// getDependentMutationsJobs gets the dependent jobs that need to complete for
+// the current schema change to be considered successful. For example, if
+// column A will have a unique index but is later dropped, then for this
+// mutation to be successful the drop column job has to be successful too.
+func (sc *SchemaChanger) getDependentMutationsJobs(
+	ctx context.Context, tableDesc *tabledesc.Mutable, mutations []descpb.DescriptorMutation,
+) ([]jobspb.JobID, error) {
+	dependentJobs := make([]jobspb.JobID, 0, len(tableDesc.MutationJobs))
+	for _, m := range mutations {
+		// Find any later mutation that discards this one; the current
+		// job then depends on that mutation's job.
+		discarded, dependentID := isCurrentMutationDiscarded(tableDesc, m, 0)
+		if discarded {
+			jobID, err := getJobIDForMutationWithDescriptor(ctx, tableDesc, dependentID)
+			if err != nil {
+				return nil, err
+			}
+			dependentJobs = append(dependentJobs, jobID)
+		}
+	}
+	return dependentJobs, nil
+}
+
+// isCurrentMutationDiscarded returns whether the current column mutation is
+// made irrelevant by a later operation. nextMutationIdx is the index at which
+// to start checking for later mutations.
+func isCurrentMutationDiscarded(
+	tableDesc *tabledesc.Mutable, currentMutation descpb.DescriptorMutation, nextMutationIdx int,
+) (bool, descpb.MutationID) {
+	if nextMutationIdx >= len(tableDesc.Mutations) {
+		return false, descpb.InvalidMutationID
+	}
+
+	colToCheck := make([]descpb.ColumnID, 0, 1)
+	// Both NOT NULL related updates and check constraint updates
+	// involving this column will get canceled out by a drop column.
+	if constraint := currentMutation.GetConstraint(); constraint != nil {
+		if constraint.ConstraintType == descpb.ConstraintToUpdate_NOT_NULL {
+			colToCheck = append(colToCheck, constraint.NotNullColumn)
+		} else if constraint.ConstraintType == descpb.ConstraintToUpdate_CHECK {
+			colToCheck = constraint.Check.ColumnIDs
+		}
+	}
+
+	for _, m := range tableDesc.Mutations[nextMutationIdx:] {
+		colDesc := m.GetColumn()
+		if m.Direction == descpb.DescriptorMutation_DROP &&
+			colDesc != nil &&
+			!m.Rollback {
+			// The column is dropped later on, so this operation
+			// should be a no-op.
+			for _, col := range colToCheck {
+				if colDesc.ID == col {
+					return true, m.MutationID
+				}
+			}
+		}
+	}
+	// Not discarded by any later operation.
+	return false, descpb.InvalidMutationID
+}
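To make the discard rule concrete, here is a self-contained toy model
(simplified stand-ins, not the real descpb types): a NOT NULL or CHECK
mutation is discarded exactly when a later, non-rollback DROP of a column it
depends on appears in the mutation queue.

```go
package main

import "fmt"

// mutation is a simplified stand-in for descpb.DescriptorMutation
// (assumption: only the fields relevant to the discard rule).
type mutation struct {
	name     string
	isDrop   bool  // a DROP COLUMN mutation in the DROP direction
	rollback bool  // mirrors descpb.DescriptorMutation.Rollback
	colID    int   // the column being dropped (for drops)
	deps     []int // columns a constraint mutation validates
}

// discarded mirrors isCurrentMutationDiscarded: scan the mutations after
// nextIdx for a later non-rollback drop of any column the current
// mutation depends on.
func discarded(muts []mutation, cur mutation, nextIdx int) bool {
	for _, m := range muts[nextIdx:] {
		if m.isDrop && !m.rollback {
			for _, c := range cur.deps {
				if m.colID == c {
					return true
				}
			}
		}
	}
	return false
}

func main() {
	muts := []mutation{
		{name: "NOT NULL on a", deps: []int{1}},
		{name: "CHECK (b > 0)", deps: []int{2}},
		{name: "DROP COLUMN a", isDrop: true, colID: 1},
	}
	for i, m := range muts {
		fmt.Printf("%-15s discarded=%v\n", m.name, discarded(muts, m, i+1))
	}
	// Output:
	// NOT NULL on a   discarded=true
	// CHECK (b > 0)   discarded=false
	// DROP COLUMN a   discarded=false
}
```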
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go
index 7ca52572d771..ca6996de4746 100644
--- a/pkg/sql/schema_changer_test.go
+++ b/pkg/sql/schema_changer_test.go
@@ -7030,3 +7030,271 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
 		}
 	})
 }
+
+// TestDropColumnAfterMutations tests the impact of a drop column
+// after an existing mutation on the column.
+func TestDropColumnAfterMutations(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer sqltestutils.SetTestJobsAdoptInterval()()
+	ctx := context.Background()
+	var jobControlMu syncutil.Mutex
+	var delayJobList []string
+	var delayJobChannels []chan struct{}
+	delayNotify := make(chan struct{})
+
+	proceedBeforeBackfill := make(chan error)
+	params, _ := tests.CreateTestServerParams()
+
+	var s serverutils.TestServerInterface
+	params.Knobs = base.TestingKnobs{
+		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
+			RunBeforeResume: func(jobID jobspb.JobID) error {
+				lockHeld := true
+				jobControlMu.Lock()
+				scJob, err := s.JobRegistry().(*jobs.Registry).LoadJob(ctx, jobID)
+				if err != nil {
+					return err
+				}
+				pl := scJob.Payload()
+				// Check if we are blocking the correct job.
+				for idx, delayedJobDesc := range delayJobList {
+					if strings.Contains(pl.Description, delayedJobDesc) {
+						delayNotify <- struct{}{}
+						channel := delayJobChannels[idx]
+						jobControlMu.Unlock()
+						lockHeld = false
+						<-channel
+					}
+				}
+				if lockHeld {
+					jobControlMu.Unlock()
+				}
+				return nil
+			},
+			RunAfterBackfill: func(jobID jobspb.JobID) error {
+				return <-proceedBeforeBackfill
+			},
+		},
+	}
+	s, sqlDB, _ := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+
+	conn1 := sqlutils.MakeSQLRunner(sqlDB)
+	conn2 := sqlutils.MakeSQLRunner(sqlDB)
+	var schemaChangeWaitGroup sync.WaitGroup
+
+	conn1.Exec(t, `
+CREATE TABLE t (i INT8 PRIMARY KEY, j INT8);
+INSERT INTO t VALUES (1, 1);
+`)
+
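Editorial aside: the test below coordinates jobs by matching each resuming
job's statement description against a list and parking it on a per-job
channel. A stripped-down sketch of that gating idiom (hypothetical helper and
simplified types, not the test's actual code):

```go
package example

import "strings"

// gate pairs a statement substring with the channel that releases the job
// whose description matches it.
type gate struct {
	substr  string
	release chan struct{}
}

// makeRunBeforeResume returns a knob-style hook: when a resuming job's
// description matches a gate, it signals the test on notify and then blocks
// until the test writes to that gate's release channel. describe is a
// placeholder for looking up a job's statement text by ID.
func makeRunBeforeResume(
	notify chan<- struct{}, gates []gate, describe func(jobID int64) (string, error),
) func(int64) error {
	return func(jobID int64) error {
		desc, err := describe(jobID)
		if err != nil {
			return err
		}
		for _, g := range gates {
			if strings.Contains(desc, g.substr) {
				notify <- struct{}{} // the job has reached its gate
				<-g.release          // hold it until the test releases it
			}
		}
		return nil
	}
}
```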
+	// Test 1: concurrent drop and mutations.
+	t.Run("basic-concurrent-drop-mutations", func(t *testing.T) {
+		jobControlMu.Lock()
+		delayJobList = []string{"ALTER TABLE defaultdb.public.t ADD COLUMN k INT8 NOT NULL UNIQUE DEFAULT 42",
+			"ALTER TABLE defaultdb.public.t DROP COLUMN j"}
+		delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})}
+		jobControlMu.Unlock()
+
+		schemaChangeWaitGroup.Add(1)
+		go func() {
+			defer schemaChangeWaitGroup.Done()
+			_, err := conn2.DB.ExecContext(ctx,
+				`
+BEGIN;
+ALTER TABLE t ALTER COLUMN j SET NOT NULL;
+ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL UNIQUE;
+COMMIT;
+`)
+			if err != nil {
+				t.Error(err)
+			}
+		}()
+		<-delayNotify
+
+		schemaChangeWaitGroup.Add(1)
+		go func() {
+			defer schemaChangeWaitGroup.Done()
+			_, err := conn2.DB.ExecContext(ctx,
+				`
+SET sql_safe_updates = false;
+BEGIN;
+ALTER TABLE t DROP COLUMN j;
+ALTER TABLE t ALTER COLUMN k SET NOT NULL;
+COMMIT;
+`)
+			if err != nil {
+				t.Error(err)
+			}
+		}()
+		<-delayNotify
+
+		// Allow the jobs to proceed once both are concurrent.
+		delayJobChannels[0] <- struct{}{}
+		// Allow the first backfill job to proceed.
+		proceedBeforeBackfill <- nil
+		// Allow the second job to proceed.
+		delayJobChannels[1] <- struct{}{}
+		// The second job will also do a backfill next.
+		proceedBeforeBackfill <- nil
+
+		schemaChangeWaitGroup.Wait()
+		close(delayJobChannels[0])
+		close(delayJobChannels[1])
+	})
+
+	// Test 2: concurrent drop and mutations, where the drop column
+	// is made to fail intentionally.
+	t.Run("failed-concurrent-drop-mutations", func(t *testing.T) {
+		jobControlMu.Lock()
+		delayJobList = []string{"ALTER TABLE defaultdb.public.t ALTER COLUMN j SET NOT NULL",
+			"ALTER TABLE defaultdb.public.t DROP COLUMN j"}
+		delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})}
+		jobControlMu.Unlock()
+
+		conn2.Exec(t, `
+DROP TABLE t;
+CREATE TABLE t (i INT8 PRIMARY KEY, j INT8);
+INSERT INTO t VALUES (1, 1);
+`)
+
+		schemaChangeWaitGroup.Add(1)
+		go func() {
+			defer schemaChangeWaitGroup.Done()
+			_, err := conn2.DB.ExecContext(context.Background(),
+				`
+BEGIN;
+ALTER TABLE t ALTER COLUMN j SET NOT NULL;
+ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL UNIQUE;
+COMMIT;
+`)
+			failureError := "pq: transaction committed but schema change aborted with error: (XXUUU): A dependent transaction failed for this schema change: Bogus error for drop column transaction"
+			if err != nil &&
+				!strings.Contains(err.Error(), failureError) {
+				t.Error(err.Error())
+			} else if err == nil {
+				t.Error("Expected error was not hit")
+			}
+		}()
+
+		// Wait for the ALTER to get submitted first.
+		<-delayNotify
+
+		schemaChangeWaitGroup.Add(1)
+		go func() {
+			defer schemaChangeWaitGroup.Done()
+			_, err := conn1.DB.ExecContext(context.Background(),
+				`
+SET sql_safe_updates = false;
+BEGIN;
+ALTER TABLE t DROP COLUMN j;
+ALTER TABLE t ALTER COLUMN k SET NOT NULL;
+ALTER TABLE t ALTER COLUMN k SET DEFAULT 421;
+ALTER TABLE t ADD COLUMN o INT8 DEFAULT 42 NOT NULL UNIQUE;
+COMMIT;
+`)
+			failureError := "pq: transaction committed but schema change aborted with error: (XXUUU): Bogus error for drop column transaction"
+			if err != nil &&
+				!strings.Contains(err.Error(), failureError) {
+				t.Error(err.Error())
+			} else if err == nil {
+				t.Error("Expected error was not hit")
+			}
+		}()
+		<-delayNotify
+
+		// Allow the first operation relying on
+		// the dropped column to resume.
+		delayJobChannels[0] <- struct{}{}
+		// Allow internal backfill processing to resume.
+		proceedBeforeBackfill <- nil
+		// Allow the second job to proceed.
+		delayJobChannels[1] <- struct{}{}
+		// The second job will also do a backfill next; fail it.
+		proceedBeforeBackfill <- errors.Newf("Bogus error for drop column transaction")
+		// Rollback attempt after the failure.
+		proceedBeforeBackfill <- nil
+
+		schemaChangeWaitGroup.Wait()
+		close(delayJobChannels[0])
+		close(delayJobChannels[1])
+	})
+
+ t.Run("concurrent-drop-mutations-insert-fail", func(t *testing.T) { + jobControlMu.Lock() + delayJobList = []string{"ALTER TABLE defaultdb.public.t ALTER COLUMN j SET NOT NULL", + "ALTER TABLE defaultdb.public.t DROP COLUMN j"} + delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} + jobControlMu.Unlock() + + conn2.Exec(t, ` + DROP TABLE t; + CREATE TABLE t (i INT8 PRIMARY KEY, j INT8); + INSERT INTO t VALUES (1, 1); + `) + + schemaChangeWaitGroup.Add(1) + go func() { + defer schemaChangeWaitGroup.Done() + _, err := conn2.DB.ExecContext(context.Background(), + ` + BEGIN; + ALTER TABLE t ALTER COLUMN j SET NOT NULL; + ALTER TABLE t ADD COLUMN k INT8 DEFAULT 42 NOT NULL; + COMMIT; + `) + // Two possibilities exist based on timing, either the current transaction + // will fail during backfill or the dependent one with the drop will fail. + failureError := "pq: transaction committed but schema change aborted with error: (23505): A dependent transaction failed for this schema change: failed to ingest index entries during backfill: duplicate key value violates unique constraint \"t_o_key\"" + if err != nil && + !strings.Contains(err.Error(), failureError) { + t.Error(err.Error()) + } else if err == nil { + t.Error("Expected error was not hit") + } + }() + <-delayNotify + + schemaChangeWaitGroup.Add(1) + go func() { + defer schemaChangeWaitGroup.Done() + _, err := conn1.DB.ExecContext(context.Background(), + ` + SET sql_safe_updates = false; + BEGIN; + ALTER TABLE t DROP COLUMN j; + ALTER TABLE t ALTER COLUMN k SET NOT NULL; + ALTER TABLE t ALTER COLUMN k SET DEFAULT 421; + ALTER TABLE t ADD COLUMN o INT8 DEFAULT 42 NOT NULL UNIQUE; + INSERT INTO t VALUES (2); + COMMIT; + `) + failureError := "pq: transaction committed but schema change aborted with error: (23505): failed to ingest index entries during backfill: duplicate key value violates unique constraint \"t_o_key\"" + if err != nil && + !strings.Contains(err.Error(), failureError) { + t.Error(err.Error()) + } else if err == nil { + t.Error("Expected error was not hit") + } + }() + <-delayNotify + + // Allow jobs to proceed once both are concurrent. + // Allow the first operation relying on + // the dropped column to resume. + delayJobChannels[0] <- struct{}{} + // Allow internal backfill processing to resume + proceedBeforeBackfill <- nil + // Allow the second job to proceed + delayJobChannels[1] <- struct{}{} + // Second job will also do backfill next + proceedBeforeBackfill <- nil + schemaChangeWaitGroup.Wait() + + close(delayJobChannels[0]) + close(delayJobChannels[1]) + }) + + close(delayNotify) + close(proceedBeforeBackfill) +}