Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
61961: catalog: add namespace validation r=postamar a=postamar

Previously, the contents of the namespace table were only ever validated
by the doctor tool. This commit adds namespace validation checks into the
existing descriptor validation framework in the catalog package. These
checks are now run as part of the pre-txn-commit descriptor validation
suite.

Fixes cockroachdb#61010.

Release note (cli change): changed formatting of namespace validation
failures in doctor.

Co-authored-by: Marius Posta <[email protected]>
  • Loading branch information
craig[bot] and Marius Posta committed Mar 19, 2021
2 parents f810c6c + 7b4a481 commit 7f7d4fd
Show file tree
Hide file tree
Showing 21 changed files with 519 additions and 300 deletions.
108 changes: 54 additions & 54 deletions pkg/bench/ddl_analysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,73 @@ exp,benchmark
7,AlterRole/alter_role_with_1_option
8,AlterRole/alter_role_with_2_options
10,AlterRole/alter_role_with_3_options
19,AlterTableAddCheckConstraint/alter_table_add_1_check_constraint
19,AlterTableAddCheckConstraint/alter_table_add_2_check_constraints
19,AlterTableAddCheckConstraint/alter_table_add_3_check_constraints
19,AlterTableAddColumn/alter_table_add_1_column
19,AlterTableAddColumn/alter_table_add_2_columns
19,AlterTableAddColumn/alter_table_add_3_columns
29,AlterTableAddForeignKey/alter_table_add_1_foreign_key
39,AlterTableAddForeignKey/alter_table_add_2_foreign_keys
49,AlterTableAddForeignKey/alter_table_add_3_foreign_keys
29,AlterTableAddForeignKey/alter_table_add_foreign_key_with_3_columns
30,AlterTableConfigureZone/alter_table_configure_zone_5_replicas
30,AlterTableConfigureZone/alter_table_configure_zone_7_replicas_
30,AlterTableConfigureZone/alter_table_configure_zone_ranges
23,AlterTableDropColumn/alter_table_drop_1_column
27,AlterTableDropColumn/alter_table_drop_2_columns
31,AlterTableDropColumn/alter_table_drop_3_columns
20,AlterTableDropConstraint/alter_table_drop_1_check_constraint
21,AlterTableDropConstraint/alter_table_drop_2_check_constraints
22,AlterTableDropConstraint/alter_table_drop_3_check_constraints
13-14,AlterTableSplit/alter_table_split_at_1_value
19-20,AlterTableSplit/alter_table_split_at_2_values
25-26,AlterTableSplit/alter_table_split_at_3_values
9,AlterTableUnsplit/alter_table_unsplit_at_1_value
11,AlterTableUnsplit/alter_table_unsplit_at_2_values
13,AlterTableUnsplit/alter_table_unsplit_at_3_values
21,AlterTableAddCheckConstraint/alter_table_add_1_check_constraint
21,AlterTableAddCheckConstraint/alter_table_add_2_check_constraints
21,AlterTableAddCheckConstraint/alter_table_add_3_check_constraints
21,AlterTableAddColumn/alter_table_add_1_column
21,AlterTableAddColumn/alter_table_add_2_columns
21,AlterTableAddColumn/alter_table_add_3_columns
31,AlterTableAddForeignKey/alter_table_add_1_foreign_key
41,AlterTableAddForeignKey/alter_table_add_2_foreign_keys
51,AlterTableAddForeignKey/alter_table_add_3_foreign_keys
31,AlterTableAddForeignKey/alter_table_add_foreign_key_with_3_columns
31,AlterTableConfigureZone/alter_table_configure_zone_5_replicas
31,AlterTableConfigureZone/alter_table_configure_zone_7_replicas_
31,AlterTableConfigureZone/alter_table_configure_zone_ranges
25,AlterTableDropColumn/alter_table_drop_1_column
29,AlterTableDropColumn/alter_table_drop_2_columns
33,AlterTableDropColumn/alter_table_drop_3_columns
22,AlterTableDropConstraint/alter_table_drop_1_check_constraint
23,AlterTableDropConstraint/alter_table_drop_2_check_constraints
24,AlterTableDropConstraint/alter_table_drop_3_check_constraints
14-15,AlterTableSplit/alter_table_split_at_1_value
20-21,AlterTableSplit/alter_table_split_at_2_values
26-27,AlterTableSplit/alter_table_split_at_3_values
10,AlterTableUnsplit/alter_table_unsplit_at_1_value
12,AlterTableUnsplit/alter_table_unsplit_at_2_values
14,AlterTableUnsplit/alter_table_unsplit_at_3_values
7,CreateRole/create_role_with_1_option
9,CreateRole/create_role_with_2_options
10,CreateRole/create_role_with_3_options
8,CreateRole/create_role_with_no_options
21,DropDatabase/drop_database_0_tables
30,DropDatabase/drop_database_1_table
39,DropDatabase/drop_database_2_tables
48,DropDatabase/drop_database_3_tables
23,DropDatabase/drop_database_0_tables
32,DropDatabase/drop_database_1_table
41,DropDatabase/drop_database_2_tables
50,DropDatabase/drop_database_3_tables
19,DropRole/drop_1_role
29,DropRole/drop_2_roles
39,DropRole/drop_3_roles
21,DropSequence/drop_1_sequence
35,DropSequence/drop_2_sequences
49,DropSequence/drop_3_sequences
24,DropTable/drop_1_table
41,DropTable/drop_2_tables
58,DropTable/drop_3_tables
27,DropView/drop_1_view
56,DropView/drop_2_views
85,DropView/drop_3_views
22,Grant/grant_all_on_1_table
31,Grant/grant_all_on_2_tables
40,Grant/grant_all_on_3_tables
19,GrantRole/grant_1_role
22,GrantRole/grant_2_roles
23,DropSequence/drop_1_sequence
37,DropSequence/drop_2_sequences
51,DropSequence/drop_3_sequences
26,DropTable/drop_1_table
43,DropTable/drop_2_tables
60,DropTable/drop_3_tables
29,DropView/drop_1_view
58,DropView/drop_2_views
87,DropView/drop_3_views
24,Grant/grant_all_on_1_table
33,Grant/grant_all_on_2_tables
42,Grant/grant_all_on_3_tables
21,GrantRole/grant_1_role
24,GrantRole/grant_2_roles
5,ORMQueries/activerecord_type_introspection_query
2,ORMQueries/django_table_introspection_1_table
2,ORMQueries/django_table_introspection_4_tables
2,ORMQueries/django_table_introspection_8_tables
22,Revoke/revoke_all_on_1_table
31,Revoke/revoke_all_on_2_tables
40,Revoke/revoke_all_on_3_tables
18,RevokeRole/revoke_1_role
20,RevokeRole/revoke_2_roles
24,Revoke/revoke_all_on_1_table
33,Revoke/revoke_all_on_2_tables
42,Revoke/revoke_all_on_3_tables
20,RevokeRole/revoke_1_role
22,RevokeRole/revoke_2_roles
1,SystemDatabaseQueries/select_system.users_with_empty_database_name
1,SystemDatabaseQueries/select_system.users_with_schema_name
2,SystemDatabaseQueries/select_system.users_without_schema_name
28,Truncate/truncate_1_column_0_rows
28,Truncate/truncate_1_column_1_row
28,Truncate/truncate_1_column_2_rows
28,Truncate/truncate_2_column_0_rows
28,Truncate/truncate_2_column_1_rows
28,Truncate/truncate_2_column_2_rows
30,Truncate/truncate_1_column_0_rows
30,Truncate/truncate_1_column_1_row
30,Truncate/truncate_1_column_2_rows
30,Truncate/truncate_2_column_0_rows
30,Truncate/truncate_2_column_1_rows
30,Truncate/truncate_2_column_2_rows
1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk
23 changes: 18 additions & 5 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1980,17 +1980,30 @@ func (r *restoreResumer) dropDescriptors(
// Delete the database descriptors.
deletedDBs := make(map[descpb.ID]struct{})
for _, dbDesc := range details.DatabaseDescs {
db := dbdesc.NewBuilder(dbDesc).BuildExistingMutable()

// We need to ignore descriptors we just added since we haven't committed the txn that deletes these.
isDBEmpty, err := isDatabaseEmpty(ctx, txn, db.GetID(), allDescs, ignoredChildDescIDs)
isDBEmpty, err := isDatabaseEmpty(ctx, txn, dbDesc.GetID(), allDescs, ignoredChildDescIDs)
if err != nil {
return errors.Wrapf(err, "checking if database %s is empty during restore cleanup", db.GetName())
return errors.Wrapf(err, "checking if database %s is empty during restore cleanup", dbDesc.GetName())
}

if !isDBEmpty {
log.Warningf(ctx, "preserving database %s on restore failure because it contains new child objects or schemas", db.GetName())
log.Warningf(ctx, "preserving database %s on restore failure because it contains new child objects or schemas", dbDesc.GetName())
continue
}

db, err := descsCol.GetMutableDescriptorByID(ctx, dbDesc.GetID(), txn)
if err != nil {
return err
}

// Mark db as dropped and add uncommitted version to pass pre-txn
// descriptor validation.
db.SetDropped()
db.MaybeIncrementVersion()
if err := descsCol.AddUncommittedDescriptor(db); err != nil {
return err
}

descKey := catalogkeys.MakeDescMetadataKey(codec, db.GetID())
b.Del(descKey)
b.Del(catalogkeys.NewDatabaseKey(db.GetName()).Key(codec))
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,14 +872,14 @@ func resolveTargetDB(
func maybeUpgradeTableDescsInBackupManifests(
ctx context.Context, backupManifests []BackupManifest, skipFKsWithNoMatchingTable bool,
) error {
descGetter := catalog.MapDescGetter{}
descGetter := catalog.MakeMapDescGetter()

// Populate the descGetter with all table descriptors in all backup
// descriptors so that they can be looked up.
for _, backupManifest := range backupManifests {
for _, desc := range backupManifest.Descriptors {
if table, _, _, _ := descpb.FromDescriptor(&desc); table != nil {
descGetter[table.ID] = tabledesc.NewBuilder(table).BuildImmutable()
descGetter.Descriptors[table.ID] = tabledesc.NewBuilder(table).BuildImmutable()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/testcluster
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ doctor cluster
----
debug doctor cluster
Examining 35 descriptors and 36 namespace entries...
ParentID 50, ParentSchemaID 29: relation "foo" (53): not being dropped but no namespace entry found
ParentID 50, ParentSchemaID 29: relation "foo" (53): expected matching namespace entry, found none
Examining 1 running jobs...
ERROR: validation failed
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/testzipdir
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Examining 38 descriptors and 43 namespace entries...
ParentID 52, ParentSchemaID 29: relation "vehicle_location_histories" (56): referenced database ID 52: descriptor not found
ParentID 52, ParentSchemaID 29: relation "promo_codes" (57): referenced database ID 52: descriptor not found
ParentID 52, ParentSchemaID 29: relation "user_promo_codes" (58): referenced database ID 52: descriptor not found
Descriptor 52: has namespace row(s) [{ParentID:0 ParentSchemaID:0 Name:movr}] but no descriptor
ParentID 0, ParentSchemaID 0: namespace entry "movr" (52): descriptor not found
Examining 1 running jobs...
job 587337426984566785: schema change GC refers to missing table descriptor(s) [59]
existing descriptors that still need to be dropped []
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/catalog/catalogkv/catalogkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,11 @@ func GetAllDescriptors(
if err != nil {
return nil, err
}
dg := make(catalog.MapDescGetter, len(descs))
dg := catalog.MapDescGetter{
Descriptors: make(map[descpb.ID]catalog.Descriptor, len(descs)),
}
for _, desc := range descs {
dg[desc.GetID()] = desc
dg.Descriptors[desc.GetID()] = desc
}
if err := catalog.ValidateSelfAndCrossReferences(ctx, dg, descs...); err != nil {
return nil, err
Expand Down Expand Up @@ -414,7 +416,7 @@ func getDescriptorByID(
return nil, err
}
dg := NewOneLevelUncachedDescGetter(txn, codec)
const level = catalog.ValidationLevelSelfAndCrossReferences
const level = catalog.ValidationLevelCrossReferences
return descriptorFromKeyValue(ctx, codec, r, mutable, expectedType, required, dg, level)
}

Expand Down Expand Up @@ -570,7 +572,7 @@ func getDescriptorsFromIDs(
catalog.Any,
bestEffort,
dg,
catalog.ValidationLevelSelfAndCrossReferences,
catalog.ValidationLevelCrossReferences,
)
if err != nil {
return nil, err
Expand Down
105 changes: 102 additions & 3 deletions pkg/sql/catalog/catalogkv/desc_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,19 @@ func (t *oneLevelUncachedDescGetter) fromKeyValue(
)
}

// GetDesc implements the catalog.DescGetter interface.
// Delegates to GetDescs.
func (t *oneLevelUncachedDescGetter) GetDesc(
ctx context.Context, id descpb.ID,
) (catalog.Descriptor, error) {
descKey := catalogkeys.MakeDescMetadataKey(t.codec, id)
kv, err := t.txn.Get(ctx, descKey)
res, err := t.GetDescs(ctx, []descpb.ID{id})
if err != nil {
return nil, err
}
return t.fromKeyValue(ctx, kv)
return res[0], nil
}

// GetDescs implements the catalog.BatchDescGetter interface.
func (t *oneLevelUncachedDescGetter) GetDescs(
ctx context.Context, reqs []descpb.ID,
) ([]catalog.Descriptor, error) {
Expand All @@ -82,10 +84,107 @@ func (t *oneLevelUncachedDescGetter) GetDescs(
}
ret := make([]catalog.Descriptor, len(reqs))
for i, res := range ba.Results {
if res.Err != nil {
return nil, res.Err
}
ret[i], err = t.fromKeyValue(ctx, res.Rows[0])
if err != nil {
return nil, err
}
}
return ret, nil
}

// GetNamespaceEntry implements the catalog.DescGetter interface.
// Delegates to GetNamespaceEntries.
func (t *oneLevelUncachedDescGetter) GetNamespaceEntry(
ctx context.Context, parentID, parentSchemaID descpb.ID, name string,
) (descpb.ID, error) {
res, err := t.GetNamespaceEntries(ctx, []descpb.NameInfo{{
ParentID: parentID,
ParentSchemaID: parentSchemaID,
Name: name,
}})
if err != nil {
return descpb.InvalidID, err
}
return res[0], nil
}

// GetNamespaceEntries implements the catalog.BatchDescGetter interface.
// This looks up the new system.namespace table for records matching the
// requests. If a request has no matching entry, then this falls back to the old
// system.namespace table.
func (t *oneLevelUncachedDescGetter) GetNamespaceEntries(
ctx context.Context, requests []descpb.NameInfo,
) ([]descpb.ID, error) {
// Build batch for looking up new system.namespace table.
b := t.txn.NewBatch()
isBatchEmpty := true
for _, r := range requests {
if r.Name == "" {
// Ignore nameless requests.
continue
}
key := catalogkeys.MakeNameMetadataKey(t.codec, r.ParentID, r.ParentSchemaID, r.Name)
b.Get(key)
isBatchEmpty = false
}
ret := make([]descpb.ID, len(requests))
if isBatchEmpty {
// If all requests are nameless, return early.
return ret, nil
}
err := t.txn.Run(ctx, b)
if err != nil {
return nil, err
}
var indexesToFallBack []int
j := 0
for i, r := range requests {
if r.Name == "" {
// Nameless requests are not present in batch, skip.
continue
}
result := b.Results[j]
j++
if result.Err != nil {
return nil, result.Err
}
if result.Rows[0].Exists() {
ret[i] = descpb.ID(result.Rows[0].ValueInt())
} else if r.ParentSchemaID == keys.PublicSchemaID || r.ParentSchemaID == keys.RootNamespaceID {
// We'll retry this request in the old namespace table, but don't bother
// if it's for a user-defined schema as those won't supported back then.
if indexesToFallBack == nil {
// Allocate only once.
indexesToFallBack = make([]int, 0, len(requests))
}
indexesToFallBack = append(indexesToFallBack, i)
}
}
if indexesToFallBack == nil {
// Happy path return.
return ret, nil
}
// Fall back to old namespace table.
b = t.txn.NewBatch()
for _, i := range indexesToFallBack {
r := requests[i]
key := catalogkeys.MakeDeprecatedNameMetadataKey(t.codec, r.ParentID, r.Name)
b.Get(key)
}
err = t.txn.Run(ctx, b)
if err != nil {
return nil, err
}
for j, result := range b.Results {
if result.Err != nil {
return nil, result.Err
}
if result.Rows[0].Exists() {
ret[indexesToFallBack[j]] = descpb.ID(result.Rows[0].ValueInt())
}
}
return ret, nil
}
6 changes: 6 additions & 0 deletions pkg/sql/catalog/catalogkv/unwrap_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ func (o oneLevelMapDescGetter) GetDesc(
)
}

func (o oneLevelMapDescGetter) GetNamespaceEntry(
_ context.Context, _, _ descpb.ID, _ string,
) (descpb.ID, error) {
panic("not implemented")
}

func decodeDescriptorDSV(t *testing.T, descriptorCSVPath string) oneLevelMapDescGetter {
f, err := os.Open(descriptorCSVPath)
require.NoError(t, err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/catalog/dbdesc/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,18 +265,18 @@ func TestValidateCrossDatabaseReferences(t *testing.T) {

for i, test := range tests {
privilege := descpb.NewDefaultPrivilegeDescriptor(security.AdminRoleName())
descs := catalog.MapDescGetter{}
descs := catalog.MakeMapDescGetter()
test.desc.Privileges = privilege
desc := NewBuilder(&test.desc).BuildImmutable()
descs[test.desc.ID] = desc
descs.Descriptors[test.desc.ID] = desc
test.multiRegionEnum.Privileges = privilege
descs[test.multiRegionEnum.ID] = typedesc.NewBuilder(&test.multiRegionEnum).BuildImmutable()
descs.Descriptors[test.multiRegionEnum.ID] = typedesc.NewBuilder(&test.multiRegionEnum).BuildImmutable()
for _, schemaDesc := range test.schemaDescs {
schemaDesc.Privileges = privilege
descs[schemaDesc.ID] = schemadesc.NewBuilder(&schemaDesc).BuildImmutable()
descs.Descriptors[schemaDesc.ID] = schemadesc.NewBuilder(&schemaDesc).BuildImmutable()
}
expectedErr := fmt.Sprintf("%s %q (%d): %s", desc.DescriptorType(), desc.GetName(), desc.GetID(), test.err)
const validateCrossReferencesOnly = catalog.ValidationLevelSelfAndCrossReferences &^ (catalog.ValidationLevelSelfAndCrossReferences >> 1)
const validateCrossReferencesOnly = catalog.ValidationLevelCrossReferences &^ (catalog.ValidationLevelCrossReferences >> 1)
if err := catalog.Validate(ctx, descs, validateCrossReferencesOnly, desc).CombinedError(); err == nil {
if test.err != "" {
t.Errorf("%d: expected \"%s\", but found success: %+v", i, expectedErr, test.desc)
Expand Down
Loading

0 comments on commit 7f7d4fd

Please sign in to comment.