Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
80132: sql: always return "public" for ResolveSchemaNameByID with ID 29  r=ajwerner a=RichardJCai

This allows us to resolve the synthetic public schema in the case
where the changefeed is created with a cursor timestamp that is
prior to the public schema migration.

Release note (bug fix): In 22.1 beta, if the changefeed is created
with a cursor time stamp prior to when the public schema migration
happened, it would fail to resolve the public schema.
This change allows us to resolve the schema.

Co-authored-by: richardjcai <[email protected]>
  • Loading branch information
craig[bot] and RichardJCai committed Apr 19, 2022
2 parents 176f6fe + 13487e1 commit 1fdbb16
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 23 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ go_test(
"main_test.go",
"name_test.go",
"nemeses_test.go",
"public_schema_migration_changefeed_external_test.go",
"schema_registry_test.go",
"show_changefeed_jobs_test.go",
"sink_cloudstorage_test.go",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func getQualifiedTableNameObj(
return tree.TableName{}, err
}
schemaID := desc.GetParentSchemaID()
schemaName, err := resolver.ResolveSchemaNameByID(ctx, txn, execCfg.Codec, dbDesc, schemaID, execCfg.Settings.Version)
schemaName, err := resolver.ResolveSchemaNameByID(ctx, txn, execCfg.Codec, dbDesc, schemaID)
if err != nil {
return tree.TableName{}, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"context"
"net/url"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

func TestPublicSchemaMigrationWithCreateChangefeed(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: clusterversion.ByKey(clusterversion.PublicSchemasWithDescriptors - 1),
},
}

args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: knobs,
},
}

tc := testcluster.StartTestCluster(t, 1, args)
defer tc.Stopper().Stop(ctx)

db := tc.ServerConn(0)
defer db.Close()
s := tc.Server(0)

sink, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanup()
f := makeTableFeedFactory(s, db, sink)

tdb := sqlutils.MakeSQLRunner(db)
tdb.Exec(t, `CREATE TABLE defaultdb.public.t();`)

// Save timestamp pre public schema migration to use for cursor time.
var tsLogical string
tdb.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&tsLogical)

// Kick off public schema migration.
{
_, err := tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`,
clusterversion.ByKey(clusterversion.PublicSchemasWithDescriptors).String())
require.NoError(t, err)
}

tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)

// Test passes if we can create and close the feed.
out := feed(t, f, `CREATE CHANGEFEED FOR defaultdb.public.t WITH cursor=$1`, tsLogical)
defer closeFeed(t, out)
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,16 @@ SELECT "parentSchemaID" FROM system.namespace WHERE name = $1
}
}

func TestPublicSchemaMigration500Tables(t *testing.T) {
skip.WithIssue(t, 78947)
func TestPublicSchemaMigration250Tables(t *testing.T) {
skip.UnderRace(t, "takes >1min under race")
skip.UnderStress(t, "takes >1min under stress")
defer leaktest.AfterTest(t)()
ctx := context.Background()

publicSchemaMigrationTest(t, ctx, 500)
publicSchemaMigrationTest(t, ctx, 250)
}

func TestPublicSchemaMigration10Tables(t *testing.T) {
skip.WithIssue(t, 78947)
defer leaktest.AfterTest(t)()
ctx := context.Background()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/catalog/catconstants/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/settings",
],
Expand Down
15 changes: 3 additions & 12 deletions pkg/sql/catalog/catconstants/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@

package catconstants

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
)
import "github.com/cockroachdb/cockroach/pkg/keys"

// StaticSchemaIDMapVirtualPublicSchema is a map of statically known schema IDs
// on versions prior to PublicSchemasWithDescriptors.
Expand All @@ -38,12 +33,8 @@ var StaticSchemaIDMap = map[uint32]string{

// GetStaticSchemaIDMap returns a map of schema ids to schema names for the
// static schemas.
func GetStaticSchemaIDMap(ctx context.Context, version clusterversion.Handle) map[uint32]string {
if !version.IsActive(ctx, clusterversion.PublicSchemasWithDescriptors) {
return StaticSchemaIDMapVirtualPublicSchema
}

return StaticSchemaIDMap
func GetStaticSchemaIDMap() map[uint32]string {
return StaticSchemaIDMapVirtualPublicSchema
}

// PgCatalogName is the name of the pg_catalog system schema.
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/catalog/resolver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/sql/catalog",
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/catalog/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package resolver
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand Down Expand Up @@ -302,10 +301,9 @@ func ResolveSchemaNameByID(
codec keys.SQLCodec,
db catalog.DatabaseDescriptor,
schemaID descpb.ID,
version clusterversion.Handle,
) (string, error) {
// Fast-path for public schema and virtual schemas, to avoid hot lookups.
staticSchemaMap := catconstants.GetStaticSchemaIDMap(ctx, version)
staticSchemaMap := catconstants.GetStaticSchemaIDMap()
if schemaName, ok := staticSchemaMap[uint32(schemaID)]; ok {
return schemaName, nil
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/temporary_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ func cleanupSchemaObjects(
codec,
db,
dTableDesc.GetParentSchemaID(),
settings.Version,
)
if err != nil {
return err
Expand Down

0 comments on commit 1fdbb16

Please sign in to comment.