Skip to content

Commit

Permalink
change column id gen
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Jun 13, 2023
1 parent 9506599 commit 16bde5c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
5 changes: 5 additions & 0 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ impl ColumnId {
pub const fn new(column_id: i32) -> Self {
Self(column_id)
}

/// Sometimes the id field is filled later, we use this value for better debugging.
pub const fn placeholder() -> Self {
Self(i32::MAX - 1)
}
}

impl ColumnId {
Expand Down
12 changes: 7 additions & 5 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,13 +540,12 @@ pub(crate) async fn resolve_source_schema(
fn check_and_add_timestamp_column(
with_properties: &HashMap<String, String>,
columns: &mut Vec<ColumnCatalog>,
col_id_gen: &mut ColumnIdGenerator,
) {
if is_kafka_connector(with_properties) {
let kafka_timestamp_column = ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Timestamptz,
column_id: col_id_gen.generate(KAFKA_TIMESTAMP_COLUMN_NAME),
column_id: ColumnId::placeholder(),
name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(),
field_descs: vec![],
type_name: "".to_string(),
Expand Down Expand Up @@ -763,11 +762,14 @@ pub async fn handle_create_source(

let mut with_properties = handler_args.with_options.into_inner().into_iter().collect();

let mut col_id_gen = ColumnIdGenerator::new_initial();
let mut columns = bind_sql_columns(&stmt.columns)?;

let mut columns = bind_sql_columns(&stmt.columns, &mut col_id_gen)?;
check_and_add_timestamp_column(&with_properties, &mut columns);

check_and_add_timestamp_column(&with_properties, &mut columns, &mut col_id_gen);
let mut col_id_gen = ColumnIdGenerator::new_initial();
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}

let pk_names = bind_pk_names(&stmt.columns, &stmt.constraints)?;

Expand Down
26 changes: 15 additions & 11 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,11 @@ fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
/// Binds the column schemas declared in CREATE statement into `ColumnDesc`.
/// If a column is marked as `primary key`, its `ColumnId` is also returned.
/// This primary key is not combined with table constraints yet.
pub fn bind_sql_columns(
column_defs: &[ColumnDef],
col_id_gen: &mut ColumnIdGenerator,
) -> Result<Vec<ColumnCatalog>> {
pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result<Vec<ColumnCatalog>> {
let mut columns = Vec::with_capacity(column_defs.len());

for column in column_defs {
ensure_column_options_supported(column)?;
let column_id = col_id_gen.generate(&column.name.real_value());
// Destruct to make sure all fields are properly handled rather than ignored.
// Do NOT use `..` to ignore fields you do not want to deal with.
// Reject them with a clear NotImplemented error.
Expand Down Expand Up @@ -183,7 +179,7 @@ pub fn bind_sql_columns(
columns.push(ColumnCatalog {
column_desc: ColumnDesc {
data_type: bind_data_type(&data_type)?,
column_id,
column_id: ColumnId::placeholder(),
name: name.real_value(),
field_descs,
type_name: "".to_string(),
Expand Down Expand Up @@ -463,7 +459,10 @@ pub(crate) async fn gen_create_table_plan_with_source(
append_only: bool,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
let session = context.session_ctx();
let columns = bind_sql_columns(&column_defs, &mut col_id_gen)?;
let mut columns = bind_sql_columns(&column_defs)?;
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}
let mut properties = context.with_options().inner().clone().into_iter().collect();

let pk_names: Vec<String> = bind_pk_names(&column_defs, &constraints)?;
Expand Down Expand Up @@ -531,8 +530,10 @@ pub(crate) fn gen_create_table_plan(
append_only: bool,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
let definition = context.normalized_sql().to_owned();
let columns = bind_sql_columns(&column_defs, &mut col_id_gen)?;

let mut columns = bind_sql_columns(&column_defs)?;
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}
let properties = context.with_options().inner().clone().into_iter().collect();
gen_create_table_plan_without_bind(
context,
Expand Down Expand Up @@ -915,8 +916,11 @@ mod tests {
panic!("test case should be create table")
};
let actual: Result<_> = (|| {
let columns =
bind_sql_columns(&column_defs, &mut ColumnIdGenerator::new_initial())?;
let mut columns = bind_sql_columns(&column_defs)?;
let mut col_id_gen = ColumnIdGenerator::new_initial();
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}
let pk_names = bind_pk_names(&column_defs, &constraints)?;
let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?;
Ok(pk_column_ids)
Expand Down

0 comments on commit 16bde5c

Please sign in to comment.