feat(iceberg): support iceberg sink create table #18362

Merged 18 commits on Sep 3, 2024
Changes from 17 commits
55 changes: 47 additions & 8 deletions e2e_test/sink/iceberg_sink.slt
@@ -48,23 +48,65 @@ CREATE SOURCE iceberg_demo_source WITH (
table.name='e2e_demo_table'
);

statement ok
CREATE SINK s7 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3a://hummock001',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = secret iceberg_s3_access_key,
s3.secret.key = secret iceberg_s3_secret_key,
s3.region = 'us-east-1',
catalog.name = 'demo',
catalog.type = 'storage',
database.name='demo_db',
table.name='e2e_auto_create_table',
commit_checkpoint_interval = 1,
create_table_if_not_exists = 'true'
);

statement ok
CREATE SOURCE iceberg_e2e_auto_create_table WITH (
connector = 'iceberg',
warehouse.path = 's3a://hummock001',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = secret iceberg_s3_access_key,
s3.secret.key = secret iceberg_s3_secret_key,
s3.region = 'us-east-1',
catalog.name = 'demo',
catalog.type = 'storage',
database.name='demo_db',
table.name='e2e_auto_create_table'
);

statement ok
INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2');

statement ok
FLUSH;

sleep 5s
sleep 20s

query I
select count(*) from rw_iceberg_snapshots where source_name = 'iceberg_demo_source';
----
1

query I
select count(*) from rw_iceberg_snapshots where source_name = 'iceberg_e2e_auto_create_table';
----
1

query I
select sum(record_count) from rw_iceberg_files where source_name = 'iceberg_demo_source';
----
7

query I
select count(*) from rw_iceberg_snapshots where source_name = 'iceberg_demo_source';
select sum(record_count) from rw_iceberg_files where source_name = 'iceberg_e2e_auto_create_table';
----
1
7

statement ok
INSERT INTO t6 VALUES (1, 50, '1-50');
@@ -78,10 +120,7 @@ statement ok
DROP SOURCE iceberg_demo_source;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;
DROP SOURCE iceberg_e2e_auto_create_table;

statement ok
DROP TABLE t6;
DROP TABLE t6 cascade;
88 changes: 86 additions & 2 deletions src/connector/src/sink/iceberg/mod.rs
@@ -31,7 +31,7 @@ use async_trait::async_trait;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::TableMetadata;
use iceberg::table::Table as TableV2;
use iceberg::{Catalog as CatalogV2, TableIdent};
use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent};
use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
use icelake::catalog::{
load_catalog, load_iceberg_base_catalog_config, BaseCatalogConfig, CatalogRef, CATALOG_NAME,
@@ -43,9 +43,10 @@ use icelake::io_v2::{
DataFileWriterBuilder, EqualityDeltaWriterBuilder, IcebergWriterBuilder, DELETE_OP, INSERT_OP,
};
use icelake::transaction::Transaction;
use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile, COLUMN_ID_META_KEY};
use icelake::{Table, TableIdentifier};
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
@@ -151,6 +152,9 @@ pub struct IcebergConfig {
#[serde(default = "default_commit_checkpoint_interval")]
#[serde_as(as = "DisplayFromStr")]
pub commit_checkpoint_interval: u64,

#[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub create_table_if_not_exists: bool,
}

impl IcebergConfig {
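One note on the new option's type: WITH-clause values arrive as strings, which is why the e2e test above passes create_table_if_not_exists = 'true' rather than a bare boolean, and why the field is wired through deserialize_bool_from_string. A minimal standalone sketch of such a deserializer, illustrative only (the exact behavior of the helper in this codebase may differ):

    use serde::{Deserialize, Deserializer};

    // Illustrative stand-in for the deserializer referenced above: parse the
    // strings "true"/"false" from a WITH clause into a bool.
    fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
    where
        D: Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        match s.as_str() {
            "true" => Ok(true),
            "false" => Ok(false),
            other => Err(serde::de::Error::custom(format!(
                "expected 'true' or 'false', got '{other}'"
            ))),
        }
    }

    #[derive(Deserialize)]
    struct Demo {
        #[serde(default, deserialize_with = "deserialize_bool_from_string")]
        create_table_if_not_exists: bool,
    }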
@@ -701,6 +705,10 @@ impl Debug for IcebergSink {

impl IcebergSink {
async fn create_and_validate_table(&self) -> Result<Table> {
if self.config.create_table_if_not_exists {
self.create_table_if_not_exists().await?;
}

let table = self
.config
.load_table()
@@ -722,6 +730,79 @@ impl IcebergSink {
Ok(table)
}

async fn create_table_if_not_exists(&self) -> Result<()> {
let catalog = self.config.create_catalog_v2().await?;
let table_id = self
.config
.full_table_name_v2()
.context("Unable to parse table name")?;
if !catalog
.table_exists(&table_id)
.await
.map_err(|e| SinkError::Iceberg(anyhow!(e)))?
{
let namespace = if let Some(database_name) = &self.config.database_name {
NamespaceIdent::new(database_name.clone())
} else {
bail!("database name must be set if you want to create table")
};

// convert risingwave schema -> arrow schema -> iceberg schema
let arrow_fields = self
.param
.columns
.iter()
.map(|column| {
let mut arrow_field = IcebergArrowConvert
.to_arrow_field(&column.name, &column.data_type)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context(format!(
"failed to convert {}: {} to arrow type",
&column.name, &column.data_type
))?;
let mut metadata = HashMap::new();
metadata.insert(
PARQUET_FIELD_ID_META_KEY.to_string(),
column.column_id.get_id().to_string(),
);
metadata.insert(
COLUMN_ID_META_KEY.to_string(),
column.column_id.get_id().to_string(),
);
arrow_field.set_metadata(metadata);
Ok(arrow_field)
})
.collect::<Result<Vec<ArrowField>>>()?;
let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to convert arrow schema to iceberg schema")?;

let location = {
Contributor: nit: could use a separate function to handle the path (see the sketch just below).
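A possible shape for that helper, sketched under the reviewer's suggestion (the name join_table_path and its signature are hypothetical, not something this PR adds); the same function could also replace the duplicated path logic in storage_catalog.rs further down:

    // Hypothetical helper, not part of this PR: join a base path (the sink's
    // configured path or the catalog's warehouse root) with namespace parts
    // and a table name, tolerating a trailing '/' on the base path.
    fn join_table_path(base: &str, mut names: Vec<String>, table_name: &str) -> String {
        names.push(table_name.to_string());
        if base.ends_with('/') {
            format!("{}{}", base, names.join("/"))
        } else {
            format!("{}/{}", base, names.join("/"))
        }
    }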

let mut names = namespace.clone().inner();
names.push(self.config.table_name.to_string());
if self.config.path.ends_with('/') {
format!("{}{}", self.config.path, names.join("/"))
} else {
format!("{}/{}", self.config.path, names.join("/"))
}
};

let table_creation = TableCreation::builder()
.name(self.config.table_name.clone())
.schema(iceberg_schema)
.location(location)
.build();

catalog
.create_table(&namespace, table_creation)
.await
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to create iceberg table")?;
}
Ok(())
}

pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
if let Some(pk) = &config.primary_key {
@@ -1292,6 +1373,8 @@ pub fn try_matches_arrow_schema(

let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
(ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
(ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
(ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
(left, right) => left == right,
};
if !compatible {
@@ -1394,6 +1477,7 @@ mod test {
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
create_table_if_not_exists: false,
};

assert_eq!(iceberg_config, expected_iceberg_config);
65 changes: 58 additions & 7 deletions src/connector/src/sink/iceberg/storage_catalog.rs
@@ -18,7 +18,7 @@ use std::collections::HashMap;

use async_trait::async_trait;
use iceberg::io::{FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::TableMetadata;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
@@ -218,18 +218,54 @@ impl Catalog for StorageCatalog {
/// Create a new table inside the namespace.
async fn create_table(
&self,
_namespace: &NamespaceIdent,
_creation: TableCreation,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> iceberg::Result<Table> {
todo!()
let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
let table_path = {
Contributor: ditto (this path-joining logic could use the same helper sketched above).

Contributor: You can ignore this comment; I'm going to propose another PR to refactor it.

let mut names = table_ident.namespace.clone().inner();
names.push(table_ident.name.to_string());
if self.warehouse.ends_with('/') {
format!("{}{}", self.warehouse, names.join("/"))
} else {
format!("{}/{}", self.warehouse, names.join("/"))
}
};

// Create the metadata directory
let metadata_path = format!("{table_path}/metadata");

// Create the initial table metadata
let table_metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;

// Write the initial metadata file
let metadata_file_path = format!("{metadata_path}/v1.metadata.json");
let metadata_json = serde_json::to_string(&table_metadata)?;
let output = self.file_io.new_output(&metadata_file_path)?;
output.write(metadata_json.into()).await?;

// Write the version hint file
let version_hint_path = format!("{table_path}/metadata/version-hint.text");
let version_hint_output = self.file_io.new_output(&version_hint_path)?;
version_hint_output.write("1".into()).await?;

Ok(Table::builder()
.metadata(table_metadata)
.identifier(table_ident)
.file_io(self.file_io.clone())
.build())
}
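
For a concrete picture, with the settings from the e2e test above (warehouse s3a://hummock001, database demo_db, table e2e_auto_create_table), a successful create_table should leave roughly this layout behind, which load_table and table_exists below resolve through the same version-hint file:

    s3a://hummock001/demo_db/e2e_auto_create_table/metadata/v1.metadata.json
    s3a://hummock001/demo_db/e2e_auto_create_table/metadata/version-hint.text   (contents: "1")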

/// Load table from the catalog.
async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
let table_path = {
let mut names = table.namespace.clone().inner();
names.push(table.name.to_string());
format!("{}/{}", self.warehouse, names.join("/"))
if self.warehouse.ends_with('/') {
format!("{}{}", self.warehouse, names.join("/"))
} else {
format!("{}/{}", self.warehouse, names.join("/"))
}
};
let path = if self.is_version_hint_exist(&table_path).await? {
let version_hint = self.read_version_hint(&table_path).await?;
Expand Down Expand Up @@ -262,8 +298,23 @@ impl Catalog for StorageCatalog {
}

/// Check if a table exists in the catalog.
async fn table_exists(&self, _table: &TableIdent) -> iceberg::Result<bool> {
todo!()
async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
let table_path = {
let mut names = table.namespace.clone().inner();
names.push(table.name.to_string());
if self.warehouse.ends_with('/') {
format!("{}{}", self.warehouse, names.join("/"))
} else {
format!("{}/{}", self.warehouse, names.join("/"))
}
};
let metadata_path = format!("{table_path}/metadata/version-hint.text");
self.file_io.is_exist(&metadata_path).await.map_err(|err| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to check if table exists: {}", err.as_report()),
)
})
}

/// Rename a table in the catalog.
4 changes: 4 additions & 0 deletions src/connector/with_options_sink.yaml
@@ -348,6 +348,10 @@ IcebergConfig:
comments: Commit every n(>0) checkpoints, default is 10.
required: false
default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL
- name: create_table_if_not_exists
field_type: bool
required: false
default: Default::default
KafkaConfig:
fields:
- name: properties.bootstrap.server
4 changes: 4 additions & 0 deletions src/frontend/src/handler/create_sink.rs
@@ -342,6 +342,10 @@ pub async fn get_partition_compute_info(
async fn get_partition_compute_info_for_iceberg(
iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
// TODO: check whether the table exists
if iceberg_config.create_table_if_not_exists {
Contributor: Why can we do this?

return Ok(None);
}
let table = iceberg_config.load_table().await?;
let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else {
return Ok(None);