diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel index 42b6731903a4..a38ef38dd28a 100644 --- a/pkg/ccl/crosscluster/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -74,6 +74,7 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/ctxgroup", + "//pkg/util/errorutil/unimplemented", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/log/logcrash", diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 81713260cb3c..6bec89dbddce 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" + "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -35,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -208,6 +210,19 @@ func createLogicalReplicationStreamPlanHook( return err } + sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors)) + for i, desc := range spec.TypeDescriptors { + // Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved, + // we cannot allow user-defined types on the SQL ingestion path. + if m, ok := options.GetMode(); ok && m != "immediate" { + return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types") + } + sourceTypes[i] = &desc + } + // TODO(rafi): do we need a different type resolver? + // See https://github.com/cockroachdb/cockroach/issues/132164. + importResolver := importer.MakeImportTypeResolver(sourceTypes) + // If the user asked to ignore "ttl-deletes", make sure that at least one of // the source tables actually has a TTL job which sets the omit bit that // is used for filtering; if not, they probably forgot that step. @@ -215,7 +230,11 @@ func createLogicalReplicationStreamPlanHook( for i, name := range srcTableNames { td := spec.TableDescriptors[name] - srcTableDescs[i] = &td + cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable() + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + return err + } + srcTableDescs[i] = cpy.TableDesc() repPairs[i].SrcDescriptorID = int32(td.ID) if td.RowLevelTTL != nil && td.RowLevelTTL.DisableChangefeedReplication { throwNoTTLWithCDCIgnoreError = false diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 9f29c615ff1a..1a007cae705e 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -360,6 +360,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl( } // TODO(msbutler): is this import type resolver kosher? Should put in a new package. + // See https://github.com/cockroachdb/cockroach/issues/132164. importResolver := importer.MakeImportTypeResolver(plan.SourceTypes) tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata) if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error { diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index f997729bf169..0d16a44fbbf9 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -1953,20 +1953,22 @@ func TestUserDefinedTypes(t *testing.T) { // Create the same user-defined type both tables. dbA.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')") dbB.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')") + dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") + dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") - dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')") - dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')") + dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") + dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") - dbB.Exec(t, "INSERT INTO data VALUES (1, 'one')") + dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))") // Force default expression evaluation. - dbB.Exec(t, "INSERT INTO data VALUES (2)") + dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))") var jobAID jobspb.JobID - dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data with skip schema check", dbBURL.String()).Scan(&jobAID) + dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID) WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID) require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A")) - dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}}) - dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}}) + dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) } // TestLogicalReplicationCreationChecks verifies that we check that the table @@ -2128,14 +2130,14 @@ func TestLogicalReplicationCreationChecks(t *testing.T) { "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), ) - // Verify that the stream cannot be created with user defined types. + // Verify that the stream cannot be created with mismatched enum types. dbA.Exec(t, "DROP TRIGGER my_trigger ON tab") dbA.Exec(t, "CREATE TYPE mytype AS ENUM ('a', 'b', 'c')") - dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b', 'c')") + dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b')") dbA.Exec(t, "ALTER TABLE tab ADD COLUMN enum_col mytype NOT NULL") dbB.Exec(t, "ALTER TABLE b.tab ADD COLUMN enum_col b.mytype NOT NULL") dbA.ExpectErr(t, - `cannot create logical replication stream: destination table tab column enum_col has user-defined type USER DEFINED ENUM: public.mytype`, + `cannot create logical replication stream: .* destination type USER DEFINED ENUM: public.mytype has logical representations \[a b c\], but the source type USER DEFINED ENUM: mytype has \[a b\]`, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), ) // Allows user to create LDR stream with UDT via SKIP SCHEMA CHECK. @@ -2146,9 +2148,21 @@ func TestLogicalReplicationCreationChecks(t *testing.T) { dbA.Exec(t, "CANCEL JOB $1", jobIDSkipSchemaCheck) jobutils.WaitForJobToCancel(t, dbA, jobIDSkipSchemaCheck) - // Check that UNIQUE indexes match. + // Verify that the stream cannot be created with mismatched composite types. dbA.Exec(t, "ALTER TABLE tab DROP COLUMN enum_col") dbB.Exec(t, "ALTER TABLE b.tab DROP COLUMN enum_col") + dbA.Exec(t, "CREATE TYPE composite_typ AS (a INT, b TEXT)") + dbB.Exec(t, "CREATE TYPE b.composite_typ AS (a TEXT, b INT)") + dbA.Exec(t, "ALTER TABLE tab ADD COLUMN composite_udt_col composite_typ NOT NULL") + dbB.Exec(t, "ALTER TABLE b.tab ADD COLUMN composite_udt_col b.composite_typ NOT NULL") + dbA.ExpectErr(t, + `cannot create logical replication stream: .* destination type USER DEFINED RECORD: public.composite_typ tuple element 0 does not match source type USER DEFINED RECORD: composite_typ tuple element 0: destination type INT8 does not match source type STRING`, + "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), + ) + + // Check that UNIQUE indexes match. + dbA.Exec(t, "ALTER TABLE tab DROP COLUMN composite_udt_col") + dbB.Exec(t, "ALTER TABLE b.tab DROP COLUMN composite_udt_col") dbA.Exec(t, "CREATE UNIQUE INDEX payload_idx ON tab(payload)") dbB.Exec(t, "CREATE UNIQUE INDEX multi_idx ON b.tab(composite_col, pk)") dbA.ExpectErr(t, diff --git a/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go index 529d8408527d..d4ba69a4d9d2 100644 --- a/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go @@ -343,8 +343,8 @@ func (p *partitionedStreamClient) PlanLogicalReplication( } sourceTypes := make([]*descpb.TypeDescriptor, len(streamSpec.TypeDescriptors)) - for _, desc := range streamSpec.TypeDescriptors { - sourceTypes = append(sourceTypes, &desc) + for i, desc := range streamSpec.TypeDescriptors { + sourceTypes[i] = &desc } return LogicalReplicationPlan{ diff --git a/pkg/sql/catalog/tabledesc/logical_replication_helpers.go b/pkg/sql/catalog/tabledesc/logical_replication_helpers.go index 8e0928b88956..75a48d0627b8 100644 --- a/pkg/sql/catalog/tabledesc/logical_replication_helpers.go +++ b/pkg/sql/catalog/tabledesc/logical_replication_helpers.go @@ -6,6 +6,7 @@ package tabledesc import ( + "bytes" "cmp" "slices" "strings" @@ -14,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" ) @@ -203,20 +205,76 @@ func checkSrcDstColsMatch(src *descpb.TableDescriptor, dst *descpb.TableDescript ) } - if dstCol.Type.UserDefined() { + if err := checkTypesMatch(srcCol.Type, dstCol.Type); err != nil { + return errors.Wrapf(err, + "destination table %s column %s has type %s, but the source table %s has type %s", + dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(), + ) + } + } + return nil +} + +// checkTypesMatch checks that the source and destination types match. Enums +// need to be equal in both physical and logical representations. +func checkTypesMatch(srcTyp *types.T, dstTyp *types.T) error { + switch { + case dstTyp.TypeMeta.EnumData != nil: + if srcTyp.TypeMeta.EnumData == nil { + return errors.Newf( + "destination type %s is an ENUM, but the source type %s is not", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), + ) + } + if !slices.Equal(srcTyp.TypeMeta.EnumData.LogicalRepresentations, dstTyp.TypeMeta.EnumData.LogicalRepresentations) { + return errors.Newf( + "destination type %s has logical representations %v, but the source type %s has %v", + dstTyp.SQLStringForError(), dstTyp.TypeMeta.EnumData.LogicalRepresentations, + srcTyp.SQLStringForError(), srcTyp.TypeMeta.EnumData.LogicalRepresentations, + ) + } + if !slices.EqualFunc( + srcTyp.TypeMeta.EnumData.PhysicalRepresentations, dstTyp.TypeMeta.EnumData.PhysicalRepresentations, + func(x, y []byte) bool { return bytes.Equal(x, y) }, + ) { return errors.Newf( - "destination table %s column %s has user-defined type %s", - dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), + "destination type %s and source type %s have mismatched physical representations", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), ) } - if !srcCol.Type.Identical(dstCol.Type) { + case len(dstTyp.TupleContents()) > 0: + if len(srcTyp.TupleContents()) == 0 { return errors.Newf( - "destination table %s column %s has type %s, but the source table %s has type %s", - dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(), + "destination type %s is a tuple, but the source type %s is not", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), + ) + } + if len(dstTyp.TupleContents()) != len(srcTyp.TupleContents()) { + return errors.Newf( + "destination type %s has %d tuple elements, but the source type %s has %d tuple elements", + dstTyp.SQLStringForError(), len(dstTyp.TupleContents()), + srcTyp.SQLStringForError(), len(srcTyp.TupleContents()), + ) + } + for i := range dstTyp.TupleContents() { + if err := checkTypesMatch(srcTyp.TupleContents()[i], dstTyp.TupleContents()[i]); err != nil { + return errors.Wrapf(err, + "destination type %s tuple element %d does not match source type %s tuple element %d", + dstTyp.SQLStringForError(), i, srcTyp.SQLStringForError(), i, + ) + } + } + + default: + if !srcTyp.Identical(dstTyp) { + return errors.Newf( + "destination type %s does not match source type %s", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), ) } } + return nil }