Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.3: tabledesc: disallow LDR for tables that reference UDFs/sequences #133090

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2075,7 +2075,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
`cannot create logical replication stream: destination table tab CHECK constraints do not match source table tab`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)
// Allos user to create LDR stream with mismatched CHECK via SKIP SCHEMA CHECK.
// Allow user to create LDR stream with mismatched CHECK via SKIP SCHEMA CHECK.
var jobIDSkipSchemaCheck jobspb.JobID
dbA.QueryRow(t,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH SKIP SCHEMA CHECK",
Expand All @@ -2097,7 +2097,39 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
dbA.Exec(t, "CANCEL JOB $1", jobAID)
jobutils.WaitForJobToCancel(t, dbA, jobAID)

// Check if the table references a UDF.
dbA.Exec(t, "CREATE OR REPLACE FUNCTION my_udf() RETURNS INT AS $$ SELECT 1 $$ LANGUAGE SQL")
dbA.Exec(t, "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL")
dbA.Exec(t, "ALTER TABLE tab ALTER COLUMN udf_col SET DEFAULT my_udf()")
dbB.Exec(t, "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL DEFAULT 1")
dbA.ExpectErr(t,
`cannot create logical replication stream: table tab references functions with IDs \[[0-9]+\]`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)

// Check if the table references a sequence.
dbA.Exec(t, "ALTER TABLE tab DROP COLUMN udf_col")
dbB.Exec(t, "ALTER TABLE tab DROP COLUMN udf_col")
dbA.Exec(t, "CREATE SEQUENCE my_seq")
dbA.Exec(t, "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT nextval('my_seq')")
dbB.Exec(t, "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT 1")
dbA.ExpectErr(t,
`cannot create logical replication stream: table tab references sequences with IDs \[[0-9]+\]`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)

// Check if table has a trigger.
dbA.Exec(t, "ALTER TABLE tab DROP COLUMN seq_col")
dbB.Exec(t, "ALTER TABLE tab DROP COLUMN seq_col")
dbA.Exec(t, "CREATE OR REPLACE FUNCTION my_trigger() RETURNS TRIGGER AS $$ BEGIN RETURN NEW; END $$ LANGUAGE PLPGSQL")
dbA.Exec(t, "CREATE TRIGGER my_trigger BEFORE INSERT ON tab FOR EACH ROW EXECUTE FUNCTION my_trigger()")
dbA.ExpectErr(t,
`cannot create logical replication stream: table tab references triggers \[my_trigger\]`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)

// Verify that the stream cannot be created with user defined types.
dbA.Exec(t, "DROP TRIGGER my_trigger ON tab")
dbA.Exec(t, "CREATE TYPE mytype AS ENUM ('a', 'b', 'c')")
dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b', 'c')")
dbA.Exec(t, "ALTER TABLE tab ADD COLUMN enum_col mytype NOT NULL")
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/catalog/tabledesc/logical_replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,31 @@ func CheckLogicalReplicationCompatibility(
return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg)
}
}
if err := checkOutboundReferences(dst); err != nil {
return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg)
}

return nil
}

// checkOutboundReferences verifies that the table descriptor does not
// reference any user-defined functions, sequences, or triggers.
func checkOutboundReferences(dst *descpb.TableDescriptor) error {
for _, col := range dst.Columns {
if len(col.UsesSequenceIds) > 0 {
return errors.Newf("table %s references sequences with IDs %v", dst.Name, col.UsesSequenceIds)
}
if len(col.UsesFunctionIds) > 0 {
return errors.Newf("table %s references functions with IDs %v", dst.Name, col.UsesFunctionIds)
}
}
if len(dst.Triggers) > 0 {
triggerNames := make([]string, len(dst.Triggers))
for i, trigger := range dst.Triggers {
triggerNames[i] = trigger.Name
}
return errors.Newf("table %s references triggers [%s]", dst.Name, strings.Join(triggerNames, ", "))
}
return nil
}

Expand Down