From 061985812eb2282218b261d330721fe40be33b7c Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Tue, 22 Oct 2024 16:53:03 -0400 Subject: [PATCH] crosscluster/logical: check UDT equivalency during LDR creation This check requires that the logical and physical representations of each type are identical. In the future, we may investigate ways to only require logical equivalency. Release note (ops change): When creating a logical replication stream, any user-defined types in the source and destination are now checked for equivalency. This allows for creating a stream that handles user-defined types without needing to use the `WITH SKIP SCHEMA CHECK` option. --- .../create_logical_replication_stmt.go | 15 +++- .../logical/logical_replication_job.go | 1 + .../logical/logical_replication_job_test.go | 16 +++-- .../streamclient/partitioned_stream_client.go | 4 +- .../tabledesc/logical_replication_helpers.go | 70 +++++++++++++++++-- 5 files changed, 90 insertions(+), 16 deletions(-) diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 81713260cb3c..3920e019fcca 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" + "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -208,6 +209,14 @@ func createLogicalReplicationStreamPlanHook( return err } + sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors)) + for i, desc := range spec.TypeDescriptors { + sourceTypes[i] = &desc + } + // TODO(rafi): do we need a different type resolver? + // See https://github.com/cockroachdb/cockroach/issues/132164. + importResolver := importer.MakeImportTypeResolver(sourceTypes) + // If the user asked to ignore "ttl-deletes", make sure that at least one of // the source tables actually has a TTL job which sets the omit bit that // is used for filtering; if not, they probably forgot that step. @@ -215,7 +224,11 @@ func createLogicalReplicationStreamPlanHook( for i, name := range srcTableNames { td := spec.TableDescriptors[name] - srcTableDescs[i] = &td + cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable() + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + return err + } + srcTableDescs[i] = cpy.TableDesc() repPairs[i].SrcDescriptorID = int32(td.ID) if td.RowLevelTTL != nil && td.RowLevelTTL.DisableChangefeedReplication { throwNoTTLWithCDCIgnoreError = false diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 9f29c615ff1a..1a007cae705e 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -360,6 +360,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl( } // TODO(msbutler): is this import type resolver kosher? Should put in a new package. + // See https://github.com/cockroachdb/cockroach/issues/132164. importResolver := importer.MakeImportTypeResolver(plan.SourceTypes) tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata) if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error { diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index e04cf3cadea5..434e08ce1d8a 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -1925,20 +1925,22 @@ func TestUserDefinedTypes(t *testing.T) { // Create the same user-defined type both tables. dbA.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')") dbB.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')") + dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") + dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") - dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')") - dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')") + dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") + dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") - dbB.Exec(t, "INSERT INTO data VALUES (1, 'one')") + dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))") // Force default expression evaluation. - dbB.Exec(t, "INSERT INTO data VALUES (2)") + dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))") var jobAID jobspb.JobID - dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data with skip schema check", dbBURL.String()).Scan(&jobAID) + dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID) WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID) require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A")) - dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}}) - dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}}) + dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) } // TestLogicalReplicationCreationChecks verifies that we check that the table diff --git a/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go index 529d8408527d..d4ba69a4d9d2 100644 --- a/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go @@ -343,8 +343,8 @@ func (p *partitionedStreamClient) PlanLogicalReplication( } sourceTypes := make([]*descpb.TypeDescriptor, len(streamSpec.TypeDescriptors)) - for _, desc := range streamSpec.TypeDescriptors { - sourceTypes = append(sourceTypes, &desc) + for i, desc := range streamSpec.TypeDescriptors { + sourceTypes[i] = &desc } return LogicalReplicationPlan{ diff --git a/pkg/sql/catalog/tabledesc/logical_replication_helpers.go b/pkg/sql/catalog/tabledesc/logical_replication_helpers.go index 8e0928b88956..75a48d0627b8 100644 --- a/pkg/sql/catalog/tabledesc/logical_replication_helpers.go +++ b/pkg/sql/catalog/tabledesc/logical_replication_helpers.go @@ -6,6 +6,7 @@ package tabledesc import ( + "bytes" "cmp" "slices" "strings" @@ -14,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" ) @@ -203,20 +205,76 @@ func checkSrcDstColsMatch(src *descpb.TableDescriptor, dst *descpb.TableDescript ) } - if dstCol.Type.UserDefined() { + if err := checkTypesMatch(srcCol.Type, dstCol.Type); err != nil { + return errors.Wrapf(err, + "destination table %s column %s has type %s, but the source table %s has type %s", + dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(), + ) + } + } + return nil +} + +// checkTypesMatch checks that the source and destination types match. Enums +// need to be equal in both physical and logical representations. +func checkTypesMatch(srcTyp *types.T, dstTyp *types.T) error { + switch { + case dstTyp.TypeMeta.EnumData != nil: + if srcTyp.TypeMeta.EnumData == nil { + return errors.Newf( + "destination type %s is an ENUM, but the source type %s is not", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), + ) + } + if !slices.Equal(srcTyp.TypeMeta.EnumData.LogicalRepresentations, dstTyp.TypeMeta.EnumData.LogicalRepresentations) { + return errors.Newf( + "destination type %s has logical representations %v, but the source type %s has %v", + dstTyp.SQLStringForError(), dstTyp.TypeMeta.EnumData.LogicalRepresentations, + srcTyp.SQLStringForError(), srcTyp.TypeMeta.EnumData.LogicalRepresentations, + ) + } + if !slices.EqualFunc( + srcTyp.TypeMeta.EnumData.PhysicalRepresentations, dstTyp.TypeMeta.EnumData.PhysicalRepresentations, + func(x, y []byte) bool { return bytes.Equal(x, y) }, + ) { return errors.Newf( - "destination table %s column %s has user-defined type %s", - dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), + "destination type %s and source type %s have mismatched physical representations", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), ) } - if !srcCol.Type.Identical(dstCol.Type) { + case len(dstTyp.TupleContents()) > 0: + if len(srcTyp.TupleContents()) == 0 { return errors.Newf( - "destination table %s column %s has type %s, but the source table %s has type %s", - dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(), + "destination type %s is a tuple, but the source type %s is not", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), + ) + } + if len(dstTyp.TupleContents()) != len(srcTyp.TupleContents()) { + return errors.Newf( + "destination type %s has %d tuple elements, but the source type %s has %d tuple elements", + dstTyp.SQLStringForError(), len(dstTyp.TupleContents()), + srcTyp.SQLStringForError(), len(srcTyp.TupleContents()), + ) + } + for i := range dstTyp.TupleContents() { + if err := checkTypesMatch(srcTyp.TupleContents()[i], dstTyp.TupleContents()[i]); err != nil { + return errors.Wrapf(err, + "destination type %s tuple element %d does not match source type %s tuple element %d", + dstTyp.SQLStringForError(), i, srcTyp.SQLStringForError(), i, + ) + } + } + + default: + if !srcTyp.Identical(dstTyp) { + return errors.Newf( + "destination type %s does not match source type %s", + dstTyp.SQLStringForError(), srcTyp.SQLStringForError(), ) } } + return nil }