Skip to content

Commit

Permalink
feat(iceberg): support create table without pk for nimtable (#18406)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Sep 4, 2024
1 parent d91c2e4 commit f05a3c5
Showing 1 changed file with 47 additions and 28 deletions.
75 changes: 47 additions & 28 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
CdcTableInfo, ColumnDef, ColumnOption, CompatibleSourceSchema, ConnectorSchema, CreateSink,
CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
Ident, ObjectName, OnConflict, SourceWatermark, TableConstraint, WithProperties,
Ident, ObjectName, OnConflict, SourceWatermark, Statement, TableConstraint, WithProperties,
};
use risingwave_sqlparser::parser::IncludeOption;
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::{create_sink, create_source, RwPgResponse};
Expand Down Expand Up @@ -1312,20 +1312,6 @@ pub async fn handle_create_table(
.await?;
}
Engine::Iceberg => {
let with_properties = WithProperties(vec![]);
let create_sink_stmt = CreateSinkStatement {
if_not_exists: false,
sink_name: ObjectName::from(vec![Ident::from(
(ICEBERG_SINK_PREFIX.to_string() + &table_name.real_value()).as_str(),
)]),
with_properties,
sink_from: CreateSink::From(table_name.clone()),
columns: vec![],
emit_mode: None,
sink_schema: None,
into_table_name: None,
};

// export AWS_REGION=your_region
// export AWS_ACCESS_KEY_ID=your_access_key
// export AWS_SECRET_ACCESS_KEY=your_secret_key
Expand Down Expand Up @@ -1363,23 +1349,21 @@ pub async fn handle_create_table(
} else {
"jdbc:sqlite:/tmp/sqlite/iceberg.db".to_string()
};

let meta_store_user = if let Ok(user) = std::env::var("META_STORE_USER") {
user
} else {
"xxx".to_string()
};

let meta_store_password = if let Ok(password) = std::env::var("META_STORE_PASSWORD") {
password
} else {
"xxx".to_string()
};
let catalog_name = "nimtable".to_string();
let database_name = "nimtable_db".to_string();

let mut sink_handler_args = handler_args.clone();
let mut with = BTreeMap::new();
with.insert("connector".to_string(), "iceberg".to_string());
// TODO: don't hard code primary key
// Iceberg sinks require a primary key, if none is provided, we will use the _row_id column
// Fetch primary key from columns
let mut pks = column_defs
.into_iter()
.filter(|c| {
Expand All @@ -1389,6 +1373,8 @@ pub async fn handle_create_table(
})
.map(|c| c.name.to_string())
.collect::<Vec<String>>();

// Fetch primary key from constraints
if pks.is_empty() {
pks = constraints
.into_iter()
Expand All @@ -1410,12 +1396,45 @@ pub async fn handle_create_table(
})
.collect::<Vec<String>>();
}
if pks.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(
"Primary key is required for iceberg table".to_string(),
)
.into());
}

// There is a table without primary key. We will use _row_id as primary key
let sink_from = if pks.is_empty() {
pks = vec!["_row_id".to_string()];
let [stmt]: [_; 1] =
Parser::parse_sql(&format!("select _row_id, * from {}", table_name))
.context("unable to parse query")?
.try_into()
.unwrap();

let Statement::Query(query) = &stmt else {
panic!("unexpected statement: {:?}", stmt);
};
CreateSink::AsQuery(query.clone())
} else {
CreateSink::From(table_name.clone())
};

let with_properties = WithProperties(vec![]);
let create_sink_stmt = CreateSinkStatement {
if_not_exists: false,
sink_name: ObjectName::from(vec![Ident::from(
(ICEBERG_SINK_PREFIX.to_string() + &table_name.real_value()).as_str(),
)]),
with_properties,
sink_from,
columns: vec![],
emit_mode: None,
sink_schema: None,
into_table_name: None,
};

let catalog_name = "nimtable".to_string();
let database_name = "nimtable_db".to_string();

let mut sink_handler_args = handler_args.clone();
let mut with = BTreeMap::new();
with.insert("connector".to_string(), "iceberg".to_string());

with.insert("primary_key".to_string(), pks.join(","));
with.insert("type".to_string(), "upsert".to_string());
with.insert("catalog.type".to_string(), "jdbc".to_string());
Expand Down

0 comments on commit f05a3c5

Please sign in to comment.