Skip to content

Commit

Permalink
feat(iceberg): support dql for nimtable (#18408)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 20, 2024
1 parent 62cfdc8 commit 54f8b68
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 8 deletions.
1 change: 0 additions & 1 deletion src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ impl Binder {
matches!(self.bind_for, BindFor::Stream)
}

#[expect(dead_code)]
fn is_for_batch(&self) -> bool {
matches!(self.bind_for, BindFor::Batch)
}
Expand Down
65 changes: 58 additions & 7 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::sync::Arc;
use either::Either;
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{debug_assert_column_ids_distinct, is_system_schema, Field};
use risingwave_common::catalog::{
debug_assert_column_ids_distinct, is_system_schema, Engine, Field,
};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
Expand Down Expand Up @@ -129,7 +131,31 @@ impl Binder {
.catalog
.get_created_table_by_name(&self.db_name, schema_path, table_name)
{
self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
match table_catalog.engine() {
Engine::Iceberg => {
if self.is_for_batch()
&& let Ok((source_catalog, _)) =
self.catalog.get_source_by_name(
&self.db_name,
schema_path,
&table_catalog.iceberg_source_name().unwrap(),
)
{
self.resolve_source_relation(&source_catalog.clone(), as_of)
} else {
self.resolve_table_relation(
table_catalog.clone(),
schema_name,
as_of,
)?
}
}
Engine::Hummock => self.resolve_table_relation(
table_catalog.clone(),
schema_name,
as_of,
)?,
}
} else if let Ok((source_catalog, _)) =
self.catalog
.get_source_by_name(&self.db_name, schema_path, table_name)
Expand Down Expand Up @@ -177,11 +203,36 @@ impl Binder {
} else if let Some(table_catalog) =
schema.get_created_table_by_name(table_name)
{
return self.resolve_table_relation(
table_catalog.clone(),
&schema_name.clone(),
as_of,
);
match table_catalog.engine {
Engine::Iceberg => {
if self.is_for_batch()
&& let Some(source_catalog) = schema
.get_source_by_name(
&table_catalog
.iceberg_source_name()
.unwrap(),
)
{
return Ok(self.resolve_source_relation(
&source_catalog.clone(),
as_of,
));
} else {
return self.resolve_table_relation(
table_catalog.clone(),
&schema_name.clone(),
as_of,
);
}
}
Engine::Hummock => {
return self.resolve_table_relation(
table_catalog.clone(),
&schema_name.clone(),
as_of,
);
}
}
} else if let Some(source_catalog) =
schema.get_source_by_name(table_name)
{
Expand Down
14 changes: 14 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,20 @@ impl TableCatalog {
self.engine
}

pub fn iceberg_source_name(&self) -> Option<String> {
match self.engine {
Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
Engine::Hummock => None,
}
}

pub fn iceberg_sink_name(&self) -> Option<String> {
match self.engine {
Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
Engine::Hummock => None,
}
}

pub fn is_table(&self) -> bool {
self.table_type == TableType::Table
}
Expand Down

0 comments on commit 54f8b68

Please sign in to comment.