Skip to content

Commit

Permalink
sql: public schema long running migration
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
RichardJCai committed Dec 2, 2021
1 parent 2559c4b commit 90ac8dc
Show file tree
Hide file tree
Showing 4 changed files with 394 additions and 14 deletions.
6 changes: 6 additions & 0 deletions pkg/migration/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ var migrations = []migration.Migration{
NoPrecondition,
alterSystemStmtDiagReqs,
),
migration.NewTenantMigration(
"update synthetic public schemas to be backed by a descriptor",
toCV(clusterversion.PublicSchemasWithDescriptors),
NoPrecondition,
publicSchemaMigration,
),
}

func init() {
Expand Down
194 changes: 194 additions & 0 deletions pkg/migration/migrations/public_schema_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migrations

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)

func publicSchemaMigration(
ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job,
) error {
query := `
SELECT ns_db.id
FROM system.namespace AS ns_db
INNER JOIN system.namespace
AS ns_sc ON (
ns_db.id
= ns_sc."parentID"
)
WHERE ns_db.id != 1
AND ns_db."parentSchemaID" = 0
AND ns_db."parentID" = 0
AND ns_sc."parentSchemaID" = 0
AND ns_sc.name = 'public'
AND ns_sc.id = 29
ORDER BY ns_db.id ASC;
`
rows, err := d.InternalExecutor.QueryIterator(
ctx, "get_databases_with_synthetic_public_schemas", nil /* txn */, query,
)
if err != nil {
return err
}
var databaseIDs []descpb.ID
for ok, err := rows.Next(ctx); ok; ok, err = rows.Next(ctx) {
if err != nil {
return err
}
parentID := descpb.ID(tree.MustBeDInt(rows.Cur()[0]))
databaseIDs = append(databaseIDs, parentID)
}

for _, dbID := range databaseIDs {
fmt.Println("dbID:", dbID)
if err := createPublicSchemaForDatabase(ctx, dbID, d); err != nil {
return err
}
}

return nil
}

func createPublicSchemaForDatabase(
ctx context.Context, dbID descpb.ID, d migration.TenantDeps,
) error {
return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
found, desc, err := descriptors.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{})
if err != nil {
return err
}
if !found {
return errors.Newf("expected to find database with id %d", dbID)
}
if desc.HasPublicSchemaWithDescriptor() {
// If the database already has a descriptor backed public schema,
// there is no work to be done.
return nil
}
dbDescBuilder := dbdesc.NewBuilder(desc.DatabaseDesc())
dbDesc := dbDescBuilder.BuildExistingMutable()

b := txn.NewBatch()

publicSchemaDesc, _, err := sql.CreateSchemaDescriptorWithPrivileges(
ctx, d.DB, d.Codec, desc, tree.PublicSchema, security.AdminRoleName(), security.AdminRoleName(), true, /* allocateID */
)
publicSchemaID := publicSchemaDesc.GetID()
newKey := catalogkeys.MakeSchemaNameKey(d.Codec, dbID, publicSchemaDesc.GetName())
oldKey := catalogkeys.EncodeNameKey(d.Codec, catalogkeys.NewNameKeyComponents(dbID, keys.RootNamespaceID, tree.PublicSchema))
// Remove namespace entry for old public schema.
b.Del(oldKey)
b.CPut(newKey, publicSchemaID, nil)
if err := catalogkv.WriteNewDescToBatch(
ctx,
false,
d.Settings,
b,
d.Codec,
publicSchemaID,
publicSchemaDesc,
); err != nil {
return err
}

if dbDesc.DescriptorProto().GetDatabase().Schemas == nil {
dbDesc.DescriptorProto().GetDatabase().Schemas = map[string]descpb.DatabaseDescriptor_SchemaInfo{
tree.PublicSchema: {
ID: publicSchemaID,
},
}
} else {
dbDesc.DescriptorProto().GetDatabase().Schemas[tree.PublicSchema] = descpb.DatabaseDescriptor_SchemaInfo{
ID: publicSchemaID,
}
}
if err := descriptors.WriteDescToBatch(ctx, false, dbDesc, b); err != nil {
return err
}
allDescriptors, err := descriptors.GetAllDescriptors(ctx, txn)
if err != nil {
return err
}
if err := migrateObjectsInDatabase(ctx, dbID, d, b, publicSchemaID, descriptors, allDescriptors); err != nil {
return err
}

return txn.Run(ctx, b)
})
}

func migrateObjectsInDatabase(
ctx context.Context,
dbID descpb.ID,
d migration.TenantDeps,
batch *kv.Batch,
newPublicSchemaID descpb.ID,
descriptors *descs.Collection,
allDescriptors []catalog.Descriptor,
) error {
var modifiedDescs []catalog.MutableDescriptor
for _, desc := range allDescriptors {
b := desc.NewBuilder()
// Only update descriptors in the parent db.
if desc.Dropped() || desc.GetParentID() != dbID {
continue
}
updateDesc := func(mut catalog.MutableDescriptor, newPublicSchemaID descpb.ID) {
oldKey := catalogkeys.MakeObjectNameKey(d.Codec, mut.GetParentID(), mut.GetParentSchemaID(), mut.GetName())
batch.Del(oldKey)
newKey := catalogkeys.MakeObjectNameKey(d.Codec, mut.GetParentID(), newPublicSchemaID, mut.GetName())
batch.Put(newKey, mut.GetID())
modifiedDescs = append(modifiedDescs, mut)
}
switch mut := b.BuildExistingMutable().(type) {
case *dbdesc.Mutable, *schemadesc.Mutable:
// Ignore database and schema descriptors.
case *tabledesc.Mutable:
updateDesc(mut, newPublicSchemaID)
mut.UnexposedParentSchemaID = newPublicSchemaID
case *typedesc.Mutable:
updateDesc(mut, newPublicSchemaID)
mut.ParentSchemaID = newPublicSchemaID
}
}
for _, modified := range modifiedDescs {
err := descriptors.WriteDescToBatch(
ctx, false, modified, batch,
)
if err != nil {
return err
}
}
return nil
}
159 changes: 159 additions & 0 deletions pkg/migration/migrations/public_schema_migration_external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migrations_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

func TestPublicSchemaMigration(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
BinaryVersionOverride: clusterversion.ByKey(clusterversion.PublicSchemasWithDescriptors - 1),
},
},
},
})
defer tc.Stopper().Stop(ctx)

db := tc.ServerConn(0)
defer db.Close()

// The backup has two databases "defaultdb" and "postgres" with synthetic
// public schemas. The migration should update the public schemas of
// defaultdb and postgres to be backed by a descriptor and have an id
// of not 29. Furthermore all objects in the public schema, in this case,
// table "t" and type "typ" should be placed in the new descriptor backed
// public schema of defaultdb.
/*
51 0 public 29
50 0 public 29
0 0 defaultdb 50
0 0 postgres 51
50 29 t 52
50 29 typ 53
50 29 _typ 54
*/
db.Exec(`RESTORE FROM '2021/05/21-020411.00' IN
'gs://cockroach-fixtures/tpcc-incrementals?AUTH=implicit'
AS OF SYSTEM TIME '2021-05-21 14:40:22'`)
_, err := db.Exec(`CREATE TABLE defaultdb.public.t(x INT)`)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO defaultdb.public.t VALUES (1), (2), (3)`)
require.NoError(t, err)
_, err = db.Exec(`CREATE TYPE defaultdb.public.typ AS ENUM()`)
require.NoError(t, err)
// Ensure the migration works if we have UDS in the database.
_, err = db.Exec(`CREATE SCHEMA defaultdb.s`)
require.NoError(t, err)
_, err = db.Exec(`CREATE TABLE defaultdb.s.t(x INT)`)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO defaultdb.s.t VALUES (1), (2), (3)`)
require.NoError(t, err)

_, err = tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`,
clusterversion.ByKey(clusterversion.PublicSchemasWithDescriptors).String())
require.NoError(t, err)

// Verify that defaultdb and postgres have public schemas with IDs that
// are not 29.
row := db.QueryRow(`SELECT id FROM system.namespace WHERE name='public' AND "parentID"=50`)
require.NotNil(t, row)
var defaultDBPublicSchemaID int
row.Scan(&defaultDBPublicSchemaID)
require.NotEqual(t, defaultDBPublicSchemaID, keys.PublicSchemaID)

row = db.QueryRow(`SELECT id FROM system.namespace WHERE name='public' AND "parentID"=51`)
require.NotNil(t, row)
var postgresPublicSchemaID int
row.Scan(&postgresPublicSchemaID)
require.NotEqual(t, postgresPublicSchemaID, keys.PublicSchemaID)

// Verify that table "t" and type "typ" and "_typ" are have parent schema id
// defaultDBPublicSchemaID.
var tParentSchemaID, typParentSchemaID, typArrParentSchemaID int
row = db.QueryRow(`SELECT "parentSchemaID" FROM system.namespace WHERE name='t' AND "parentID"=50`)
row.Scan(&tParentSchemaID)
require.Equal(t, tParentSchemaID, defaultDBPublicSchemaID)

row = db.QueryRow(`SELECT "parentSchemaID" FROM system.namespace WHERE name='typ' AND "parentID"=50`)
row.Scan(&typParentSchemaID)
require.Equal(t, typParentSchemaID, defaultDBPublicSchemaID)

row = db.QueryRow(`SELECT "parentSchemaID" FROM system.namespace WHERE name='_typ' AND "parentID"=50`)
row.Scan(&typArrParentSchemaID)
require.Equal(t, typArrParentSchemaID, defaultDBPublicSchemaID)

_, err = db.Exec(`INSERT INTO t VALUES (4)`)
require.NoError(t, err)

rows, err := db.Query(`SELECT * FROM defaultdb.t ORDER BY x`)
require.NoError(t, err)
defer rows.Close()
if err != nil {
t.Fatal(err)
}

// Verify that we can query table t.
var x int
for i := 1; i < 5; i++ {
rows.Next()
require.NoError(t, err)
err = rows.Scan(&x)
require.NoError(t, err)
require.Equal(t, x, i)
}

// Verify that we can use type "typ".
_, err = db.Exec(`CREATE TABLE t2(x typ)`)
require.NoError(t, err)

// Verify that we can use the typ / enum.
_, err = db.Exec(`ALTER TYPE typ ADD VALUE 'hello'`)
require.NoError(t, err)

_, err = db.Exec(`INSERT INTO t2 VALUES ('hello')`)
require.NoError(t, err)

row = db.QueryRow(`SELECT * FROM t2`)
require.NotNil(t, row)

var helloStr string
row.Scan(&helloStr)

require.Equal(t, "hello", helloStr)

rows, err = db.Query(`SELECT * FROM defaultdb.s.t ORDER BY x`)
require.NoError(t, err)

// Verify that we can query table defaultdb.s.t (table in a UDS).
for i := 1; i < 4; i++ {
rows.Next()
require.NoError(t, err)
err = rows.Scan(&x)
require.NoError(t, err)
require.Equal(t, x, i)
}
}
Loading

0 comments on commit 90ac8dc

Please sign in to comment.