Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: incorrect behaviour setting NOT NULL on column and dropping it #62167

Merged
merged 1 commit into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,14 @@ func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error {
return nil
}

// Run starts previously unstarted jobs from a list of scheduled
// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
func (r *Registry) Run(
// WaitForJobs waits for a given list of jobs to reach some sort
// of terminal state.
func (r *Registry) WaitForJobs(
ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
) error {
if len(jobs) == 0 {
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
buf := bytes.Buffer{}
for i, id := range jobs {
if i > 0 {
Expand Down Expand Up @@ -372,6 +368,25 @@ func (r *Registry) Run(
return nil
}

// Run starts previously unstarted jobs from a list of scheduled
// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
func (r *Registry) Run(
ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
) error {
if len(jobs) == 0 {
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
err := r.WaitForJobs(ctx, ex, jobs)
if err != nil {
return err
}
return nil
}

// NewJob creates a new Job.
func (r *Registry) NewJob(record Record, jobID jobspb.JobID) *Job {
job := &Job{
Expand Down
13 changes: 12 additions & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,16 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error {
log.Infof(ctx, "running backfill for %q, v=%d", tableDesc.Name, tableDesc.Version)

needColumnBackfill := false
for _, m := range tableDesc.Mutations {
for mutationIdx, m := range tableDesc.Mutations {
if m.MutationID != sc.mutationID {
break
}
// If the current mutation is discarded, then
// skip over processing.
if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, mutationIdx+1); discarded {
continue
}

switch m.Direction {
case descpb.DescriptorMutation_ADD:
switch t := m.Descriptor_.(type) {
Expand Down Expand Up @@ -1865,6 +1871,11 @@ func runSchemaChangesInTxn(
// mutations that need to be processed.
for i := 0; i < len(tableDesc.Mutations); i++ {
m := tableDesc.Mutations[i]
// Skip mutations that get canceled by later operations
if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, i+1); discarded {
continue
}

immutDesc := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildImmutableTable()
switch m.Direction {
case descpb.DescriptorMutation_ADD:
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,26 @@ SELECT count(descriptor_id)
----
0

# Validation for #47719 where a mutation is canceled out by a drop.
statement ok
create table tColDrop (a int);

statement ok
begin;
alter table tColDrop alter column a set not null;
alter table tColDrop drop column a;
commit;

statement ok
drop table tColDrop;

statement ok
begin;
create table tColDrop (a int);
alter table tColDrop alter column a set not null;
alter table tColDrop drop column a;
commit;

# Validate that the schema_change_successful metric
query T
SELECT feature_name FROM crdb_internal.feature_usage
Expand Down
86 changes: 85 additions & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,9 +1045,11 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
// Jobs (for GC, etc.) that need to be started immediately after the table
// descriptor updates are published.
var didUpdate bool
var depMutationJobs []jobspb.JobID
modified, err := sc.txnWithModified(ctx, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
var err error
scTable, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn)
if err != nil {
return err
Expand Down Expand Up @@ -1292,9 +1294,17 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
// The table descriptor is unchanged, return without writing anything.
return nil
}
committedMutations := scTable.Mutations[:i]
// Trim the executed mutations from the descriptor.
scTable.Mutations = scTable.Mutations[i:]

// Check any jobs that we need to depend on for the current
// job to be successful.
depMutationJobs, err = sc.getDependentMutationsJobs(ctx, scTable, committedMutations)
if err != nil {
return err
}

for i, g := range scTable.MutationJobs {
if g.MutationID == sc.mutationID {
// Trim the executed mutation group from the descriptor.
Expand Down Expand Up @@ -1385,6 +1395,14 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}

// If any operations was skipped because a mutation was made
// redundant due to a column getting dropped later on then we should
// wait for those jobs to complete before returning our result back.
if err := sc.jobRegistry.WaitForJobs(ctx, sc.execCfg.InternalExecutor, depMutationJobs); err != nil {
return errors.Wrap(err, "A dependent transaction failed for this schema change")
}

return nil
}

Expand Down Expand Up @@ -1564,6 +1582,12 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError
return errors.AssertionFailedf("mutation already rolled back: %v", mutation)
}

// Ignore mutations that would be skipped, nothing
// to reverse here.
if discarded, _ := isCurrentMutationDiscarded(scTable, mutation, i+1); discarded {
continue
}

log.Warningf(ctx, "reverse schema change mutation: %+v", mutation)
scTable.Mutations[i], columns = sc.reverseMutation(mutation, false /*notStarted*/, columns)

Expand Down Expand Up @@ -2094,7 +2118,6 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
return err
}
}

execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID) error {
sc := SchemaChanger{
descID: descID,
Expand Down Expand Up @@ -2519,3 +2542,64 @@ func DeleteTableDescAndZoneConfig(
return txn.Run(ctx, b)
})
}

// getDependentMutationJobs gets the dependent jobs that need to complete for
// the current schema change to be considered successful. For example, if column
// A will have a unique index, but it later gets dropped. Then for this mutation
// to be successful the drop column job has to be successful too.
func (sc *SchemaChanger) getDependentMutationsJobs(
ctx context.Context, tableDesc *tabledesc.Mutable, mutations []descpb.DescriptorMutation,
) ([]jobspb.JobID, error) {
dependentJobs := make([]jobspb.JobID, 0, len(tableDesc.MutationJobs))
for _, m := range mutations {
// Find all the mutations that we depend
discarded, dependentID := isCurrentMutationDiscarded(tableDesc, m, 0)
if discarded {
jobID, err := getJobIDForMutationWithDescriptor(ctx, tableDesc, dependentID)
if err != nil {
return nil, err
}
dependentJobs = append(dependentJobs, jobID)
}
}
return dependentJobs, nil
}

// isCurrentMutationDiscarded returns if the current column mutation is made irrelevant
// by a later operation. The nextMutationIdx provides the index at which to check for
// later mutation.
func isCurrentMutationDiscarded(
tableDesc *tabledesc.Mutable, currentMutation descpb.DescriptorMutation, nextMutationIdx int,
) (bool, descpb.MutationID) {
if nextMutationIdx+1 > len(tableDesc.Mutations) {
return false, descpb.InvalidMutationID
}

colToCheck := make([]descpb.ColumnID, 0, 1)
// Both NOT NULL related updates and check constraint updates
// involving this column will get canceled out by a drop column.
if constraint := currentMutation.GetConstraint(); constraint != nil {
if constraint.ConstraintType == descpb.ConstraintToUpdate_NOT_NULL {
colToCheck = append(colToCheck, constraint.NotNullColumn)
} else if constraint.ConstraintType == descpb.ConstraintToUpdate_CHECK {
colToCheck = constraint.Check.ColumnIDs
}
}

for _, m := range tableDesc.Mutations[nextMutationIdx:] {
colDesc := m.GetColumn()
if m.Direction == descpb.DescriptorMutation_DROP &&
colDesc != nil &&
!m.Rollback {
// Column was dropped later on, so this operation
// should be a no-op.
for _, col := range colToCheck {
if colDesc.ID == col {
return true, m.MutationID
}
}
}
}
// Not discarded by any later operation.
return false, descpb.InvalidMutationID
}
Loading