refactor(binder): refine resolve source schema
st1page committed Jun 6, 2023
1 parent 1fc4eb9 commit d7bfef8
Showing 1 changed file with 204 additions and 4 deletions.
src/frontend/src/handler/create_source.rs
@@ -40,11 +40,11 @@ use risingwave_connector::source::{
use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::RowFormatType;
use risingwave_sqlparser::ast::{
-AvroSchema, CreateSourceStatement, DebeziumAvroSchema, ProtobufSchema, SourceSchema,
-SourceWatermark,
+AvroSchema, ColumnDef, CreateSourceStatement, DebeziumAvroSchema, ProtobufSchema, SourceSchema,
+SourceWatermark, TableConstraint,
};

-use super::create_table::bind_sql_table_column_constraints;
+use super::create_table::{bind_pk_names, bind_sql_table_column_constraints};
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::ColumnId;
@@ -126,6 +126,7 @@ async fn extract_upsert_avro_table_schema(
pks,
))
}

async fn extract_debezium_avro_table_pk_columns(
schema: &DebeziumAvroSchema,
with_properties: &HashMap<String, String>,
@@ -178,6 +179,201 @@ async fn extract_protobuf_table_schema(
.collect_vec())
}

/// Resolves the schema of the source from the external schema file and returns the
/// relation's columns. See <https://www.risingwave.dev/docs/current/sql-create-source>
/// for more information.
pub(crate) async fn bind_source_schema(
source_schema: &SourceSchema,
columns_defs: Vec<ColumnDef>,
table_constraints: &[TableConstraint],
with_properties: &HashMap<String, String>,
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
let pk_column_names = bind_pk_names(&columns_defs, table_constraints)?;
let user_defined_pk = !pk_column_names.is_empty();
let user_defined_schema = !columns_defs.is_empty();
let is_kafka = is_kafka_connector(with_properties);

Ok(match source_schema {
SourceSchema::Protobuf(protobuf_schema) => {
if user_defined_schema {
return Err(RwError::from(ProtocolError(
"User-defined schema is not allowed with row format protobuf. Please refer to https://www.risingwave.dev/docs/current/sql-create-source/#protobuf for more information.".to_string())));
}
(
Some(
extract_protobuf_table_schema(protobuf_schema, with_properties.clone()).await?,
),
StreamSourceInfo {
row_format: RowFormatType::Protobuf as i32,
row_schema_location: protobuf_schema.row_schema_location.0.clone(),
use_schema_registry: protobuf_schema.use_schema_registry,
proto_message_name: protobuf_schema.message_name.0.clone(),
..Default::default()
},
)
}
SourceSchema::Avro(avro_schema) => {
if user_defined_schema {
return Err(RwError::from(ProtocolError(
"User-defined schema is not allowed with row format avro. Please refer to https://www.risingwave.dev/docs/current/sql-create-source/#avro for more information.".to_string())));
}
(
Some(extract_avro_table_schema(avro_schema, with_properties).await?),
StreamSourceInfo {
row_format: RowFormatType::Avro as i32,
row_schema_location: avro_schema.row_schema_location.0.clone(),
use_schema_registry: avro_schema.use_schema_registry,
proto_message_name: "".to_owned(),
..Default::default()
},
)
}
SourceSchema::UpsertAvro(avro_schema) => {
if user_defined_schema {
return Err(RwError::from(ProtocolError(
"User-defined schema is not allowed with row format upsert avro. Please refer to https://www.risingwave.dev/docs/current/sql-create-source/#avro for more information.".to_string())));
}
if !user_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with row format upsert avro."
.to_string(),
)));
}
if pk_column_names.len() != 1 {
return Err(RwError::from(ProtocolError(
"upsert avro supports only one primary key column.".to_string(),
)));
}
let upsert_avro_primary_key = pk_column_names[0].clone();
(
Some(extract_avro_table_schema(avro_schema, with_properties).await?),
StreamSourceInfo {
row_format: RowFormatType::UpsertAvro as i32,
row_schema_location: avro_schema.row_schema_location.0.clone(),
use_schema_registry: avro_schema.use_schema_registry,
upsert_avro_primary_key,
..Default::default()
},
)
}
SourceSchema::DebeziumAvro(avro_schema) => {
if !user_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with row format debezium avro."
.to_string(),
)));
}
if pk_column_names.len() != 1 {
return Err(RwError::from(ProtocolError(
"debezium avro supports only one primary key column.".to_string(),
)));
}
// The pk is validated here but not yet threaded into StreamSourceInfo; the
// schema resolution below is still a todo!().
let _debezium_avro_primary_key = pk_column_names[0].clone();

(
todo!(), // resolving the column schema for debezium avro is not implemented yet
StreamSourceInfo {
row_format: RowFormatType::DebeziumAvro as i32,
row_schema_location: avro_schema.row_schema_location.0.clone(),
..Default::default()
},
)
}

SourceSchema::DebeziumJson => {
if !user_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with row format debezium."
.to_string(),
)));
}
(
None,
StreamSourceInfo {
row_format: RowFormatType::DebeziumJson as i32,
..Default::default()
},
)
}
SourceSchema::UpsertJson => {
if !user_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with row format upsert_json."
.to_string(),
)));
}
(
None,
StreamSourceInfo {
row_format: RowFormatType::UpsertJson as i32,
..Default::default()
},
)
}
SourceSchema::Maxwell => {
if !user_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with row format maxwell."
.to_string(),
)));
}
(
None,
StreamSourceInfo {
row_format: RowFormatType::Maxwell as i32,
..Default::default()
},
)
}
SourceSchema::CanalJson => {
if !user_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with row format cannal_json."
.to_string(),
)));
}
(
None,
StreamSourceInfo {
row_format: RowFormatType::CanalJson as i32,
..Default::default()
},
)
}
SourceSchema::Csv(csv_info) => {
if is_kafka && csv_info.has_header {
return Err(RwError::from(ProtocolError(
"CSV HEADER is not supported when creating table with Kafka connector"
.to_owned(),
)));
}
(
None,
StreamSourceInfo {
row_format: RowFormatType::Csv as i32,
csv_delimiter: csv_info.delimiter as i32,
csv_has_header: csv_info.has_header,
..Default::default()
},
)
}
SourceSchema::Json => (
None,
StreamSourceInfo {
row_format: RowFormatType::Json as i32,
..Default::default()
},
),
SourceSchema::Native => (
None,
StreamSourceInfo {
row_format: RowFormatType::Native as i32,
..Default::default()
},
),
SourceSchema::DebeziumMongoJson => todo!(), // still handled in resolve_source_schema below
})
}
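
For orientation, a minimal usage sketch of bind_source_schema. This is assumed caller context, not code from this commit; the CreateSourceStatement fields (source_schema, columns, constraints) and the with_properties map are taken as given from handle_create_source below.

// Hedged usage sketch: `stmt` is the parsed CreateSourceStatement and
// `with_properties` the WITH-clause map.
let (resolved_columns, source_info) = bind_source_schema(
&stmt.source_schema,
stmt.columns.clone(),
&stmt.constraints,
&with_properties,
)
.await?;
// `resolved_columns` is Some(..) only for formats whose schema comes from an
// external source (protobuf, avro, upsert avro, debezium avro); for all other
// formats the columns declared in the DDL are used as-is.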

pub(crate) async fn resolve_source_schema(
source_schema: SourceSchema,
columns: &mut Vec<ColumnCatalog>,
@@ -353,7 +549,7 @@
}
SourceSchema::DebeziumMongoJson => {
if columns.is_empty() {
-let mut col_id_gen = ColumnIdGenerator::new_initial();
+let mut col_id_gen: ColumnIdGenerator = ColumnIdGenerator::new_initial();
let pk_id = col_id_gen.generate("_id");
columns.push(ColumnCatalog {
column_desc: ColumnDesc {
@@ -760,6 +956,10 @@ pub async fn handle_create_source(

let mut col_id_gen = ColumnIdGenerator::new_initial();

let columns = if must_resolve_source_schema(&stmt.source_schema) {
// WIP: resolve columns from the external schema (see bind_source_schema).
todo!()
} else {
// WIP: use the columns declared in the DDL.
todo!()
};

let mut column_descs = bind_sql_columns(stmt.columns.clone(), &mut col_id_gen)?;

check_and_add_timestamp_column(&with_properties, &mut column_descs, &mut col_id_gen);
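The must_resolve_source_schema helper referenced in this hunk is not shown in the diff; a hypothetical sketch follows, assuming it returns true exactly for the row formats whose columns bind_source_schema resolves from an external schema.

// Hypothetical helper (not part of this commit): mirrors the match arms in
// bind_source_schema that return Some(columns).
fn must_resolve_source_schema(source_schema: &SourceSchema) -> bool {
matches!(
source_schema,
SourceSchema::Protobuf(_)
| SourceSchema::Avro(_)
| SourceSchema::UpsertAvro(_)
| SourceSchema::DebeziumAvro(_)
)
}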
