From 867c739d5d5a0a3ea3003549ed6e6d6444eade2e Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Tue, 22 Oct 2024 20:53:03 +0000 Subject: [PATCH] crosscluster/logical: check UDT equivalency during LDR creation This check requires that the logical and physical representations of each type are identical. In the future, we may investigate ways to only require logical equivalency. Release note (ops change): When creating a logical replication stream, any user-defined types in the source and destination are now checked for equivalency. This allows for creating a stream that handles user-defined types without needing to use the `WITH SKIP SCHEMA CHECK` option as long as the replication stream uses `mode = immediate`. --- pkg/ccl/crosscluster/logical/BUILD.bazel | 1 + .../create_logical_replication_stmt.go | 21 +++++- .../logical/logical_replication_job.go | 1 + .../logical/logical_replication_job_test.go | 36 +++++++--- .../streamclient/partitioned_stream_client.go | 4 +- .../tabledesc/logical_replication_helpers.go | 70 +++++++++++++++++-- 6 files changed, 113 insertions(+), 20 deletions(-) diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel index 42b6731903a4..a38ef38dd28a 100644 --- a/pkg/ccl/crosscluster/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -74,6 +74,7 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/ctxgroup", + "//pkg/util/errorutil/unimplemented", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/log/logcrash", diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 81713260cb3c..6bec89dbddce 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" + "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -35,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -208,6 +210,19 @@ func createLogicalReplicationStreamPlanHook( return err } + sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors)) + for i, desc := range spec.TypeDescriptors { + // Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved, + // we cannot allow user-defined types on the SQL ingestion path. + if m, ok := options.GetMode(); ok && m != "immediate" { + return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types") + } + sourceTypes[i] = &desc + } + // TODO(rafi): do we need a different type resolver? + // See https://github.com/cockroachdb/cockroach/issues/132164. + importResolver := importer.MakeImportTypeResolver(sourceTypes) + // If the user asked to ignore "ttl-deletes", make sure that at least one of // the source tables actually has a TTL job which sets the omit bit that // is used for filtering; if not, they probably forgot that step. @@ -215,7 +230,11 @@ func createLogicalReplicationStreamPlanHook( for i, name := range srcTableNames { td := spec.TableDescriptors[name] - srcTableDescs[i] = &td + cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable() + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + return err + } + srcTableDescs[i] = cpy.TableDesc() repPairs[i].SrcDescriptorID = int32(td.ID) if td.RowLevelTTL != nil && td.RowLevelTTL.DisableChangefeedReplication { throwNoTTLWithCDCIgnoreError = false diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 9f29c615ff1a..1a007cae705e 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -360,6 +360,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl( } // TODO(msbutler): is this import type resolver kosher? Should put in a new package. + // See https://github.com/cockroachdb/cockroach/issues/132164. importResolver := importer.MakeImportTypeResolver(plan.SourceTypes) tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata) if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error { diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index 396fb35047af..3f7d92612e02 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -1953,20 +1953,22 @@ func TestUserDefinedTypes(t *testing.T) { // Create the same user-defined type both tables. dbA.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')") dbB.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')") + dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") + dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") - dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')") - dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')") + dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") + dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") - dbB.Exec(t, "INSERT INTO data VALUES (1, 'one')") + dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))") // Force default expression evaluation. - dbB.Exec(t, "INSERT INTO data VALUES (2)") + dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))") var jobAID jobspb.JobID - dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data with skip schema check", dbBURL.String()).Scan(&jobAID) + dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID) WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID) require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A")) - dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}}) - dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}}) + dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) } // TestLogicalReplicationCreationChecks verifies that we check that the table @@ -2128,14 +2130,14 @@ func TestLogicalReplicationCreationChecks(t *testing.T) { "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), ) - // Verify that the stream cannot be created with user defined types. + // Verify that the stream cannot be created with mismatched enum types. dbA.Exec(t, "DROP TRIGGER my_trigger ON tab") dbA.Exec(t, "CREATE TYPE mytype AS ENUM ('a', 'b', 'c')") - dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b', 'c')") + dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b')") dbA.Exec(t, "ALTER TABLE tab ADD COLUMN enum_col mytype NOT NULL") dbB.Exec(t, "ALTER TABLE b.tab ADD COLUMN enum_col b.mytype NOT NULL") dbA.ExpectErr(t, - `cannot create logical replication stream: destination table tab column enum_col has user-defined type USER DEFINED ENUM: public.mytype`, + `cannot create logical replication stream: .* destination type USER DEFINED ENUM: public.mytype has logical representations \[a b c\], but the source type USER DEFINED ENUM: mytype has \[a b\]`, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), ) // Allows user to create LDR stream with UDT via SKIP SCHEMA CHECK. @@ -2146,9 +2148,21 @@ func TestLogicalReplicationCreationChecks(t *testing.T) { dbA.Exec(t, "CANCEL JOB $1", jobIDSkipSchemaCheck) jobutils.WaitForJobToCancel(t, dbA, jobIDSkipSchemaCheck) - // Check that UNIQUE indexes match. + // Verify that the stream cannot be created with mismatched composite types. dbA.Exec(t, "ALTER TABLE tab DROP COLUMN enum_col") dbB.Exec(t, "ALTER TABLE b.tab DROP COLUMN enum_col") + dbA.Exec(t, "CREATE TYPE composite_typ AS (a INT, b TEXT)") + dbB.Exec(t, "CREATE TYPE b.composite_typ AS (a TEXT, b INT)") + dbA.Exec(t, "ALTER TABLE tab ADD COLUMN composite_udt_col composite_typ NOT NULL") + dbB.Exec(t, "ALTER TABLE b.tab ADD COLUMN composite_udt_col b.composite_typ NOT NULL") + dbA.ExpectErr(t, + `cannot create logical replication stream: .* destination type USER DEFINED RECORD: public.composite_typ tuple element 0 does not match source type USER DEFINED RECORD: composite_typ tuple element 0: destination type INT8 does not match source type STRING`, + "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), + ) + + // Check that UNIQUE indexes match. + dbA.Exec(t, "ALTER TABLE tab DROP COLUMN composite_udt_col") + dbB.Exec(t, "ALTER TABLE b.tab DROP COLUMN composite_udt_col") dbA.Exec(t, "CREATE UNIQUE INDEX payload_idx ON tab(payload)") dbB.Exec(t, "CREATE UNIQUE INDEX multi_idx ON b.tab(composite_col, pk)") dbA.ExpectErr(t, diff --git a/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go index 529d8408527d..d4ba69a4d9d2 100644 --- a/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go @@ -343,8 +343,8 @@ func (p *partitionedStreamClient) PlanLogicalReplication( } sourceTypes := make([]*descpb.TypeDescriptor, len(streamSpec.TypeDescriptors)) - for _, desc := range streamSpec.TypeDescriptors { - sourceTypes = append(sourceTypes, &desc) + for i, desc := range streamSpec.TypeDescriptors { + sourceTypes[i] = &desc } return LogicalReplicationPlan{ diff --git a/pkg/sql/catalog/tabledesc/logical_replication_helpers.go b/pkg/sql/catalog/tabledesc/logical_replication_helpers.go index 8e0928b88956..75a48d0627b8 100644 --- a/pkg/sql/catalog/tabledesc/logical_replication_helpers.go +++ b/pkg/sql/catalog/tabledesc/logical_replication_helpers.go @@ -6,6 +6,7 @@ package tabledesc import ( + "bytes" "cmp" "slices" "strings" @@ -14,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" ) @@ -203,20 +205,76 @@ func checkSrcDstColsMatch(src *descpb.TableDescriptor, dst *descpb.TableDescript ) } - if dstCol.Type.UserDefined() { + if err := checkTypesMatch(srcCol.Type, dstCol.Type); err != nil { + return errors.Wrapf(err, + "destination table %s column %s has type %s, but the source table %s has type %s", + dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(), + ) + } + } + return nil +} + +// checkTypesMatch checks that the source and destination types match. Enums +// need to be equal in both physical and logical representations. +func checkTypesMatch(srcTyp *types.T, dstTyp *types.T) error { + switch { + case dstTyp.TypeMeta.EnumData != nil: + if srcTyp.TypeMeta.EnumData == nil { + return errors.Newf( + "destination type %s is an ENUM, but the source type %s is not", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), + ) + } + if !slices.Equal(srcTyp.TypeMeta.EnumData.LogicalRepresentations, dstTyp.TypeMeta.EnumData.LogicalRepresentations) { + return errors.Newf( + "destination type %s has logical representations %v, but the source type %s has %v", + dstTyp.SQLStringForError(), dstTyp.TypeMeta.EnumData.LogicalRepresentations, + srcTyp.SQLStringForError(), srcTyp.TypeMeta.EnumData.LogicalRepresentations, + ) + } + if !slices.EqualFunc( + srcTyp.TypeMeta.EnumData.PhysicalRepresentations, dstTyp.TypeMeta.EnumData.PhysicalRepresentations, + func(x, y []byte) bool { return bytes.Equal(x, y) }, + ) { return errors.Newf( - "destination table %s column %s has user-defined type %s", - dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), + "destination type %s and source type %s have mismatched physical representations", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), ) } - if !srcCol.Type.Identical(dstCol.Type) { + case len(dstTyp.TupleContents()) > 0: + if len(srcTyp.TupleContents()) == 0 { return errors.Newf( - "destination table %s column %s has type %s, but the source table %s has type %s", - dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(), + "destination type %s is a tuple, but the source type %s is not", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), + ) + } + if len(dstTyp.TupleContents()) != len(srcTyp.TupleContents()) { + return errors.Newf( + "destination type %s has %d tuple elements, but the source type %s has %d tuple elements", + dstTyp.SQLStringForError(), len(dstTyp.TupleContents()), + srcTyp.SQLStringForError(), len(srcTyp.TupleContents()), + ) + } + for i := range dstTyp.TupleContents() { + if err := checkTypesMatch(srcTyp.TupleContents()[i], dstTyp.TupleContents()[i]); err != nil { + return errors.Wrapf(err, + "destination type %s tuple element %d does not match source type %s tuple element %d", + dstTyp.SQLStringForError(), i, srcTyp.SQLStringForError(), i, + ) + } + } + + default: + if !srcTyp.Identical(dstTyp) { + return errors.Newf( + "destination type %s does not match source type %s", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), ) } } + return nil }