Skip to content

Commit

Permalink
Merge #72662
Browse files Browse the repository at this point in the history
72662: sql: public schema long running migration r=postamar a=RichardJCai

sql: public schema long running migration

Release note: None

Co-authored-by: richardjcai <[email protected]>
  • Loading branch information
craig[bot] and RichardJCai committed Jan 11, 2022
2 parents c02266f + f60150d commit f68a5a7
Show file tree
Hide file tree
Showing 5 changed files with 491 additions and 15 deletions.
11 changes: 11 additions & 0 deletions pkg/migration/migrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"ensure_no_draining_names.go",
"insert_missing_public_schema_namespace_entry.go",
"migrations.go",
"public_schema_migration.go",
"schema_changes.go",
"seed_tenant_span_configs.go",
],
Expand All @@ -23,11 +24,17 @@ go_library(
"//pkg/migration",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/dbdesc",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/schemadesc",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/util/log",
Expand All @@ -49,6 +56,7 @@ go_test(
"ensure_no_draining_names_external_test.go",
"helpers_test.go",
"main_test.go",
"public_schema_migration_external_test.go",
],
data = glob(["testdata/**"]),
embed = [":migrations"],
Expand All @@ -58,9 +66,11 @@ go_test(
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkv",
Expand All @@ -73,6 +83,7 @@ go_test(
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
Expand Down
7 changes: 6 additions & 1 deletion pkg/migration/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var migrations = []migration.Migration{
alterSystemStmtDiagReqs,
),
migration.NewTenantMigration(
"seed system.span_configurations with configs for existing for existing tenants",
"seed system.span_configurations with configs for existing tenants",
toCV(clusterversion.SeedTenantSpanConfigs),
NoPrecondition,
seedTenantSpanConfigsMigration,
Expand All @@ -75,6 +75,11 @@ var migrations = []migration.Migration{
NoPrecondition,
alterTableProtectedTimestampRecords,
),
migration.NewTenantMigration("update synthetic public schemas to be backed by a descriptor",
toCV(clusterversion.PublicSchemasWithDescriptors),
NoPrecondition,
publicSchemaMigration,
),
}

func init() {
Expand Down
225 changes: 225 additions & 0 deletions pkg/migration/migrations/public_schema_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migrations

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

func publicSchemaMigration(
ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job,
) error {
query := `
SELECT ns_db.id
FROM system.namespace AS ns_db
INNER JOIN system.namespace
AS ns_sc ON (
ns_db.id
= ns_sc."parentID"
)
WHERE ns_db.id != 1
AND ns_db."parentSchemaID" = 0
AND ns_db."parentID" = 0
AND ns_sc."parentSchemaID" = 0
AND ns_sc.name = 'public'
AND ns_sc.id = 29
ORDER BY ns_db.id ASC;
`
rows, err := d.InternalExecutor.QueryIterator(
ctx, "get_databases_with_synthetic_public_schemas", nil /* txn */, query,
)
if err != nil {
return err
}
var databaseIDs []descpb.ID
for ok, err := rows.Next(ctx); ok; ok, err = rows.Next(ctx) {
if err != nil {
return err
}
parentID := descpb.ID(tree.MustBeDInt(rows.Cur()[0]))
databaseIDs = append(databaseIDs, parentID)
}

for _, dbID := range databaseIDs {
if err := createPublicSchemaForDatabase(ctx, dbID, d); err != nil {
return err
}
}

return nil
}

func createPublicSchemaForDatabase(
ctx context.Context, dbID descpb.ID, d migration.TenantDeps,
) error {
return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB,
func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
return createPublicSchemaDescriptor(ctx, txn, descriptors, dbID, d)
})
}

func createPublicSchemaDescriptor(
ctx context.Context,
txn *kv.Txn,
descriptors *descs.Collection,
dbID descpb.ID,
d migration.TenantDeps,
) error {
_, desc, err := descriptors.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{Required: true})
if err != nil {
return err
}
if desc.HasPublicSchemaWithDescriptor() {
// If the database already has a descriptor backed public schema,
// there is no work to be done.
return nil
}
dbDescBuilder := dbdesc.NewBuilder(desc.DatabaseDesc())
dbDesc := dbDescBuilder.BuildExistingMutableDatabase()

b := txn.NewBatch()

publicSchemaDesc, _, err := sql.CreateSchemaDescriptorWithPrivileges(
ctx, d.DB, d.Codec, desc, tree.PublicSchema, security.AdminRoleName(), security.AdminRoleName(), true, /* allocateID */
)
if err != nil {
return err
}
publicSchemaID := publicSchemaDesc.GetID()
newKey := catalogkeys.MakeSchemaNameKey(d.Codec, dbID, publicSchemaDesc.GetName())
oldKey := catalogkeys.EncodeNameKey(d.Codec, catalogkeys.NewNameKeyComponents(dbID, keys.RootNamespaceID, tree.PublicSchema))
// Remove namespace entry for old public schema.
b.Del(oldKey)
b.CPut(newKey, publicSchemaID, nil)
if err := catalogkv.WriteNewDescToBatch(
ctx,
false,
d.Settings,
b,
d.Codec,
publicSchemaID,
publicSchemaDesc,
); err != nil {
return err
}

if dbDesc.Schemas == nil {
dbDesc.Schemas = map[string]descpb.DatabaseDescriptor_SchemaInfo{
tree.PublicSchema: {
ID: publicSchemaID,
},
}
} else {
dbDesc.Schemas[tree.PublicSchema] = descpb.DatabaseDescriptor_SchemaInfo{
ID: publicSchemaID,
}
}
if err := descriptors.WriteDescToBatch(ctx, false, dbDesc, b); err != nil {
return err
}
allDescriptors, err := descriptors.GetAllDescriptors(ctx, txn)
if err != nil {
return err
}
if err := migrateObjectsInDatabase(ctx, dbID, d, txn, publicSchemaID, descriptors, allDescriptors); err != nil {
return err
}

return txn.Run(ctx, b)
}

func migrateObjectsInDatabase(
ctx context.Context,
dbID descpb.ID,
d migration.TenantDeps,
txn *kv.Txn,
newPublicSchemaID descpb.ID,
descriptors *descs.Collection,
allDescriptors []catalog.Descriptor,
) error {
const minBatchSizeInBytes = 1 << 20 /* 512 KiB batch size */
currSize := 0
var modifiedDescs []catalog.MutableDescriptor
batch := txn.NewBatch()
for _, desc := range allDescriptors {
// Only update descriptors in the parent db and public schema.
if desc.Dropped() || desc.GetParentID() != dbID ||
(desc.GetParentSchemaID() != keys.PublicSchemaID && desc.GetParentSchemaID() != descpb.InvalidID) {
continue
}
b := desc.NewBuilder()
updateDesc := func(mut catalog.MutableDescriptor, newPublicSchemaID descpb.ID) {
oldKey := catalogkeys.MakeObjectNameKey(d.Codec, mut.GetParentID(), mut.GetParentSchemaID(), mut.GetName())
batch.Del(oldKey)
newKey := catalogkeys.MakeObjectNameKey(d.Codec, mut.GetParentID(), newPublicSchemaID, mut.GetName())
batch.Put(newKey, mut.GetID())
modifiedDescs = append(modifiedDescs, mut)
}
switch mut := b.BuildExistingMutable().(type) {
case *dbdesc.Mutable, *schemadesc.Mutable:
// Ignore database and schema descriptors.
case *tabledesc.Mutable:
updateDesc(mut, newPublicSchemaID)
mut.UnexposedParentSchemaID = newPublicSchemaID
currSize += mut.Size()
case *typedesc.Mutable:
updateDesc(mut, newPublicSchemaID)
mut.ParentSchemaID = newPublicSchemaID
currSize += mut.Size()
}

// Once we reach the minimum batch size, write the batch and create a new
// one.
if currSize >= minBatchSizeInBytes {
for _, modified := range modifiedDescs {
err := descriptors.WriteDescToBatch(
ctx, false, modified, batch,
)
if err != nil {
return err
}
}
if err := txn.Run(ctx, batch); err != nil {
return err
}
currSize = 0
batch = txn.NewBatch()
modifiedDescs = make([]catalog.MutableDescriptor, 0)
}
}
for _, modified := range modifiedDescs {
err := descriptors.WriteDescToBatch(
ctx, false, modified, batch,
)
if err != nil {
return err
}
}
return txn.Run(ctx, batch)
}
Loading

0 comments on commit f68a5a7

Please sign in to comment.