Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
62076: sql: add interleaved_indexes/interleaved_tables into crdb_internal r=ajwerner a=fqazi

Previous, when we added the crdb_internal.interleaved table that
showed all interleaved indexes, however it was impossible tell
which table the primary key of the parent table was interleaved.
This was inadequate because users could not tell what the parent
table was. To address this, this patch removes
crdb_internal.interleaved and converts it into two virtual tables
interlaved_indexes/interleaved_tables where the one is for
non-primary key indexes and the second is for tables interleaved
on the primary key.

Release note (sql change): Renamed crdb_internal.interleaved to
crdb_internal.interlaved_indexes and added crdb_internal.interlaved_table
for viewing interleaved tables on the primary key.

62167: sql: incorrect behaviour setting NOT NULL on column and dropping it r=ajwerner a=fqazi

Fixes: cockroachdb#47719

Previously, if a column was mutated and then dropped in a single
transaction we would still try to validate the dropped column.
Additionally, in general if a drop operation occurred in a
different transaction we ran the danger of doing something
similar. From a behaviour perspective this was bad, since
it can lead to unexpected behaviour for users. To address, this
patch introduces logic to scan later mutations to check if
a drop column occurs. If one does occur then it will skip any
operations made irrelevant by the drop. Additionally, for a
correctness perspective it will wait for the drop to complete
before returning back.

Release note (bug fix): A constraint like not null or check on
a column made irrelevant by a drop in a later concurrent transaction
would lead to incorrect errors / behaviour.

Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Mar 19, 2021
3 parents dc6af91 + 011bbfb + a9aa92c commit d80f6e4
Show file tree
Hide file tree
Showing 8 changed files with 451 additions and 36 deletions.
29 changes: 22 additions & 7 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,14 @@ func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error {
return nil
}

// Run starts previously unstarted jobs from a list of scheduled
// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
func (r *Registry) Run(
// WaitForJobs waits for a given list of jobs to reach some sort
// of terminal state.
func (r *Registry) WaitForJobs(
ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
) error {
if len(jobs) == 0 {
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
buf := bytes.Buffer{}
for i, id := range jobs {
if i > 0 {
Expand Down Expand Up @@ -372,6 +368,25 @@ func (r *Registry) Run(
return nil
}

// Run starts previously unstarted jobs from a list of scheduled
// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
func (r *Registry) Run(
ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID,
) error {
if len(jobs) == 0 {
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
err := r.WaitForJobs(ctx, ex, jobs)
if err != nil {
return err
}
return nil
}

// NewJob creates a new Job.
func (r *Registry) NewJob(record Record, jobID jobspb.JobID) *Job {
job := &Job{
Expand Down
13 changes: 12 additions & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,16 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error {
log.Infof(ctx, "running backfill for %q, v=%d", tableDesc.Name, tableDesc.Version)

needColumnBackfill := false
for _, m := range tableDesc.Mutations {
for mutationIdx, m := range tableDesc.Mutations {
if m.MutationID != sc.mutationID {
break
}
// If the current mutation is discarded, then
// skip over processing.
if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, mutationIdx+1); discarded {
continue
}

switch m.Direction {
case descpb.DescriptorMutation_ADD:
switch t := m.Descriptor_.(type) {
Expand Down Expand Up @@ -1865,6 +1871,11 @@ func runSchemaChangesInTxn(
// mutations that need to be processed.
for i := 0; i < len(tableDesc.Mutations); i++ {
m := tableDesc.Mutations[i]
// Skip mutations that get canceled by later operations
if discarded, _ := isCurrentMutationDiscarded(tableDesc, m, i+1); discarded {
continue
}

immutDesc := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildImmutableTable()
switch m.Direction {
case descpb.DescriptorMutation_ADD:
Expand Down
21 changes: 11 additions & 10 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4148,7 +4148,11 @@ CREATE TABLE crdb_internal.interleaved (
STRING NOT NULL,
index_name
STRING NOT NULL,
parent_index
parent_database_name
STRING NOT NULL,
parent_schema_name
STRING NOT NULL,
parent_table_name
STRING NOT NULL
);`,
populate: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, addRow func(...tree.Datum) error) error {
Expand All @@ -4162,30 +4166,27 @@ CREATE TABLE crdb_internal.interleaved (
if index.NumInterleaveAncestors() == 0 {
continue
}

ancestor := index.GetInterleaveAncestor(index.NumInterleaveAncestors() - 1)
parentTable, err := lookupFn.getTableByID(ancestor.TableID)
if err != nil {
return err
}
parentIndex, err := parentTable.FindIndexWithID(ancestor.IndexID)
if err != nil {
return err
}
parentSchemaName, err := lookupFn.getSchemaNameByID(parentTable.GetParentSchemaID())
if err != nil {
return err
}
database, err := lookupFn.getDatabaseByID(parentTable.GetParentID())
parentDatabase, err := lookupFn.getDatabaseByID(parentTable.GetParentID())
if err != nil {
return err
}

if err := addRow(tree.NewDString(database.GetName()),
tree.NewDString(parentSchemaName),
if err := addRow(tree.NewDString(db.GetName()),
tree.NewDString(schemaName),
tree.NewDString(table.GetName()),
tree.NewDString(index.GetName()),
tree.NewDString(parentIndex.GetName())); err != nil {
tree.NewDString(parentDatabase.GetName()),
tree.NewDString(parentSchemaName),
tree.NewDString(parentTable.GetName())); err != nil {
return err
}
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,26 @@ SELECT count(descriptor_id)
----
0

# Validation for #47719 where a mutation is canceled out by a drop.
statement ok
create table tColDrop (a int);

statement ok
begin;
alter table tColDrop alter column a set not null;
alter table tColDrop drop column a;
commit;

statement ok
drop table tColDrop;

statement ok
begin;
create table tColDrop (a int);
alter table tColDrop alter column a set not null;
alter table tColDrop drop column a;
commit;

# Validate that the schema_change_successful metric
query T
SELECT feature_name FROM crdb_internal.feature_usage
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/create_statements
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,17 @@ CREATE TABLE crdb_internal.interleaved (
schema_name STRING NOT NULL,
table_name STRING NOT NULL,
index_name STRING NOT NULL,
parent_index STRING NOT NULL
parent_database_name STRING NOT NULL,
parent_schema_name STRING NOT NULL,
parent_table_name STRING NOT NULL
) CREATE TABLE crdb_internal.interleaved (
database_name STRING NOT NULL,
schema_name STRING NOT NULL,
table_name STRING NOT NULL,
index_name STRING NOT NULL,
parent_index STRING NOT NULL
parent_database_name STRING NOT NULL,
parent_schema_name STRING NOT NULL,
parent_table_name STRING NOT NULL
) {} {}
CREATE TABLE crdb_internal.invalid_objects (
id INT8 NULL,
Expand Down
42 changes: 27 additions & 15 deletions pkg/sql/logictest/testdata/logic_test/interleaved
Original file line number Diff line number Diff line change
Expand Up @@ -497,22 +497,34 @@ CREATE INDEX NIINDX3 ON CHILD(z);
statement ok
CREATE INDEX IINDX ON CHILD(rowid,x,y,z) INTERLEAVE IN PARENT PARENT(rowid);

# Child in different schema then parent
statement ok
CREATE TABLE parentDS (k STRING PRIMARY KEY);

statement ok
CREATE SCHEMA child_schema;

statement ok
CREATE TABLE child_schema.child (ck INT8 PRIMARY KEY, k STRING NOT NULL UNIQUE);

statement ok
CREATE INDEX interl ON child_schema.child (k) INTERLEAVE IN PARENT parentDS (k);

query TTTTT
query TTTTTTT
select * from "".crdb_internal.interleaved;
----
test public p1_1 p1_id primary
test public all_interleaves primary primary
test public all_interleaves all_interleaves_c_d_idx primary
test public all_interleaves all_interleaves_d_c_key primary
test public orders primary primary
other public interdb primary primary
test public c20067 primary primary
test public documents primary primary
test public big_interleave_parent primary primary
test public big_interleave_child primary primary
test public interleave_create_notice primary primary
test public interleave_create_notice interleave_index primary
test public interleave_pk_notice primary primary
test public child iindx primary
test public p1_1 p1_id test public p1_1
test public all_interleaves primary test public p1_1
test public all_interleaves all_interleaves_c_d_idx test public p1_1
test public all_interleaves all_interleaves_d_c_key test public p1_1
test public orders primary test public customers
test public interdb primary other public foo
test public c20067 primary test public p20067
test public documents primary test public users
test public big_interleave_parent primary test public big_interleave_grandparent
test public big_interleave_child primary test public big_interleave_parent
test public interleave_create_notice primary test public interleave_parent
test public interleave_create_notice interleave_index test public interleave_parent
test public interleave_pk_notice primary test public interleave_parent
test public child iindx test public parent
test child_schema child interl test public parentds
86 changes: 85 additions & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,9 +1046,11 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
// Jobs (for GC, etc.) that need to be started immediately after the table
// descriptor updates are published.
var didUpdate bool
var depMutationJobs []jobspb.JobID
modified, err := sc.txnWithModified(ctx, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
var err error
scTable, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn)
if err != nil {
return err
Expand Down Expand Up @@ -1294,9 +1296,17 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
// The table descriptor is unchanged, return without writing anything.
return nil
}
committedMutations := scTable.Mutations[:i]
// Trim the executed mutations from the descriptor.
scTable.Mutations = scTable.Mutations[i:]

// Check any jobs that we need to depend on for the current
// job to be successful.
depMutationJobs, err = sc.getDependentMutationsJobs(ctx, scTable, committedMutations)
if err != nil {
return err
}

for i, g := range scTable.MutationJobs {
if g.MutationID == sc.mutationID {
// Trim the executed mutation group from the descriptor.
Expand Down Expand Up @@ -1387,6 +1397,14 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}

// If any operations was skipped because a mutation was made
// redundant due to a column getting dropped later on then we should
// wait for those jobs to complete before returning our result back.
if err := sc.jobRegistry.WaitForJobs(ctx, sc.execCfg.InternalExecutor, depMutationJobs); err != nil {
return errors.Wrap(err, "A dependent transaction failed for this schema change")
}

return nil
}

Expand Down Expand Up @@ -1566,6 +1584,12 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError
return errors.AssertionFailedf("mutation already rolled back: %v", mutation)
}

// Ignore mutations that would be skipped, nothing
// to reverse here.
if discarded, _ := isCurrentMutationDiscarded(scTable, mutation, i+1); discarded {
continue
}

log.Warningf(ctx, "reverse schema change mutation: %+v", mutation)
scTable.Mutations[i], columns = sc.reverseMutation(mutation, false /*notStarted*/, columns)

Expand Down Expand Up @@ -2096,7 +2120,6 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
return err
}
}

execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID) error {
sc := SchemaChanger{
descID: descID,
Expand Down Expand Up @@ -2526,3 +2549,64 @@ func DeleteTableDescAndZoneConfig(
return txn.Run(ctx, b)
})
}

// getDependentMutationJobs gets the dependent jobs that need to complete for
// the current schema change to be considered successful. For example, if column
// A will have a unique index, but it later gets dropped. Then for this mutation
// to be successful the drop column job has to be successful too.
func (sc *SchemaChanger) getDependentMutationsJobs(
ctx context.Context, tableDesc *tabledesc.Mutable, mutations []descpb.DescriptorMutation,
) ([]jobspb.JobID, error) {
dependentJobs := make([]jobspb.JobID, 0, len(tableDesc.MutationJobs))
for _, m := range mutations {
// Find all the mutations that we depend
discarded, dependentID := isCurrentMutationDiscarded(tableDesc, m, 0)
if discarded {
jobID, err := getJobIDForMutationWithDescriptor(ctx, tableDesc, dependentID)
if err != nil {
return nil, err
}
dependentJobs = append(dependentJobs, jobID)
}
}
return dependentJobs, nil
}

// isCurrentMutationDiscarded returns if the current column mutation is made irrelevant
// by a later operation. The nextMutationIdx provides the index at which to check for
// later mutation.
func isCurrentMutationDiscarded(
tableDesc *tabledesc.Mutable, currentMutation descpb.DescriptorMutation, nextMutationIdx int,
) (bool, descpb.MutationID) {
if nextMutationIdx+1 > len(tableDesc.Mutations) {
return false, descpb.InvalidMutationID
}

colToCheck := make([]descpb.ColumnID, 0, 1)
// Both NOT NULL related updates and check constraint updates
// involving this column will get canceled out by a drop column.
if constraint := currentMutation.GetConstraint(); constraint != nil {
if constraint.ConstraintType == descpb.ConstraintToUpdate_NOT_NULL {
colToCheck = append(colToCheck, constraint.NotNullColumn)
} else if constraint.ConstraintType == descpb.ConstraintToUpdate_CHECK {
colToCheck = constraint.Check.ColumnIDs
}
}

for _, m := range tableDesc.Mutations[nextMutationIdx:] {
colDesc := m.GetColumn()
if m.Direction == descpb.DescriptorMutation_DROP &&
colDesc != nil &&
!m.Rollback {
// Column was dropped later on, so this operation
// should be a no-op.
for _, col := range colToCheck {
if colDesc.ID == col {
return true, m.MutationID
}
}
}
}
// Not discarded by any later operation.
return false, descpb.InvalidMutationID
}
Loading

0 comments on commit d80f6e4

Please sign in to comment.