Skip to content

Commit

Permalink
crosscluster/logical: check UDT equivalency during LDR creation
Browse files Browse the repository at this point in the history
This check requires that the logical and physical representations of
each type are identical. In the future, we may investigate ways to only
require logical equivalency.

Release note (ops change): When creating a logical replication stream,
any user-defined types in the source and destination are now checked for
equivalency. This allows for creating a stream that handles user-defined
types without needing to use the `WITH SKIP SCHEMA CHECK` option.
  • Loading branch information
rafiss committed Oct 22, 2024
1 parent 5eb9caf commit 0619858
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 16 deletions.
15 changes: 14 additions & 1 deletion pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -208,14 +209,26 @@ func createLogicalReplicationStreamPlanHook(
return err
}

sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors))
for i, desc := range spec.TypeDescriptors {
sourceTypes[i] = &desc
}
// TODO(rafi): do we need a different type resolver?
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(sourceTypes)

// If the user asked to ignore "ttl-deletes", make sure that at least one of
// the source tables actually has a TTL job which sets the omit bit that
// is used for filtering; if not, they probably forgot that step.
throwNoTTLWithCDCIgnoreError := discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes

for i, name := range srcTableNames {
td := spec.TableDescriptors[name]
srcTableDescs[i] = &td
cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
return err
}
srcTableDescs[i] = cpy.TableDesc()
repPairs[i].SrcDescriptorID = int32(td.ID)
if td.RowLevelTTL != nil && td.RowLevelTTL.DisableChangefeedReplication {
throwNoTTLWithCDCIgnoreError = false
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
}

// TODO(msbutler): is this import type resolver kosher? Should put in a new package.
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(plan.SourceTypes)
tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata)
if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error {
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,20 +1925,22 @@ func TestUserDefinedTypes(t *testing.T) {
// Create the same user-defined types on both dbA and dbB.
dbA.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')")
dbB.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')")
dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")
dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")

dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')")
dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one')")
dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data VALUES (2)")
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data with skip schema check", dbBURL.String()).Scan(&jobAID)
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}})
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
}

// TestLogicalReplicationCreationChecks verifies that we check that the table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ func (p *partitionedStreamClient) PlanLogicalReplication(
}

sourceTypes := make([]*descpb.TypeDescriptor, len(streamSpec.TypeDescriptors))
for _, desc := range streamSpec.TypeDescriptors {
sourceTypes = append(sourceTypes, &desc)
for i, desc := range streamSpec.TypeDescriptors {
sourceTypes[i] = &desc
}

return LogicalReplicationPlan{
Expand Down
70 changes: 64 additions & 6 deletions pkg/sql/catalog/tabledesc/logical_replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package tabledesc

import (
"bytes"
"cmp"
"slices"
"strings"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -203,20 +205,76 @@ func checkSrcDstColsMatch(src *descpb.TableDescriptor, dst *descpb.TableDescript
)
}

if dstCol.Type.UserDefined() {
if err := checkTypesMatch(srcCol.Type, dstCol.Type); err != nil {
return errors.Wrapf(err,
"destination table %s column %s has type %s, but the source table %s has type %s",
dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(),
)
}
}
return nil
}

// checkTypesMatch checks that the source and destination types match and
// returns a descriptive error when they do not; it returns nil when they do.
//
// The comparison is driven by the shape of the destination type:
//   - If the destination carries enum metadata, the source must carry it too,
//     and both the logical and the physical representations must be equal
//     element by element, in the same order.
//   - If the destination has one or more tuple elements, the source must have
//     the same number of elements, and each pair of elements is compared by a
//     recursive call to checkTypesMatch.
//   - Otherwise the two types must satisfy srcTyp.Identical(dstTyp).
//
// NOTE(review): this listing appears to be a diff rendering with its +/-
// markers stripped; the lines below that reference dst, dstCol, or srcCol look
// like removed lines from the previous version of the caller — confirm against
// the actual file before relying on them.
func checkTypesMatch(srcTyp *types.T, dstTyp *types.T) error {
switch {
// Enum destination: the source must also be an enum with the same members.
case dstTyp.TypeMeta.EnumData != nil:
if srcTyp.TypeMeta.EnumData == nil {
return errors.Newf(
"destination type %s is an ENUM, but the source type %s is not",
dstTyp.SQLStringForError(), srcTyp.SQLStringForError(),
)
}
// Logical representations are compared with slices.Equal, so both the
// values and their order must agree.
if !slices.Equal(srcTyp.TypeMeta.EnumData.LogicalRepresentations, dstTyp.TypeMeta.EnumData.LogicalRepresentations) {
return errors.Newf(
"destination type %s has logical representations %v, but the source type %s has %v",
dstTyp.SQLStringForError(), dstTyp.TypeMeta.EnumData.LogicalRepresentations,
srcTyp.SQLStringForError(), srcTyp.TypeMeta.EnumData.LogicalRepresentations,
)
}
// Physical representations are byte slices, so each pair is compared with
// bytes.Equal rather than ==.
if !slices.EqualFunc(
srcTyp.TypeMeta.EnumData.PhysicalRepresentations, dstTyp.TypeMeta.EnumData.PhysicalRepresentations,
func(x, y []byte) bool { return bytes.Equal(x, y) },
) {
return errors.Newf(
"destination table %s column %s has user-defined type %s",
dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(),
"destination type %s and source type %s have mismatched physical representations",
dstTyp.SQLStringForError(), srcTyp.SQLStringForError(),
)
}

if !srcCol.Type.Identical(dstCol.Type) {
// Tuple destination with at least one element: compare element-wise. A
// destination with zero tuple elements does not match this case and falls
// through to the default branch.
case len(dstTyp.TupleContents()) > 0:
if len(srcTyp.TupleContents()) == 0 {
return errors.Newf(
"destination table %s column %s has type %s, but the source table %s has type %s",
dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(),
"destination type %s is a tuple, but the source type %s is not",
dstTyp.SQLStringForError(), srcTyp.SQLStringForError(),
)
}
if len(dstTyp.TupleContents()) != len(srcTyp.TupleContents()) {
return errors.Newf(
"destination type %s has %d tuple elements, but the source type %s has %d tuple elements",
dstTyp.SQLStringForError(), len(dstTyp.TupleContents()),
srcTyp.SQLStringForError(), len(srcTyp.TupleContents()),
)
}
// Recurse into each element so nested enums and tuples get the same
// checks; the first mismatch is wrapped with the element index.
for i := range dstTyp.TupleContents() {
if err := checkTypesMatch(srcTyp.TupleContents()[i], dstTyp.TupleContents()[i]); err != nil {
return errors.Wrapf(err,
"destination type %s tuple element %d does not match source type %s tuple element %d",
dstTyp.SQLStringForError(), i, srcTyp.SQLStringForError(), i,
)
}
}

// Every other destination type must be identical to the source type.
default:
if !srcTyp.Identical(dstTyp) {
return errors.Newf(
"destination type %s does not match source type %s",
dstTyp.SQLStringForError(), srcTyp.SQLStringForError(),
)
}
}

return nil
}

Expand Down

0 comments on commit 0619858

Please sign in to comment.