feat: allow append only table with pk #18634

Merged: 13 commits, Sep 25, 2024. The diff below shows changes from 9 of the 13 commits.

This PR allows an APPEND ONLY table to be created with a user-defined PRIMARY KEY. For such a table, the ON CONFLICT behavior defaults to IGNORE, and any other ON CONFLICT behavior is rejected at CREATE TABLE time.
5 changes: 4 additions & 1 deletion e2e_test/streaming/on_conflict.slt
@@ -2,7 +2,7 @@ statement ok
 SET RW_IMPLICIT_FLUSH TO true;

 statement ok
-create table t1 (v1 int, v2 int, v3 int, primary key(v1)) on conflict ignore;
+create table t1 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict ignore;

 statement ok
 insert into t1 values (1,4,2), (2,3,3);
@@ -26,6 +26,9 @@ select v1, v2, v3 from mv1;
 statement ok
 SET RW_IMPLICIT_FLUSH TO true;

+statement error
+create table t2 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict overwrite;
+
 statement ok
 create table t2 (v1 int, v2 int, v3 int, primary key(v1)) on conflict overwrite;

12 changes: 12 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/explain.yaml
@@ -25,3 +25,15 @@
     explain create table t (v1 int, v2 varchar) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
   expected_outputs:
   - explain_output
+- sql: |
+    explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
+  expected_outputs:
+  - explain_output
+- sql: |
+    explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
+  expected_outputs:
+  - explain_output
+- sql: |
+    explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
+  expected_outputs:
+  - explain_output
31 changes: 31 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/explain.yaml
@@ -160,3 +160,34 @@
         └─StreamExchange { dist: HashShard(_row_id) }
           └─StreamDml { columns: [v1, v2, _row_id] }
             └─StreamSource
+- sql: |
+    explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
+  explain_output: |
+    StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
+    └─StreamRowIdGen { row_id_index: 2 }
+      └─StreamUnion { all: true }
+        ├─StreamExchange [no_shuffle] { dist: SomeShard }
+        │ └─StreamSource { source: t, columns: [v1, v2, _row_id] }
+        └─StreamExchange [no_shuffle] { dist: SomeShard }
+          └─StreamDml { columns: [v1, v2, _row_id] }
+            └─StreamSource
+- sql: |
+    explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
+  explain_output: |
+    StreamMaterialize { columns: [v1, v2], stream_key: [v2], pk_columns: [v2], pk_conflict: Overwrite }
+    └─StreamUnion { all: true }
+      ├─StreamExchange { dist: HashShard(v2) }
+      │ └─StreamSource { source: t, columns: [v1, v2] }
+      └─StreamExchange { dist: HashShard(v2) }
+        └─StreamDml { columns: [v1, v2] }
+          └─StreamSource
+- sql: |
+    explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
+  explain_output: |
+    StreamMaterialize { columns: [v1, v2], stream_key: [v2], pk_columns: [v2], pk_conflict: IgnoreConflict }
+    └─StreamUnion { all: true }
+      ├─StreamExchange { dist: HashShard(v2) }
+      │ └─StreamSource { source: t, columns: [v1, v2] }
+      └─StreamExchange { dist: HashShard(v2) }
+        └─StreamDml { columns: [v1, v2] }
+          └─StreamSource
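
The four explain_output entries above differ mainly in the pk_conflict label on StreamMaterialize: NoCheck when the hidden _row_id column is the key, Overwrite for a user-defined primary key on a regular table, and IgnoreConflict for a user-defined primary key on an APPEND ONLY table. Below is a minimal sketch of that mapping, assuming the defaults used in these tests (no explicit ON CONFLICT clause); the type and function names are illustrative rather than RisingWave's actual API.

// Illustrative mapping from table definition to the pk_conflict label seen in
// the EXPLAIN outputs above. Names are hypothetical, not RisingWave types.
#[derive(Debug, PartialEq)]
enum PkConflict {
    NoCheck,        // hidden _row_id key: rows are unique by construction
    Overwrite,      // user-defined PK, default when no ON CONFLICT clause is given
    IgnoreConflict, // user-defined PK on an APPEND ONLY table
}

fn pk_conflict(has_user_pk: bool, append_only: bool) -> PkConflict {
    match (has_user_pk, append_only) {
        (false, _) => PkConflict::NoCheck,
        (true, false) => PkConflict::Overwrite,
        (true, true) => PkConflict::IgnoreConflict,
    }
}

fn main() {
    // One assertion per explain_output entry above, in order.
    assert_eq!(pk_conflict(false, false), PkConflict::NoCheck);
    assert_eq!(pk_conflict(false, true), PkConflict::NoCheck);
    assert_eq!(pk_conflict(true, false), PkConflict::Overwrite);
    assert_eq!(pk_conflict(true, true), PkConflict::IgnoreConflict);
}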
20 changes: 14 additions & 6 deletions src/frontend/src/handler/create_table.rs
@@ -699,12 +699,20 @@ fn gen_table_plan_inner(
         vec![],
     );

-    if append_only && row_id_index.is_none() {
-        return Err(ErrorCode::InvalidInputSyntax(
-            "PRIMARY KEY constraint can not be applied to an append-only table.".to_owned(),
-        )
-        .into());
-    }
+    let pk_on_append_only = append_only && row_id_index.is_none();
+
+    let on_conflict = if pk_on_append_only {
+        let on_conflict = on_conflict.unwrap_or(OnConflict::Ignore);
+        if on_conflict != OnConflict::Ignore {
+            return Err(ErrorCode::InvalidInputSyntax(
+                "When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be IGNORE.".to_owned(),
+            )
+            .into());
+        }
+        Some(on_conflict)
+    } else {
+        on_conflict
+    };

     if !append_only && !watermark_descs.is_empty() {
         return Err(ErrorCode::NotSupported(
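The heart of the frontend change is easier to see in isolation. The following is a minimal, self-contained sketch of the check added to gen_table_plan_inner above; the free-standing resolve_on_conflict function and its signature are hypothetical, but the rule it encodes comes from the diff: a PRIMARY KEY on an APPEND ONLY table defaults the ON CONFLICT behavior to IGNORE and rejects anything else.

// Simplified, standalone sketch of the validation in create_table.rs above.
// Not the actual RisingWave function; the error type is reduced to String.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OnConflict {
    Ignore,
    Overwrite,
}

// `has_row_id_pk` is true when no user-defined PRIMARY KEY exists and a hidden
// _row_id column is generated instead (i.e. `row_id_index.is_some()` in the diff).
fn resolve_on_conflict(
    append_only: bool,
    has_row_id_pk: bool,
    on_conflict: Option<OnConflict>,
) -> Result<Option<OnConflict>, String> {
    let pk_on_append_only = append_only && !has_row_id_pk;
    if pk_on_append_only {
        // Default to IGNORE and reject anything else, mirroring the diff.
        let resolved = on_conflict.unwrap_or(OnConflict::Ignore);
        if resolved != OnConflict::Ignore {
            return Err("the ON CONFLICT behavior must be IGNORE for an APPEND ONLY table with a PRIMARY KEY".to_owned());
        }
        Ok(Some(resolved))
    } else {
        // Other tables keep whatever the user specified, possibly nothing.
        Ok(on_conflict)
    }
}

fn main() {
    // APPEND ONLY + PRIMARY KEY with no explicit clause resolves to IGNORE.
    assert_eq!(resolve_on_conflict(true, false, None), Ok(Some(OnConflict::Ignore)));
    // APPEND ONLY + PRIMARY KEY with OVERWRITE is rejected.
    assert!(resolve_on_conflict(true, false, Some(OnConflict::Overwrite)).is_err());
}

This matches the new e2e test above: creating t2 with APPEND ONLY and on conflict overwrite is expected to fail, while t1 with on conflict ignore succeeds.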
25 changes: 14 additions & 11 deletions src/frontend/src/optimizer/mod.rs
@@ -675,8 +675,8 @@ impl PlanRoot {
         #[derive(PartialEq, Debug, Copy, Clone)]
         enum PrimaryKeyKind {
             UserDefinedPrimaryKey,
-            RowIdAsPrimaryKey,
-            AppendOnly,
+            NonAppendOnlyRowIdPK,
+            AppendOnlyRowIdPK,
         }

         fn inject_dml_node(
@@ -693,25 +693,28 @@
             dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

             dml_node = match kind {
-                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::RowIdAsPrimaryKey => {
+                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPK => {
                     RequiredDist::hash_shard(pk_column_indices)
                         .enforce_if_not_satisfies(dml_node, &Order::any())?
                 }
-                PrimaryKeyKind::AppendOnly => StreamExchange::new_no_shuffle(dml_node).into(),
+                PrimaryKeyKind::AppendOnlyRowIdPK => {
+                    StreamExchange::new_no_shuffle(dml_node).into()
+                }
             };

             Ok(dml_node)
         }

-        let kind = if append_only {
-            assert!(row_id_index.is_some());
-            PrimaryKeyKind::AppendOnly
-        } else if let Some(row_id_index) = row_id_index {
+        let kind = if let Some(row_id_index) = row_id_index {
             assert_eq!(
                 pk_column_indices.iter().exactly_one().copied().unwrap(),
                 row_id_index
             );
-            PrimaryKeyKind::RowIdAsPrimaryKey
+            if append_only {
+                PrimaryKeyKind::AppendOnlyRowIdPK
+            } else {
+                PrimaryKeyKind::NonAppendOnlyRowIdPK
+            }
         } else {
             PrimaryKeyKind::UserDefinedPrimaryKey
         };
@@ -738,7 +741,7 @@ impl PlanRoot {
                         .enforce_if_not_satisfies(external_source_node, &Order::any())?
                 }

-                PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => {
+                PrimaryKeyKind::NonAppendOnlyRowIdPK | PrimaryKeyKind::AppendOnlyRowIdPK => {
                     StreamExchange::new_no_shuffle(external_source_node).into()
                 }
             };
@@ -814,7 +817,7 @@ impl PlanRoot {
             PrimaryKeyKind::UserDefinedPrimaryKey => {
                 unreachable!()
             }
-            PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => {
+            PrimaryKeyKind::NonAppendOnlyRowIdPK | PrimaryKeyKind::AppendOnlyRowIdPK => {
                 stream_plan = StreamRowIdGen::new_with_dist(
                     stream_plan,
                     row_id_index,
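The rename from RowIdAsPrimaryKey/AppendOnly to NonAppendOnlyRowIdPK/AppendOnlyRowIdPK captures the new invariant: append-only no longer implies a hidden row-id key, and an APPEND ONLY table with a user-defined primary key is classified as UserDefinedPrimaryKey, so its DML input is hash-sharded on the primary key. The sketch below is a hypothetical, self-contained summary of that classification and of the exchange chosen for the DML branch in the diff; it returns labels instead of building stream plan nodes, and the helper names are illustrative.

// Hypothetical summary of the PrimaryKeyKind dispatch in optimizer/mod.rs.
// The real code builds StreamExchange plan nodes; here we only return a label.
#[derive(PartialEq, Debug, Copy, Clone)]
enum PrimaryKeyKind {
    UserDefinedPrimaryKey, // the user declared PRIMARY KEY(...)
    NonAppendOnlyRowIdPK,  // no user PK, not append-only: hidden _row_id key
    AppendOnlyRowIdPK,     // no user PK, append-only: hidden _row_id key
}

#[derive(PartialEq, Debug)]
enum DmlExchange {
    HashShardOnPk, // rows must be hash-distributed by the primary key columns
    NoShuffle,     // rows stay put; the row id is generated downstream
}

fn classify(append_only: bool, row_id_index: Option<usize>) -> PrimaryKeyKind {
    match (row_id_index, append_only) {
        (None, _) => PrimaryKeyKind::UserDefinedPrimaryKey,
        (Some(_), false) => PrimaryKeyKind::NonAppendOnlyRowIdPK,
        (Some(_), true) => PrimaryKeyKind::AppendOnlyRowIdPK,
    }
}

fn dml_exchange(kind: PrimaryKeyKind) -> DmlExchange {
    match kind {
        PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPK => {
            DmlExchange::HashShardOnPk
        }
        PrimaryKeyKind::AppendOnlyRowIdPK => DmlExchange::NoShuffle,
    }
}

fn main() {
    // An APPEND ONLY table with a user-defined PK is simply UserDefinedPrimaryKey,
    // so its DML input is hash-sharded on the PK (this is what the PR enables).
    let kind = classify(true, None);
    assert_eq!(kind, PrimaryKeyKind::UserDefinedPrimaryKey);
    assert_eq!(dml_exchange(kind), DmlExchange::HashShardOnPk);
    // Append-only without a PK keeps the no-shuffle path.
    assert_eq!(dml_exchange(classify(true, Some(2))), DmlExchange::NoShuffle);
}

For the external source branch, the diff hash-shards only the UserDefinedPrimaryKey case and routes both row-id kinds through a no-shuffle exchange.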