Skip to content

Commit

Permalink
sql: incorrect behaviour setting NOT NULL on column and dropping it
Browse files Browse the repository at this point in the history
Fixes: #47719

Previously, if a column was mutated and then dropped in a single
transaction we would still try to validate the dropped column.
Additionally, in general if a drop operation occurred in a
different transaction we ran the danger of doing something
similar. From a behaviour perspective this was bad, since
it can lead to unexpected behaviour for users. To address, this
patch introduces logic to scan later mutations to check if
a drop column occurs. If one does occur then it will skip any
operations made irrelevant by the drop. Additionally, for a
correctness perspective it will wait for the drop to complete
before returning back.

Release note (bug fix): A constraint like not null or check on
a column made irrelevant by a drop in a later concurrent transaction
would lead to incorrect errors / behaviour.
  • Loading branch information
fqazi committed Mar 18, 2021
1 parent 98ae002 commit b69d96c
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 10 deletions.
29 changes: 22 additions & 7 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,14 @@ func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error {
return nil
}

// Run starts previously unstarted jobs from a list of scheduled
// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
func (r *Registry) Run(
// WaitForJobs waits for a given list of jobs to reach some sort
// of terminal state.
func (r *Registry) WaitForJobs(
ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
) error {
if len(jobs) == 0 {
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
buf := bytes.Buffer{}
for i, id := range jobs {
if i > 0 {
Expand Down Expand Up @@ -372,6 +368,25 @@ func (r *Registry) Run(
return nil
}

// Run starts previously unstarted jobs from a list of scheduled
// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
func (r *Registry) Run(
ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
) error {
if len(jobs) == 0 {
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
err := r.WaitForJobs(ctx, ex, jobs)
if err != nil {
return err
}
return nil
}

// NewJob creates a new Job.
func (r *Registry) NewJob(record Record, jobID jobspb.JobID) *Job {
job := &Job{
Expand Down
13 changes: 12 additions & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,16 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error {
log.Infof(ctx, "running backfill for %q, v=%d", tableDesc.Name, tableDesc.Version)

needColumnBackfill := false
for _, m := range tableDesc.Mutations {
for mutationIdx, m := range tableDesc.Mutations {
if m.MutationID != sc.mutationID {
break
}
// If the current mutation is discarded, then
// skip over processing.
if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, mutationIdx+1); discarded {
continue
}

switch m.Direction {
case descpb.DescriptorMutation_ADD:
switch t := m.Descriptor_.(type) {
Expand Down Expand Up @@ -1865,6 +1871,11 @@ func runSchemaChangesInTxn(
// mutations that need to be processed.
for i := 0; i < len(tableDesc.Mutations); i++ {
m := tableDesc.Mutations[i]
// Skip mutations that get cancelled by later operations
if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, i+1); discarded {
continue
}

immutDesc := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildImmutableTable()
switch m.Direction {
case descpb.DescriptorMutation_ADD:
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,26 @@ SELECT count(descriptor_id)
----
0

# Validation for #47719 where a mutation is cancelled out by a drop.
statement ok
create table tColDrop (a int);

statement ok
begin;
alter table tColDrop alter column a set not null;
alter table tColDrop drop column a;
commit;

statement ok
drop table tColDrop;

statement ok
begin;
create table tColDrop (a int);
alter table tColDrop alter column a set not null;
alter table tColDrop drop column a;
commit;

# Validate that the schema_change_successful metric
query T
SELECT feature_name FROM crdb_internal.feature_usage
Expand Down
90 changes: 88 additions & 2 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,10 +1045,13 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
// Jobs (for GC, etc.) that need to be started immediately after the table
// descriptor updates are published.
var didUpdate bool
var depMutationJobs []jobspb.JobID
var scTable *tabledesc.Mutable
modified, err := sc.txnWithModified(ctx, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
scTable, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn)
var err error
scTable, err = descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn)
if err != nil {
return err
}
Expand Down Expand Up @@ -1292,9 +1295,17 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
// The table descriptor is unchanged, return without writing anything.
return nil
}
committedMutations := scTable.Mutations[:i]
// Trim the executed mutations from the descriptor.
scTable.Mutations = scTable.Mutations[i:]

// Check any jobs that we need to depend on for the current
// job to be successful.
depMutationJobs, err = sc.getDependentMutationsJobs(ctx, scTable, committedMutations)
if err != nil {
return err
}

for i, g := range scTable.MutationJobs {
if g.MutationID == sc.mutationID {
// Trim the executed mutation group from the descriptor.
Expand Down Expand Up @@ -1385,6 +1396,14 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}

// If any operations was skipped because a mutation was made
// redundant due to a column getting dropped later on then we should
// wait for those jobs to complete before returning our result back.
if err := sc.jobRegistry.WaitForJobs(ctx, sc.execCfg.InternalExecutor, depMutationJobs); err != nil {
return errors.Wrap(err, "A dependent transaction failed for this schema change")
}

return nil
}

Expand Down Expand Up @@ -1525,6 +1544,7 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError
// Mutation is already reversed, so we don't need to do any more work.
// This can happen if the mutations were already reversed, but before
// the rollback completed the job was adopted.
// the rollback completed the job was adopted.
if alreadyReversed {
return nil
}
Expand Down Expand Up @@ -1564,6 +1584,12 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError
return errors.AssertionFailedf("mutation already rolled back: %v", mutation)
}

// Ignore mutations that would be skipped, nothing
// to reverse here.
if discarded, _ := isCurrentMutationDiscarded(scTable, mutation, i+1); discarded {
continue
}

log.Warningf(ctx, "reverse schema change mutation: %+v", mutation)
scTable.Mutations[i], columns = sc.reverseMutation(mutation, false /*notStarted*/, columns)

Expand Down Expand Up @@ -2094,7 +2120,6 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
return err
}
}

execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID) error {
sc := SchemaChanger{
descID: descID,
Expand Down Expand Up @@ -2519,3 +2544,64 @@ func DeleteTableDescAndZoneConfig(
return txn.Run(ctx, b)
})
}

// getDependentMutationJobs gets the dependent jobs that need to complete for
// the current schema change to be considered successful. For example, if column
// A will have a unique index, but it later gets dropped. Then for this mutation
// to be successful the drop column job has to be successful too.
func (sc *SchemaChanger) getDependentMutationsJobs(
ctx context.Context, tableDesc *tabledesc.Mutable, mutations []descpb.DescriptorMutation,
) ([]jobspb.JobID, error) {
dependentJobs := make([]jobspb.JobID, 0, len(tableDesc.MutationJobs))
for _, m := range mutations {
// Find all the mutations that we depend
discarded, dependentID := isCurrentMutationDiscarded(tableDesc, m, 0)
if discarded {
jobID, err := getJobIDForMutationWithDescriptor(ctx, tableDesc, dependentID)
if err != nil {
return nil, err
}
dependentJobs = append(dependentJobs, jobID)
}
}
return dependentJobs, nil
}

// isCurrentMutationDiscarded returns if the current column mutation is made irrelevant
// by a later operation. The nextMutationIdx provides the index at which to check for
// later mutation.
func isCurrentMutationDiscarded(
tableDesc *tabledesc.Mutable, currentMutation descpb.DescriptorMutation, nextMutationIdx int,
) (bool, descpb.MutationID) {
if nextMutationIdx+1 > len(tableDesc.Mutations) {
return false, descpb.InvalidMutationID
}

colToCheck := make([]descpb.ColumnID, 0, 1)
// Both NOT NULL related updates and check constraint updates
// involving this column will get cancelled out by a drop column.
if constraint := currentMutation.GetConstraint(); constraint != nil {
if constraint.ConstraintType == descpb.ConstraintToUpdate_NOT_NULL {
colToCheck = append(colToCheck, constraint.NotNullColumn)
} else if constraint.ConstraintType == descpb.ConstraintToUpdate_CHECK {
colToCheck = constraint.Check.ColumnIDs
}
}

for _, m := range tableDesc.Mutations[nextMutationIdx:] {
colDesc := m.GetColumn()
if m.Direction == descpb.DescriptorMutation_DROP &&
colDesc != nil &&
!m.Rollback {
// Column was dropped later on, so this operation
// should be a no-op.
for _, col := range colToCheck {
if colDesc.ID == col {
return true, m.MutationID
}
}
}
}
// Not discarded by any later operation.
return false, descpb.InvalidMutationID
}
Loading

0 comments on commit b69d96c

Please sign in to comment.