diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 373a844fc4d2..6dd6ead23559 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -393,6 +393,7 @@ ALL_TESTS = [ "//pkg/sql/catalog/nstree:nstree_test", "//pkg/sql/catalog/randgen:randgen_test", "//pkg/sql/catalog/redact:redact_test", + "//pkg/sql/catalog/replication:replication_test", "//pkg/sql/catalog/resolver:resolver_test", "//pkg/sql/catalog/schemadesc:schemadesc_test", "//pkg/sql/catalog/schemaexpr:schemaexpr_test", @@ -1814,6 +1815,8 @@ GO_TARGETS = [ "//pkg/sql/catalog/randgen:randgen_test", "//pkg/sql/catalog/redact:redact", "//pkg/sql/catalog/redact:redact_test", + "//pkg/sql/catalog/replication:replication", + "//pkg/sql/catalog/replication:replication_test", "//pkg/sql/catalog/resolver:resolver", "//pkg/sql/catalog/resolver:resolver_test", "//pkg/sql/catalog/rewrite:rewrite", diff --git a/pkg/sql/catalog/replication/BUILD.bazel b/pkg/sql/catalog/replication/BUILD.bazel new file mode 100644 index 000000000000..f6880de9abc2 --- /dev/null +++ b/pkg/sql/catalog/replication/BUILD.bazel @@ -0,0 +1,44 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "replication", + srcs = ["reader_catalog.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/replication", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv", + "//pkg/roachpb", + "//pkg/settings/cluster", + "//pkg/sql/catalog", + "//pkg/sql/catalog/dbdesc", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/descs", + "//pkg/sql/catalog/funcdesc", + "//pkg/sql/catalog/nstree", + "//pkg/sql/catalog/schemadesc", + "//pkg/sql/catalog/tabledesc", + "//pkg/sql/catalog/typedesc", + "//pkg/util/hlc", + "//pkg/util/protoutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "replication_test", + srcs = ["reader_catalog_test.go"], + deps = [ + ":replication", + "//pkg/base", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/sql/catalog/replication/reader_catalog.go b/pkg/sql/catalog/replication/reader_catalog.go new file mode 100644 index 000000000000..12ac579fd772 --- /dev/null +++ b/pkg/sql/catalog/replication/reader_catalog.go @@ -0,0 +1,270 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package replication + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" +) + +// SetupOrAdvanceStandbyReaderCatalog sets up a reader catalog that reads from +// that tenant as of the passed timestamp -- potentially replacing the current +// reader catalog if needed -- or advances the current reader catalog to read as +// of that timestamp if that is possible to do in place by updating the ts and +// bumping the descriptor versions. In addition to mirroring all user-created +// catalog entries, this also updates the system.users, roles and role options, +// and database role settings tables in this cluster to read from those tables in +// the other cluster (and fails if they have incompatible indexes). Changes to +// the catalog are atomic. +func SetupOrAdvanceStandbyReaderCatalog( + ctx context.Context, + fromID roachpb.TenantID, + asOf hlc.Timestamp, + descsCol descs.DB, + st *cluster.Settings, +) error { + extracted, err := getCatalogForTenantAsOf(ctx, st, descsCol.KV(), fromID, asOf) + if err != nil { + return err + } + return descsCol.DescsTxn( + ctx, func(ctx context.Context, txn descs.Txn) error { + // Descriptors that no longer exist + descriptorsUpdated := catalog.DescriptorIDSet{} + namespaceUpdated := catalog.DescriptorIDSet{} + allExistingDescs, err := txn.Descriptors().GetAll(ctx, txn.KV()) + if err != nil { + return err + } + // Resolve any existing descriptors within the tenant, which + // will be use to compute old values for writing. + b := txn.KV().NewBatch() + if err := extracted.ForEachDescriptor(func(desc catalog.Descriptor) error { + if !shouldSetupForReader(desc.GetID(), desc.GetParentID()) { + return nil + } + // Track this descriptor was updated. + descriptorsUpdated.Add(desc.GetID()) + // If there is an existing descriptor with the same ID, we should + // determine the old bytes in storage for the upsert. + var existingRawBytes []byte + existingDesc, err := txn.Descriptors().MutableByID(txn.KV()).Desc(ctx, desc.GetID()) + if err == nil { + existingRawBytes = existingDesc.GetRawBytesInStorage() + } else if errors.Is(err, catalog.ErrDescriptorNotFound) { + err = nil + } else { + return err + } + // Existing descriptor should never be a system descriptor. + if existingDesc != nil && + existingDesc.GetParentID() == keys.SystemDatabaseID && + desc.GetParentID() != keys.SystemDatabaseID { + return errors.AssertionFailedf("system database from replicated tenant " + + "and reader have a system table that collide, ensure both are created on the " + + "same version of CRDB.") + } + var mut catalog.MutableDescriptor + switch t := desc.DescriptorProto().GetUnion().(type) { + case *descpb.Descriptor_Table: + t.Table.Version = 1 + var mutBuilder tabledesc.TableDescriptorBuilder + var mutTbl *tabledesc.Mutable + if existingRawBytes != nil { + t.Table.Version = existingDesc.GetVersion() + mutBuilder = existingDesc.NewBuilder().(tabledesc.TableDescriptorBuilder) + newDescBytes, err := protoutil.Marshal(t.Table) + if err != nil { + return err + } + if err := protoutil.Unmarshal(newDescBytes, mutBuilder.BuildExistingMutableTable().TableDesc()); err != nil { + return err + } + mutTbl = mutBuilder.BuildExistingMutableTable() + } else { + mutBuilder = tabledesc.NewBuilder(t.Table) + mutTbl = mutBuilder.BuildCreatedMutableTable() + } + mut = mutTbl + // Convert any physical tables into external row tables. + // Note: Materialized views will be converted, but their + // view definition will be wiped. + if mutTbl.IsPhysicalTable() { + mutTbl.ViewQuery = "" + mutTbl.SetExternalRowData(&descpb.ExternalRowData{TenantID: fromID, TableID: desc.GetID(), AsOf: asOf}) + } + case *descpb.Descriptor_Database: + t.Database.Version = 1 + var mutBuilder dbdesc.DatabaseDescriptorBuilder + if existingRawBytes != nil { + mutBuilder = existingDesc.NewBuilder().(dbdesc.DatabaseDescriptorBuilder) + newDescBytes, err := protoutil.Marshal(t.Database) + if err != nil { + return err + } + if err := protoutil.Unmarshal(newDescBytes, mutBuilder.BuildExistingMutableDatabase().DatabaseDesc()); err != nil { + return err + } + mut = mutBuilder.BuildExistingMutable() + } else { + mutBuilder = dbdesc.NewBuilder(t.Database) + mut = mutBuilder.BuildCreatedMutable() + } + case *descpb.Descriptor_Schema: + t.Schema.Version = 1 + var mutBuilder schemadesc.SchemaDescriptorBuilder + if existingRawBytes != nil { + mutBuilder = existingDesc.NewBuilder().(schemadesc.SchemaDescriptorBuilder) + newDescBytes, err := protoutil.Marshal(t.Schema) + if err != nil { + return err + } + if err := protoutil.Unmarshal(newDescBytes, mutBuilder.BuildExistingMutableSchema().SchemaDesc()); err != nil { + return err + } + mut = mutBuilder.BuildExistingMutable() + } else { + mutBuilder = schemadesc.NewBuilder(t.Schema) + mut = mutBuilder.BuildCreatedMutable() + } + case *descpb.Descriptor_Function: + t.Function.Version = 1 + var mutBuilder funcdesc.FunctionDescriptorBuilder + if existingRawBytes != nil { + mutBuilder = existingDesc.NewBuilder().(funcdesc.FunctionDescriptorBuilder) + newDescBytes, err := protoutil.Marshal(t.Function) + if err != nil { + return err + } + if err := protoutil.Unmarshal(newDescBytes, mutBuilder.BuildExistingMutableFunction().FuncDesc()); err != nil { + return err + } + mut = mutBuilder.BuildExistingMutable() + } else { + mutBuilder = funcdesc.NewBuilder(t.Function) + mut = mutBuilder.BuildCreatedMutable() + } + case *descpb.Descriptor_Type: + t.Type.Version = 1 + var mutBuilder typedesc.TypeDescriptorBuilder + if existingRawBytes != nil { + mutBuilder = existingDesc.NewBuilder().(typedesc.TypeDescriptorBuilder) + newDescBytes, err := protoutil.Marshal(t.Type) + if err != nil { + return err + } + if err := protoutil.Unmarshal(newDescBytes, mutBuilder.BuildExistingMutableType()); err != nil { + return err + } + mut = mutBuilder.BuildExistingMutable() + } else { + mutBuilder = typedesc.NewBuilder(t.Type) + mut = mutBuilder.BuildCreatedMutable() + } + } + return errors.Wrapf(txn.Descriptors().WriteDescToBatch(ctx, true, mut, b), + "unable to create replicated descriptor: %d %T", mut.GetID(), mut) + }); err != nil { + return err + } + if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error { + if !shouldSetupForReader(e.GetID(), e.GetParentID()) { + return nil + } + namespaceUpdated.Add(e.GetID()) + return errors.Wrapf(txn.Descriptors().UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e) + }); err != nil { + return err + } + // Figure out which descriptors should be deleted. + if err := allExistingDescs.ForEachDescriptor(func(desc catalog.Descriptor) error { + // Skip descriptors that were updated above + if !shouldSetupForReader(desc.GetID(), desc.GetParentID()) || + descriptorsUpdated.Contains(desc.GetID()) { + return nil + } + // Delete the descriptor from the batch + return errors.Wrapf(txn.Descriptors().DeleteDescToBatch(ctx, true, desc.GetID(), b), + "deleting descriptor") + }); err != nil { + return err + } + // Figure out which namespaces should be deleted. + if err := allExistingDescs.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error { + // Skip descriptors that were updated above + if !shouldSetupForReader(e.GetID(), e.GetParentID()) || + descriptorsUpdated.Contains(e.GetID()) { + return nil + } + return errors.Wrapf(txn.Descriptors().DeleteNamespaceEntryToBatch(ctx, true, e, b), + "deleting namespace") + }); err != nil { + return err + } + return errors.Wrap(txn.KV().Run(ctx, b), "executing bach for updating catalog") + }) +} + +// shouldSetupForReader determines if a descriptor should be setup +// access via external row data. +func shouldSetupForReader(id descpb.ID, parentID descpb.ID) bool { + switch id { + case keys.UsersTableID, keys.RoleMembersTableID, keys.RoleOptionsTableID, + keys.DatabaseRoleSettingsTableID, keys.TableStatisticsTableID: + return true + default: + return parentID != keys.SystemDatabaseID && + id != keys.SystemDatabaseID + } +} + +// getCatalogForTenantAsOf reads the descriptors from a given tenant +// at the given timestamp. +func getCatalogForTenantAsOf( + ctx context.Context, + st *cluster.Settings, + db *kv.DB, + tenantID roachpb.TenantID, + asOf hlc.Timestamp, +) (all nstree.Catalog, _ error) { + cf := descs.NewBareBonesCollectionFactory(st, keys.MakeSQLCodec(tenantID)) + err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + err := txn.SetFixedTimestamp(ctx, asOf) + if err != nil { + return err + } + descsCol := cf.NewCollection(ctx) + defer descsCol.ReleaseAll(ctx) + all, err = descsCol.GetAllFromStorageUnvalidated(ctx, txn) + if err != nil { + return err + } + + return nil + }) + return all, err +} diff --git a/pkg/sql/catalog/replication/reader_catalog_test.go b/pkg/sql/catalog/replication/reader_catalog_test.go new file mode 100644 index 000000000000..76c1d4f9a2e9 --- /dev/null +++ b/pkg/sql/catalog/replication/reader_catalog_test.go @@ -0,0 +1,100 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package replication_test + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/replication" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +func TestReaderCatalog(t *testing.T) { + ctx := context.Background() + ts := serverutils.StartServerOnly(t, base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + }) + defer ts.Stop(ctx) + srcTenant, _, err := ts.StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{ + TenantID: serverutils.TestTenantID(), + TenantName: "src", + }) + require.NoError(t, err) + destTenant, _, err := ts.StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{ + TenantID: serverutils.TestTenantID2(), + TenantName: "dest", + }) + require.NoError(t, err) + srcConn := srcTenant.SQLConn(t) + destConn := destTenant.SQLConn(t) + + ddlToExec := []string{ + "CREATE USER roacher WITH CREATEROLE;", + "GRANT ADMIN TO roacher;", + "ALTER USER roacher SET timezone='America/New_York';", + "CREATE DATABASE db1;", + "CREATE SCHEMA db1.sc1;", + "CREATE SEQUENCE sq1;", + "CREATE TYPE IF NOT EXISTS status AS ENUM ('open', 'closed', 'inactive');", + "CREATE TABLE t1(n int default nextval('sq1'), val status);", + "INSERT INTO t1(val) VALUES('open');", + "INSERT INTO t1(val) VALUES('closed');", + "INSERT INTO t1(val) VALUES('inactive');", + "CREATE VIEW v1 AS (SELECT n from t1);", + } + for _, ddl := range ddlToExec { + _, err = srcConn.Exec(ddl) + require.NoError(t, err) + } + + now := ts.Clock().Now() + idb := destTenant.InternalDB().(*sql.InternalDB) + require.NoError(t, replication.SetupOrAdvanceStandbyReaderCatalog(ctx, serverutils.TestTenantID(), now, idb, destTenant.ClusterSettings())) + + srcRunner := sqlutils.MakeSQLRunner(srcConn) + destRunner := sqlutils.MakeSQLRunner(destConn) + + compareConn := func(query string) { + srcRes := srcRunner.QueryStr(t, fmt.Sprintf("SELECT * FROM (%s) AS OF SYSTEM TIME %s", query, now.AsOfSystemTime())) + destRes := destRunner.QueryStr(t, query) + require.Equal(t, srcRes, destRes) + } + + // Validate tables and views match in the catalog reader + compareConn("SELECT * FROM t1 ORDER BY n") + compareConn("SELECT * FROM v1 ORDER BY 1") + + // Validate that systme tables are synced + compareConn("SELECT * FROM system.users") + compareConn("SELECT * FROM system.table_statistics") + compareConn("SELECT * FROM system.role_options") + compareConn("SELECT * FROM system.database_role_settings") + +} +func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go index 68ae10af7702..fe16e2046912 100644 --- a/pkg/sql/catalog/tabledesc/structured.go +++ b/pkg/sql/catalog/tabledesc/structured.go @@ -2331,6 +2331,13 @@ func (desc *Mutable) SetOffline(reason string) { desc.OfflineReason = reason } +func (desc *Mutable) SetExternalRowData(ext *descpb.ExternalRowData) { + // Do not set materialized views for sequences, they will + // handle this on their own. + desc.IsMaterializedView = !desc.IsSequence() + desc.External = ext +} + // IsLocalityRegionalByRow implements the TableDescriptor interface. func (desc *wrapper) IsLocalityRegionalByRow() bool { return desc.LocalityConfig.GetRegionalByRow() != nil