Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow generated column in cdc table #19112

Merged
merged 17 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ psql -c "
statement ok
create table tt1 (v1 int,
v2 timestamptz,
v3 int as v1 + 1,
PRIMARY KEY (v1)
) with (
connector = 'postgres-cdc',
Expand All @@ -28,14 +29,15 @@ sleep 3s
query IT
SELECT * FROM tt1;
----
1 2023-10-23 10:00:00+00:00
1 2023-10-23 10:00:00+00:00 2

statement ok
drop table tt1;

statement ok
create table tt1 (v1 int,
v2 timestamptz,
v3 int as v1 + 2,
PRIMARY KEY (v1)
) with (
connector = 'postgres-cdc',
Expand All @@ -54,7 +56,7 @@ sleep 3s
query IT
SELECT * FROM tt1;
----
1 2023-10-23 10:00:00+00:00
1 2023-10-23 10:00:00+00:00 3

statement ok
drop table tt1;
4 changes: 4 additions & 0 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ impl ColumnCatalog {
}
}

/// Returns `true` when this column's descriptor has its `system_column` field set.
pub fn is_rw_sys_column(&self) -> bool {
    let desc = &self.column_desc;
    desc.system_column.is_some()
}

pub fn rw_timestamp_column() -> Self {
Self {
column_desc: rw_timestamp_column_desc(),
Expand Down
23 changes: 9 additions & 14 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,12 +847,19 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(

let (options, secret_refs) = cdc_with_options.into_parts();

let non_generated_column_descs = columns
.iter()
.filter(|&c| (!c.is_generated()))
.map(|c| c.column_desc.clone())
.collect_vec();
let non_generated_column_num = non_generated_column_descs.len();

let cdc_table_desc = CdcTableDesc {
table_id,
source_id: source.id.into(), // id of cdc source streaming job
external_table_name: external_table_name.clone(),
pk: table_pk,
columns: columns.iter().map(|c| c.column_desc.clone()).collect(),
columns: non_generated_column_descs,
stream_key: pk_column_indices,
connect_properties: options,
secret_refs,
Expand All @@ -870,7 +877,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
);

let scan_node: PlanRef = logical_scan.into();
let required_cols = FixedBitSet::with_capacity(columns.len());
let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
let plan_root = PlanRoot::new_with_logical_plan(
scan_node,
RequiredDist::Any,
Expand Down Expand Up @@ -1157,18 +1164,6 @@ fn sanity_check_for_cdc_table(
constraints: &Vec<TableConstraint>,
source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
for c in column_defs {
for op in &c.options {
if let ColumnOption::GeneratedColumns(_) = op.option {
return Err(ErrorCode::NotSupported(
"generated column defined on the table created from a CDC source".into(),
"Remove the generated column in the column list".into(),
)
.into());
}
}
}

// wildcard cannot be used with column definitions
if wildcard_idx.is_some() && !column_defs.is_empty() {
return Err(ErrorCode::NotSupported(
Expand Down
32 changes: 30 additions & 2 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,30 @@ impl DdlService for DdlServiceImpl {
};

for table_change in schema_change.table_changes {
// Validate every column carried by the upstream table change before touching the catalog.
// Generated, RisingWave system, and hidden columns are rejected with `invalid_argument`.
for c in &table_change.columns {
    let c = ColumnCatalog::from(c.clone());

    // Emits a warning under the `auto_schema_change` target and builds the error status
    // returned to the caller for the offending column.
    let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
        tracing::warn!(target: "auto_schema_change",
            cdc_table_id = table_change.cdc_table_id,
            upstream_ddl = table_change.upstream_ddl,
            "invalid column type from cdc table change");
        Err(Status::invalid_argument(format!(
            "invalid column type: {} from cdc table change, column: {:?}",
            column_type, c
        )))
    };
    if c.is_generated() {
        return invalid_col_type("generated column", &c);
    }
    if c.is_rw_sys_column() {
        return invalid_col_type("rw system column", &c);
    }
    if c.is_hidden {
        return invalid_col_type("hidden column", &c);
    }
}

// get the table catalog corresponding to the cdc table
let tables: Vec<Table> = self
.metadata_manager
Expand All @@ -944,10 +968,14 @@ impl DdlService for DdlServiceImpl {
// Since we only support `ADD` and `DROP` column, we check whether one of the new column set
// and the original column set is a subset of the other.
let original_columns: HashSet<(String, DataType)> =
HashSet::from_iter(table.columns.iter().map(|col| {
HashSet::from_iter(table.columns.iter().filter_map(|col| {
let col = ColumnCatalog::from(col.clone());
let data_type = col.data_type().clone();
(col.column_desc.name, data_type)
if col.is_generated() {
None
} else {
Some((col.column_desc.name, data_type))
}
}));
let new_columns: HashSet<(String, DataType)> =
HashSet::from_iter(table_change.columns.iter().map(|col| {
Expand Down
Loading