Skip to content

Commit

Permalink
Merge #133079
Browse files Browse the repository at this point in the history
133079: tabledesc: disallow LDR for tables that reference UDFs/sequences r=rafiss a=rafiss

We will disallow these kinds of references until we have explicit tests for them.

fixes #132304
Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed Oct 21, 2024
2 parents beb7026 + 9e02676 commit 4a5e4d2
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
34 changes: 33 additions & 1 deletion pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2075,7 +2075,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
`cannot create logical replication stream: destination table tab CHECK constraints do not match source table tab`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)
// Allos user to create LDR stream with mismatched CHECK via SKIP SCHEMA CHECK.
// Allow user to create LDR stream with mismatched CHECK via SKIP SCHEMA CHECK.
var jobIDSkipSchemaCheck jobspb.JobID
dbA.QueryRow(t,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH SKIP SCHEMA CHECK",
Expand All @@ -2097,7 +2097,39 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
dbA.Exec(t, "CANCEL JOB $1", jobAID)
jobutils.WaitForJobToCancel(t, dbA, jobAID)

// Check if the table references a UDF.
dbA.Exec(t, "CREATE OR REPLACE FUNCTION my_udf() RETURNS INT AS $$ SELECT 1 $$ LANGUAGE SQL")
dbA.Exec(t, "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL")
dbA.Exec(t, "ALTER TABLE tab ALTER COLUMN udf_col SET DEFAULT my_udf()")
dbB.Exec(t, "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL DEFAULT 1")
dbA.ExpectErr(t,
`cannot create logical replication stream: table tab references functions with IDs \[[0-9]+\]`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)

// Check if the table references a sequence.
dbA.Exec(t, "ALTER TABLE tab DROP COLUMN udf_col")
dbB.Exec(t, "ALTER TABLE tab DROP COLUMN udf_col")
dbA.Exec(t, "CREATE SEQUENCE my_seq")
dbA.Exec(t, "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT nextval('my_seq')")
dbB.Exec(t, "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT 1")
dbA.ExpectErr(t,
`cannot create logical replication stream: table tab references sequences with IDs \[[0-9]+\]`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)

// Check if table has a trigger.
dbA.Exec(t, "ALTER TABLE tab DROP COLUMN seq_col")
dbB.Exec(t, "ALTER TABLE tab DROP COLUMN seq_col")
dbA.Exec(t, "CREATE OR REPLACE FUNCTION my_trigger() RETURNS TRIGGER AS $$ BEGIN RETURN NEW; END $$ LANGUAGE PLPGSQL")
dbA.Exec(t, "CREATE TRIGGER my_trigger BEFORE INSERT ON tab FOR EACH ROW EXECUTE FUNCTION my_trigger()")
dbA.ExpectErr(t,
`cannot create logical replication stream: table tab references triggers \[my_trigger\]`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)

// Verify that the stream cannot be created with user defined types.
dbA.Exec(t, "DROP TRIGGER my_trigger ON tab")
dbA.Exec(t, "CREATE TYPE mytype AS ENUM ('a', 'b', 'c')")
dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b', 'c')")
dbA.Exec(t, "ALTER TABLE tab ADD COLUMN enum_col mytype NOT NULL")
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/catalog/tabledesc/logical_replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,31 @@ func CheckLogicalReplicationCompatibility(
return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg)
}
}
if err := checkOutboundReferences(dst); err != nil {
return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg)
}

return nil
}

// checkOutboundReferences verifies that the table descriptor does not
// reference any user-defined functions, sequences, or triggers.
func checkOutboundReferences(dst *descpb.TableDescriptor) error {
for _, col := range dst.Columns {
if len(col.UsesSequenceIds) > 0 {
return errors.Newf("table %s references sequences with IDs %v", dst.Name, col.UsesSequenceIds)
}
if len(col.UsesFunctionIds) > 0 {
return errors.Newf("table %s references functions with IDs %v", dst.Name, col.UsesFunctionIds)
}
}
if len(dst.Triggers) > 0 {
triggerNames := make([]string, len(dst.Triggers))
for i, trigger := range dst.Triggers {
triggerNames[i] = trigger.Name
}
return errors.Newf("table %s references triggers [%s]", dst.Name, strings.Join(triggerNames, ", "))
}
return nil
}

Expand Down

0 comments on commit 4a5e4d2

Please sign in to comment.