diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index a1294e6c303a..e9911386e6a4 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "ensure_no_draining_names.go", "insert_missing_public_schema_namespace_entry.go", "migrations.go", + "public_schema_migration.go", "schema_changes.go", "seed_tenant_span_configs.go", ], @@ -23,11 +24,17 @@ go_library( "//pkg/migration", "//pkg/roachpb:with-mocks", "//pkg/security", + "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", + "//pkg/sql/catalog/catalogkv", + "//pkg/sql/catalog/dbdesc", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", + "//pkg/sql/catalog/schemadesc", "//pkg/sql/catalog/systemschema", + "//pkg/sql/catalog/tabledesc", + "//pkg/sql/catalog/typedesc", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/util/log", @@ -49,6 +56,7 @@ go_test( "ensure_no_draining_names_external_test.go", "helpers_test.go", "main_test.go", + "public_schema_migration_external_test.go", ], data = glob(["testdata/**"]), embed = [":migrations"], @@ -58,9 +66,11 @@ go_test( "//pkg/jobs", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkv", @@ -73,6 +83,7 @@ go_test( "//pkg/sql/sqlutil", "//pkg/sql/types", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go index 1b784c7a9810..a2998f5402d4 100644 --- a/pkg/migration/migrations/migrations.go +++ b/pkg/migration/migrations/migrations.go @@ -59,7 +59,7 @@ var migrations = []migration.Migration{ alterSystemStmtDiagReqs, ), migration.NewTenantMigration( - "seed system.span_configurations with configs for existing for existing tenants", + "seed system.span_configurations with configs for existing tenants", toCV(clusterversion.SeedTenantSpanConfigs), NoPrecondition, seedTenantSpanConfigsMigration, @@ -75,6 +75,11 @@ var migrations = []migration.Migration{ NoPrecondition, alterTableProtectedTimestampRecords, ), + migration.NewTenantMigration("update synthetic public schemas to be backed by a descriptor", + toCV(clusterversion.PublicSchemasWithDescriptors), + NoPrecondition, + publicSchemaMigration, + ), } func init() { diff --git a/pkg/migration/migrations/public_schema_migration.go b/pkg/migration/migrations/public_schema_migration.go new file mode 100644 index 000000000000..4986005cd40f --- /dev/null +++ b/pkg/migration/migrations/public_schema_migration.go @@ -0,0 +1,225 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" +) + +func publicSchemaMigration( + ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job, +) error { + query := ` + SELECT ns_db.id + FROM system.namespace AS ns_db + INNER JOIN system.namespace + AS ns_sc ON ( + ns_db.id + = ns_sc."parentID" + ) + WHERE ns_db.id != 1 + AND ns_db."parentSchemaID" = 0 + AND ns_db."parentID" = 0 + AND ns_sc."parentSchemaID" = 0 + AND ns_sc.name = 'public' + AND ns_sc.id = 29 +ORDER BY ns_db.id ASC; +` + rows, err := d.InternalExecutor.QueryIterator( + ctx, "get_databases_with_synthetic_public_schemas", nil /* txn */, query, + ) + if err != nil { + return err + } + var databaseIDs []descpb.ID + for ok, err := rows.Next(ctx); ok; ok, err = rows.Next(ctx) { + if err != nil { + return err + } + parentID := descpb.ID(tree.MustBeDInt(rows.Cur()[0])) + databaseIDs = append(databaseIDs, parentID) + } + + for _, dbID := range databaseIDs { + if err := createPublicSchemaForDatabase(ctx, dbID, d); err != nil { + return err + } + } + + return nil +} + +func createPublicSchemaForDatabase( + ctx context.Context, dbID descpb.ID, d migration.TenantDeps, +) error { + return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB, + func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { + return createPublicSchemaDescriptor(ctx, txn, descriptors, dbID, d) + }) +} + +func createPublicSchemaDescriptor( + ctx context.Context, + txn *kv.Txn, + descriptors *descs.Collection, + dbID descpb.ID, + d migration.TenantDeps, +) error { + _, desc, err := descriptors.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{Required: true}) + if err != nil { + return err + } + if desc.HasPublicSchemaWithDescriptor() { + // If the database already has a descriptor backed public schema, + // there is no work to be done. + return nil + } + dbDescBuilder := dbdesc.NewBuilder(desc.DatabaseDesc()) + dbDesc := dbDescBuilder.BuildExistingMutableDatabase() + + b := txn.NewBatch() + + publicSchemaDesc, _, err := sql.CreateSchemaDescriptorWithPrivileges( + ctx, d.DB, d.Codec, desc, tree.PublicSchema, security.AdminRoleName(), security.AdminRoleName(), true, /* allocateID */ + ) + if err != nil { + return err + } + publicSchemaID := publicSchemaDesc.GetID() + newKey := catalogkeys.MakeSchemaNameKey(d.Codec, dbID, publicSchemaDesc.GetName()) + oldKey := catalogkeys.EncodeNameKey(d.Codec, catalogkeys.NewNameKeyComponents(dbID, keys.RootNamespaceID, tree.PublicSchema)) + // Remove namespace entry for old public schema. + b.Del(oldKey) + b.CPut(newKey, publicSchemaID, nil) + if err := catalogkv.WriteNewDescToBatch( + ctx, + false, + d.Settings, + b, + d.Codec, + publicSchemaID, + publicSchemaDesc, + ); err != nil { + return err + } + + if dbDesc.Schemas == nil { + dbDesc.Schemas = map[string]descpb.DatabaseDescriptor_SchemaInfo{ + tree.PublicSchema: { + ID: publicSchemaID, + }, + } + } else { + dbDesc.Schemas[tree.PublicSchema] = descpb.DatabaseDescriptor_SchemaInfo{ + ID: publicSchemaID, + } + } + if err := descriptors.WriteDescToBatch(ctx, false, dbDesc, b); err != nil { + return err + } + allDescriptors, err := descriptors.GetAllDescriptors(ctx, txn) + if err != nil { + return err + } + if err := migrateObjectsInDatabase(ctx, dbID, d, txn, publicSchemaID, descriptors, allDescriptors); err != nil { + return err + } + + return txn.Run(ctx, b) +} + +func migrateObjectsInDatabase( + ctx context.Context, + dbID descpb.ID, + d migration.TenantDeps, + txn *kv.Txn, + newPublicSchemaID descpb.ID, + descriptors *descs.Collection, + allDescriptors []catalog.Descriptor, +) error { + const minBatchSizeInBytes = 1 << 20 /* 512 KiB batch size */ + currSize := 0 + var modifiedDescs []catalog.MutableDescriptor + batch := txn.NewBatch() + for _, desc := range allDescriptors { + // Only update descriptors in the parent db and public schema. + if desc.Dropped() || desc.GetParentID() != dbID || + (desc.GetParentSchemaID() != keys.PublicSchemaID && desc.GetParentSchemaID() != descpb.InvalidID) { + continue + } + b := desc.NewBuilder() + updateDesc := func(mut catalog.MutableDescriptor, newPublicSchemaID descpb.ID) { + oldKey := catalogkeys.MakeObjectNameKey(d.Codec, mut.GetParentID(), mut.GetParentSchemaID(), mut.GetName()) + batch.Del(oldKey) + newKey := catalogkeys.MakeObjectNameKey(d.Codec, mut.GetParentID(), newPublicSchemaID, mut.GetName()) + batch.Put(newKey, mut.GetID()) + modifiedDescs = append(modifiedDescs, mut) + } + switch mut := b.BuildExistingMutable().(type) { + case *dbdesc.Mutable, *schemadesc.Mutable: + // Ignore database and schema descriptors. + case *tabledesc.Mutable: + updateDesc(mut, newPublicSchemaID) + mut.UnexposedParentSchemaID = newPublicSchemaID + currSize += mut.Size() + case *typedesc.Mutable: + updateDesc(mut, newPublicSchemaID) + mut.ParentSchemaID = newPublicSchemaID + currSize += mut.Size() + } + + // Once we reach the minimum batch size, write the batch and create a new + // one. + if currSize >= minBatchSizeInBytes { + for _, modified := range modifiedDescs { + err := descriptors.WriteDescToBatch( + ctx, false, modified, batch, + ) + if err != nil { + return err + } + } + if err := txn.Run(ctx, batch); err != nil { + return err + } + currSize = 0 + batch = txn.NewBatch() + modifiedDescs = make([]catalog.MutableDescriptor, 0) + } + } + for _, modified := range modifiedDescs { + err := descriptors.WriteDescToBatch( + ctx, false, modified, batch, + ) + if err != nil { + return err + } + } + return txn.Run(ctx, batch) +} diff --git a/pkg/migration/migrations/public_schema_migration_external_test.go b/pkg/migration/migrations/public_schema_migration_external_test.go new file mode 100644 index 000000000000..7a1eac0b03f7 --- /dev/null +++ b/pkg/migration/migrations/public_schema_migration_external_test.go @@ -0,0 +1,214 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations_test + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +func publicSchemaMigrationTest(t *testing.T, ctx context.Context, numTables int) { + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.ByKey(clusterversion.PublicSchemasWithDescriptors-1), + false, + ) + // 2048 KiB batch size - 4x the public schema migration's minBatchSizeInBytes. + const maxCommandSize = 1 << 22 + kvserver.MaxCommandSize.Override(ctx, &settings.SV, maxCommandSize) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey(clusterversion.PublicSchemasWithDescriptors - 1), + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + defer db.Close() + + // We bootstrap the cluster on the older version where databases are + // created without public schemas. The namespace before upgrading looks like: + /* + 51 0 public 29 + 50 0 public 29 + 0 0 defaultdb 50 + 0 0 postgres 51 + 50 29 t 52 + 50 29 typ 53 + 50 29 _typ 54 + */ + _, err := db.Exec(`CREATE TABLE defaultdb.public.t(x INT)`) + require.NoError(t, err) + _, err = db.Exec(`INSERT INTO defaultdb.public.t VALUES (1), (2), (3)`) + require.NoError(t, err) + _, err = db.Exec(`CREATE TYPE defaultdb.public.typ AS ENUM()`) + require.NoError(t, err) + // Ensure the migration works if we have UDS in the database. + _, err = db.Exec(`CREATE SCHEMA defaultdb.s`) + require.NoError(t, err) + _, err = db.Exec(`CREATE TABLE defaultdb.s.table_in_uds(x INT)`) + require.NoError(t, err) + _, err = db.Exec(`INSERT INTO defaultdb.s.table_in_uds VALUES (1), (2), (3)`) + require.NoError(t, err) + + // Create large descriptors to ensure we're batching descriptors. + // The name of the table is approx 1000 bytes. + // Thus, we create approximately 5000 KiB of descriptors in this database. + // This is also larger than the 2048 KiB max command size we set. + // The batch size in the migration is 512 KiB so this ensures we have at + // least two batches. + for i := 0; i < numTables; i++ { + _, err = db.Exec(fmt.Sprintf(`CREATE TABLE defaultdb.t%s%d()`, strings.Repeat("x", 10000), i)) + require.NoError(t, err) + } + + _, err = tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, + clusterversion.ByKey(clusterversion.PublicSchemasWithDescriptors).String()) + require.NoError(t, err) + + // Verify that defaultdb and postgres have public schemas with IDs that + // are not 29. + row := db.QueryRow(`SELECT id FROM system.namespace WHERE name='public' AND "parentID"=50`) + require.NotNil(t, row) + var defaultDBPublicSchemaID int + err = row.Scan(&defaultDBPublicSchemaID) + require.NoError(t, err) + + require.NotEqual(t, defaultDBPublicSchemaID, keys.PublicSchemaID) + + row = db.QueryRow(`SELECT id FROM system.namespace WHERE name='public' AND "parentID"=51`) + require.NotNil(t, row) + var postgresPublicSchemaID int + err = row.Scan(&postgresPublicSchemaID) + require.NoError(t, err) + + require.NotEqual(t, postgresPublicSchemaID, keys.PublicSchemaID) + + // Verify that table "t" and type "typ" and "_typ" are have parent schema id + // defaultDBPublicSchemaID. + var tParentSchemaID, typParentSchemaID, typArrParentSchemaID int + row = db.QueryRow(`SELECT "parentSchemaID" FROM system.namespace WHERE name='t' AND "parentID"=50`) + err = row.Scan(&tParentSchemaID) + require.NoError(t, err) + + require.Equal(t, tParentSchemaID, defaultDBPublicSchemaID) + + row = db.QueryRow(`SELECT "parentSchemaID" FROM system.namespace WHERE name='typ' AND "parentID"=50`) + err = row.Scan(&typParentSchemaID) + require.NoError(t, err) + + require.Equal(t, typParentSchemaID, defaultDBPublicSchemaID) + + row = db.QueryRow(`SELECT "parentSchemaID" FROM system.namespace WHERE name='_typ' AND "parentID"=50`) + err = row.Scan(&typArrParentSchemaID) + require.NoError(t, err) + + require.Equal(t, typArrParentSchemaID, defaultDBPublicSchemaID) + + _, err = db.Exec(`INSERT INTO t VALUES (4)`) + require.NoError(t, err) + + rows, err := db.Query(`SELECT * FROM defaultdb.t ORDER BY x`) + require.NoError(t, err) + defer rows.Close() + if err != nil { + t.Fatal(err) + } + + // Verify that we can query table t. + var x int + for i := 1; i < 5; i++ { + rows.Next() + require.NoError(t, err) + err = rows.Scan(&x) + require.NoError(t, err) + require.Equal(t, x, i) + } + + // Verify that we can use type "typ". + _, err = db.Exec(`CREATE TABLE t2(x typ)`) + require.NoError(t, err) + + // Verify that we can use the typ / enum. + _, err = db.Exec(`ALTER TYPE typ ADD VALUE 'hello'`) + require.NoError(t, err) + + _, err = db.Exec(`INSERT INTO t2 VALUES ('hello')`) + require.NoError(t, err) + + row = db.QueryRow(`SELECT * FROM t2`) + require.NotNil(t, row) + + var helloStr string + err = row.Scan(&helloStr) + require.NoError(t, err) + + require.Equal(t, "hello", helloStr) + + rows, err = db.Query(`SELECT * FROM defaultdb.s.table_in_uds ORDER BY x`) + require.NoError(t, err) + + // Verify that we can query table defaultdb.s.table_in_uds (table in a UDS). + for i := 1; i < 4; i++ { + rows.Next() + require.NoError(t, err) + err = rows.Scan(&x) + require.NoError(t, err) + require.Equal(t, x, i) + } + + // Verify that the tables with large descriptor sizes have parentSchemaIDs + // that are not 29. + const oldPublicSchemaID = 29 + var parentSchemaID int + for i := 0; i < numTables; i++ { + row = db.QueryRow(fmt.Sprintf(`SELECT "parentSchemaID" FROM system.namespace WHERE name = 't%s%d'`, strings.Repeat("x", 10000), i)) + err = row.Scan(&parentSchemaID) + require.NoError(t, err) + require.NotEqual(t, parentSchemaID, descpb.InvalidID) + require.NotEqual(t, oldPublicSchemaID, parentSchemaID) + } +} + +func TestPublicSchemaMigration500Tables(t *testing.T) { + skip.UnderRace(t, "takes >1min under race") + defer leaktest.AfterTest(t)() + ctx := context.Background() + + publicSchemaMigrationTest(t, ctx, 500) +} + +func TestPublicSchemaMigration10Tables(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + publicSchemaMigrationTest(t, ctx, 10) +} diff --git a/pkg/sql/create_schema.go b/pkg/sql/create_schema.go index c268438913eb..9c0b595213d3 100644 --- a/pkg/sql/create_schema.go +++ b/pkg/sql/create_schema.go @@ -102,10 +102,43 @@ func CreateUserDefinedSchemaDescriptor( return nil, nil, err } + owner := user + if !n.AuthRole.Undefined() { + exists, err := RoleExists(ctx, execCfg, txn, authRole) + if err != nil { + return nil, nil, err + } + if !exists { + return nil, nil, pgerror.Newf(pgcode.UndefinedObject, "role/user %q does not exist", + n.AuthRole) + } + owner = authRole + } + + desc, privs, err := CreateSchemaDescriptorWithPrivileges(ctx, execCfg.DB, execCfg.Codec, db, schemaName, user, owner, allocateID) + if err != nil { + return nil, nil, err + } + + return desc, privs, nil +} + +// CreateSchemaDescriptorWithPrivileges creates a new schema descriptor with +// the provided name and privileges. +func CreateSchemaDescriptorWithPrivileges( + ctx context.Context, + kvDB *kv.DB, + codec keys.SQLCodec, + db catalog.DatabaseDescriptor, + schemaName string, + user, owner security.SQLUsername, + allocateID bool, +) (*schemadesc.Mutable, *descpb.PrivilegeDescriptor, error) { // Create the ID. var id descpb.ID + var err error if allocateID { - id, err = catalogkv.GenerateUniqueDescID(ctx, execCfg.DB, execCfg.Codec) + id, err = catalogkv.GenerateUniqueDescID(ctx, kvDB, codec) if err != nil { return nil, nil, err } @@ -120,19 +153,7 @@ func CreateUserDefinedSchemaDescriptor( db.GetPrivileges(), ) - if !n.AuthRole.Undefined() { - exists, err := RoleExists(ctx, execCfg, txn, authRole) - if err != nil { - return nil, nil, err - } - if !exists { - return nil, nil, pgerror.Newf(pgcode.UndefinedObject, "role/user %q does not exist", - n.AuthRole) - } - privs.SetOwner(authRole) - } else { - privs.SetOwner(user) - } + privs.SetOwner(owner) // Create the SchemaDescriptor. desc := schemadesc.NewBuilder(&descpb.SchemaDescriptor{