diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index a105b7e3ece5b..7ffdf94e3c30a 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -90,6 +90,7 @@ message FileScanNode {
     STORAGE_TYPE_UNSPECIFIED = 0;
     S3 = 1;
     GCS = 2;
+    AZBLOB = 3;
   }
 
   repeated plan_common.ColumnDesc columns = 1;
@@ -114,6 +115,21 @@ message GcsFileScanNode {
   repeated string file_location = 4;
 }
 
+message AzblobFileScanNode {
+  enum FileFormat {
+    FILE_FORMAT_UNSPECIFIED = 0;
+    PARQUET = 1;
+  }
+
+  repeated plan_common.ColumnDesc columns = 1;
+  FileFormat file_format = 2;
+  string account_name = 3;
+  string account_key = 4;
+  string endpoint = 5;
+
+  repeated string file_location = 6;
+}
+
 // NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed.
 message PostgresQueryNode {
   repeated plan_common.ColumnDesc columns = 1;
@@ -421,6 +437,7 @@ message PlanNode {
     PostgresQueryNode postgres_query = 40;
     MySqlQueryNode mysql_query = 41;
     GcsFileScanNode gcs_file_scan = 42;
+    AzblobFileScanNode azblob_file_scan = 43;
     // The following nodes are used for testing.
     bool block_executor = 100;
     bool busy_loop_executor = 101;
diff --git a/src/batch/executors/src/executor.rs b/src/batch/executors/src/executor.rs
index c953faaa47fe7..a2b67f06924ae 100644
--- a/src/batch/executors/src/executor.rs
+++ b/src/batch/executors/src/executor.rs
@@ -15,6 +15,7 @@
 pub use risingwave_batch::executor::*;
 
 pub mod aggregation;
+mod azblob_file_scan;
 mod delete;
 mod expand;
 mod filter;
@@ -50,6 +51,7 @@ mod update;
 mod utils;
 mod values;
 
+use azblob_file_scan::AzblobFileScanExecutorBuilder;
 pub use delete::*;
 pub use expand::*;
 pub use filter::*;
@@ -115,6 +117,7 @@ register_executor!(SortOverWindow, SortOverWindowExecutor);
 register_executor!(MaxOneRow, MaxOneRowExecutor);
 register_executor!(FileScan, FileScanExecutorBuilder);
 register_executor!(GcsFileScan, GcsFileScanExecutorBuilder);
+register_executor!(AzblobFileScan, AzblobFileScanExecutorBuilder);
 register_executor!(IcebergScan, IcebergScanExecutorBuilder);
 register_executor!(PostgresQuery, PostgresQueryExecutorBuilder);
 register_executor!(MysqlQuery, MySqlQueryExecutorBuilder);
diff --git a/src/batch/executors/src/executor/azblob_file_scan.rs b/src/batch/executors/src/executor/azblob_file_scan.rs
new file mode 100644
index 0000000000000..aa6561f61f233
--- /dev/null
+++ b/src/batch/executors/src/executor/azblob_file_scan.rs
@@ -0,0 +1,132 @@
+// Copyright 2025 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use futures_async_stream::try_stream;
+use futures_util::stream::StreamExt;
+use risingwave_common::array::DataChunk;
+use risingwave_common::catalog::{Field, Schema};
+use risingwave_connector::source::iceberg::{
+    extract_bucket_and_file_name, new_azblob_operator, read_parquet_file, FileScanBackend,
+};
+use risingwave_pb::batch_plan::file_scan_node;
+use risingwave_pb::batch_plan::plan_node::NodeBody;
+
+use crate::error::BatchError;
+use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
+
+#[derive(PartialEq, Debug)]
+pub enum FileFormat {
+    Parquet,
+}
+
+/// Azblob file scan executor. Currently only supports the parquet file format.
+pub struct AzblobFileScanExecutor {
+    file_format: FileFormat,
+    file_location: Vec<String>,
+    account_name: String,
+    account_key: String,
+    endpoint: String,
+    batch_size: usize,
+    schema: Schema,
+    identity: String,
+}
+
+impl Executor for AzblobFileScanExecutor {
+    fn schema(&self) -> &risingwave_common::catalog::Schema {
+        &self.schema
+    }
+
+    fn identity(&self) -> &str {
+        &self.identity
+    }
+
+    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
+        self.do_execute().boxed()
+    }
+}
+
+impl AzblobFileScanExecutor {
+    pub fn new(
+        file_format: FileFormat,
+        file_location: Vec<String>,
+        account_name: String,
+        account_key: String,
+        endpoint: String,
+        batch_size: usize,
+        schema: Schema,
+        identity: String,
+    ) -> Self {
+        Self {
+            file_format,
+            file_location,
+            account_name,
+            account_key,
+            endpoint,
+            batch_size,
+            schema,
+            identity,
+        }
+    }
+
+    #[try_stream(ok = DataChunk, error = BatchError)]
+    async fn do_execute(self: Box<Self>) {
+        assert_eq!(self.file_format, FileFormat::Parquet);
+        for file in self.file_location {
+            let (bucket, file_name) =
+                extract_bucket_and_file_name(&file, &FileScanBackend::Azblob)?;
+            let op = new_azblob_operator(
+                self.account_name.clone(),
+                self.account_key.clone(),
+                self.endpoint.clone(),
+                bucket.clone(),
+            )?;
+            let chunk_stream =
+                read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
+            #[for_await]
+            for stream_chunk in chunk_stream {
+                let stream_chunk = stream_chunk?;
+                let (data_chunk, _) = stream_chunk.into_parts();
+                yield data_chunk;
+            }
+        }
+    }
+}
+
+pub struct AzblobFileScanExecutorBuilder {}
+
+impl BoxedExecutorBuilder for AzblobFileScanExecutorBuilder {
+    async fn new_boxed_executor(
+        source: &ExecutorBuilder<'_>,
+        _inputs: Vec<BoxedExecutor>,
+    ) -> crate::error::Result<BoxedExecutor> {
+        let file_scan_node = try_match_expand!(
+            source.plan_node().get_node_body().unwrap(),
+            NodeBody::AzblobFileScan
+        )?;
+
+        Ok(Box::new(AzblobFileScanExecutor::new(
+            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
+                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
+                file_scan_node::FileFormat::Unspecified => unreachable!(),
+            },
+            file_scan_node.file_location.clone(),
+            file_scan_node.account_name.clone(),
+            file_scan_node.account_key.clone(),
+            file_scan_node.endpoint.clone(),
+            source.context().get_config().developer.chunk_size,
+            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
+            source.plan_node().get_identity().clone(),
+        )))
+    }
+}
diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs
index 57854687a3e9c..7626852e08745 100644
--- a/src/connector/src/source/iceberg/parquet_file_handler.rs
+++ b/src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -28,7 +28,7 @@ use iceberg::io::{
 use iceberg::{Error, ErrorKind};
 use itertools::Itertools;
 use opendal::layers::{LoggingLayer, RetryLayer};
-use opendal::services::{Gcs, S3};
+use opendal::services::{Azblob, Gcs, S3};
 use opendal::Operator;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -138,10 +138,32 @@ pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
     Ok(operator)
 }
 
+pub fn new_azblob_operator(
+    account_name: String,
+    account_key: String,
+    endpoint: String,
+    container_name: String,
+) -> ConnectorResult<Operator> {
+    // Create azblob builder.
+    let mut builder = Azblob::default();
+    builder = builder
+        .container(&container_name)
+        .endpoint(&endpoint)
+        .account_name(&account_name)
+        .account_key(&account_key);
+
+    let operator: Operator = Operator::new(builder)?
+        .layer(LoggingLayer::default())
+        .layer(RetryLayer::default())
+        .finish();
+    Ok(operator)
+}
+
 #[derive(Debug, Clone)]
 pub enum FileScanBackend {
     S3,
     Gcs,
+    Azblob,
 }
 
 pub fn extract_bucket_and_file_name(
@@ -161,6 +183,7 @@
     let prefix = match file_scan_backend {
         FileScanBackend::S3 => format!("s3://{}/", bucket),
         FileScanBackend::Gcs => format!("gcs://{}/", bucket),
+        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
     };
     let file_name = location[prefix.len()..].to_string();
     Ok((bucket, file_name))
@@ -175,6 +198,7 @@ pub async fn list_data_directory(
     let prefix = match file_scan_backend {
         FileScanBackend::S3 => format!("s3://{}/", bucket),
         FileScanBackend::Gcs => format!("gcs://{}/", bucket),
+        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
     };
     if dir.starts_with(&prefix) {
         op.list(&file_name)
diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs
index 03ac2c50cc0d9..d9577c1270f2e 100644
--- a/src/frontend/src/expr/table_function.rs
+++ b/src/frontend/src/expr/table_function.rs
@@ -21,8 +21,8 @@ use mysql_async::prelude::*;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::types::{DataType, ScalarImpl, StructType};
 use risingwave_connector::source::iceberg::{
-    extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_gcs_operator,
-    new_s3_operator, FileScanBackend,
+    extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_azblob_operator,
+    new_gcs_operator, new_s3_operator, FileScanBackend,
 };
 pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
 use risingwave_pb::expr::PbTableFunction;
@@ -80,12 +80,10 @@ impl TableFunction {
         let return_type = {
             // arguments:
             // file format e.g. parquet
-            // storage type e.g. s3, gcs
-            // For s3: file_scan(file_format, s3, s3_region, s3_access_key, s3_secret_key, file_location_or_directory)
-            // For gcs: file_scan(file_format, gcs, credential, file_location_or_directory)
-            if args.len() != 6 && args.len() != 4 {
-                return Err(BindError("file_scan function only accepts: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location) or file_scan('parquet', 'gcs', credential, service_account, file location)".to_owned()).into());
-            }
+            // storage type e.g. s3, gcs, azblob
+            // For s3: file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location_or_directory)
+            // For gcs: file_scan('parquet', 'gcs', credential, file_location_or_directory)
+            // For azblob: file_scan('parquet', 'azblob', account_name, account_key, endpoint, file_location)
             let mut eval_args: Vec<String> = vec![];
             for arg in &args {
                 if arg.return_type() != DataType::Varchar {
@@ -125,6 +123,22 @@ impl TableFunction {
                     }
                 }
             }
+
+            if (eval_args.len() != 4 && eval_args.len() != 6)
+                || (eval_args.len() == 4 && !"gcs".eq_ignore_ascii_case(&eval_args[1]))
+                || (eval_args.len() == 6
+                    && !"s3".eq_ignore_ascii_case(&eval_args[1])
+                    && !"azblob".eq_ignore_ascii_case(&eval_args[1]))
+            {
+                return Err(BindError(
+                    "file_scan function supports three backends: s3, gcs, and azblob. Their formats are as follows: \n
+                    file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location) \n
+                    file_scan('parquet', 'gcs', credential, file_location) \n
+                    file_scan('parquet', 'azblob', account_name, account_key, endpoint, file_location)"
+                        .to_owned(),
+                )
+                .into());
+            }
             if !"parquet".eq_ignore_ascii_case(&eval_args[0]) {
                 return Err(BindError(
                     "file_scan function only accepts 'parquet' as file format".to_owned(),
@@ -134,9 +148,11 @@
 
             if !"s3".eq_ignore_ascii_case(&eval_args[1])
                 && !"gcs".eq_ignore_ascii_case(&eval_args[1])
+                && !"azblob".eq_ignore_ascii_case(&eval_args[1])
             {
                 return Err(BindError(
-                    "file_scan function only accepts 's3' or 'gcs' as storage type".to_owned(),
+                    "file_scan function only accepts 's3', 'gcs' or 'azblob' as storage type"
+                        .to_owned(),
                 )
                 .into());
             }
@@ -154,6 +170,8 @@ impl TableFunction {
                 (FileScanBackend::S3, eval_args[5].clone())
             } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) {
                 (FileScanBackend::Gcs, eval_args[3].clone())
+            } else if "azblob".eq_ignore_ascii_case(&eval_args[1]) {
+                (FileScanBackend::Azblob, eval_args[5].clone())
             } else {
                 unreachable!();
             };
@@ -185,6 +203,17 @@ impl TableFunction {
                     new_gcs_operator(eval_args[2].clone(), bucket.clone())?
                 }
+                FileScanBackend::Azblob => {
+                    let (bucket, _) =
+                        extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?;
+
+                    new_azblob_operator(
+                        eval_args[2].clone(),
+                        eval_args[3].clone(),
+                        eval_args[4].clone(),
+                        bucket.clone(),
+                    )?
+                }
             };
 
             let files = if input_file_location.ends_with('/') {
                 let files = tokio::task::block_in_place(|| {
@@ -240,6 +269,7 @@ impl TableFunction {
         match file_scan_backend {
             FileScanBackend::S3 => args.remove(5),
             FileScanBackend::Gcs => args.remove(3),
+            FileScanBackend::Azblob => args.remove(5),
         };
         for file in files {
             args.push(ExprImpl::Literal(Box::new(Literal::new(
diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
index 55a39fbca8be6..efa17ea9b83b3 100644
--- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -15,7 +15,7 @@
 use pretty_xmlish::XmlNode;
 use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::{FileScanNode, GcsFileScanNode};
+use risingwave_pb::batch_plan::{AzblobFileScanNode, FileScanNode, GcsFileScanNode};
 
 use super::batch::prelude::*;
 use super::utils::{childless_record, column_names_pretty, Distill};
@@ -109,6 +109,23 @@ impl ToBatchPb for BatchFileScan {
                     file_location: gcs_file_scan.file_location.clone(),
                 })
             }
+
+            generic::FileScanBackend::AzblobFileScan(azblob_file_scan) => {
+                NodeBody::AzblobFileScan(AzblobFileScanNode {
+                    columns: azblob_file_scan
+                        .columns()
+                        .into_iter()
+                        .map(|col| col.to_protobuf())
+                        .collect(),
+                    file_format: match azblob_file_scan.file_format {
+                        generic::FileFormat::Parquet => FileFormat::Parquet as i32,
+                    },
+                    account_name: azblob_file_scan.account_name.clone(),
+                    account_key: azblob_file_scan.account_key.clone(),
+                    endpoint: azblob_file_scan.endpoint.clone(),
+                    file_location: azblob_file_scan.file_location.clone(),
+                })
+            }
         }
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
index 03bf31ce14336..37225b273e27d 100644
--- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
@@ -28,13 +28,16 @@ pub enum FileFormat {
 pub enum StorageType {
     S3,
     Gcs,
+    Azblob,
 }
 
+#[allow(clippy::enum_variant_names)]
 #[derive(Debug, Clone, Educe)]
 #[educe(PartialEq, Eq, Hash)]
 pub enum FileScanBackend {
     FileScan(FileScan),
     GcsFileScan(GcsFileScan),
+    AzblobFileScan(AzblobFileScan),
 }
 
 #[derive(Debug, Clone, Educe)]
 #[educe(PartialEq, Eq, Hash)]
@@ -69,6 +72,40 @@ impl GenericPlanNode for GcsFileScan {
     }
 }
 
+#[derive(Debug, Clone, Educe)]
+#[educe(PartialEq, Eq, Hash)]
+pub struct AzblobFileScan {
+    pub schema: Schema,
+    pub file_format: FileFormat,
+    pub storage_type: StorageType,
+    pub account_name: String,
+    pub account_key: String,
+    pub endpoint: String,
+    pub file_location: Vec<String>,
+
+    #[educe(PartialEq(ignore))]
+    #[educe(Hash(ignore))]
+    pub ctx: OptimizerContextRef,
+}
+
+impl GenericPlanNode for AzblobFileScan {
+    fn schema(&self) -> Schema {
+        self.schema.clone()
+    }
+
+    fn stream_key(&self) -> Option<Vec<usize>> {
+        None
+    }
+
+    fn ctx(&self) -> OptimizerContextRef {
+        self.ctx.clone()
+    }
+
+    fn functional_dependency(&self) -> FunctionalDependencySet {
+        FunctionalDependencySet::new(self.schema.len())
+    }
+}
+
 impl FileScan {
     pub fn columns(&self) -> Vec<ColumnDesc> {
         self.schema
@@ -81,7 +118,6 @@ impl FileScan {
             .collect()
     }
 }
-
 #[derive(Debug, Clone, Educe)]
 #[educe(PartialEq, Eq, Hash)]
 pub struct FileScan {
@@ -130,11 +166,26 @@ impl GcsFileScan {
     }
 }
 
+impl AzblobFileScan {
+    pub fn columns(&self) -> Vec<ColumnDesc> {
+        self.schema
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(i, f)| {
+                ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
+            })
+            .collect()
+    }
+}
+
 impl GenericPlanNode for FileScanBackend {
     fn schema(&self) -> Schema {
         match self {
             FileScanBackend::FileScan(file_scan) => file_scan.schema(),
             FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.schema(),
+
+            FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.schema(),
         }
     }
@@ -142,6 +193,8 @@ impl GenericPlanNode for FileScanBackend {
         match self {
             FileScanBackend::FileScan(file_scan) => file_scan.stream_key(),
             FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.stream_key(),
+
+            FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.stream_key(),
         }
     }
@@ -149,6 +202,8 @@ impl GenericPlanNode for FileScanBackend {
         match self {
             FileScanBackend::FileScan(file_scan) => file_scan.ctx(),
             FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.ctx(),
+
+            FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.ctx(),
         }
     }
@@ -156,6 +211,10 @@ impl GenericPlanNode for FileScanBackend {
         match self {
             FileScanBackend::FileScan(file_scan) => file_scan.functional_dependency(),
             FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.functional_dependency(),
+
+            FileScanBackend::AzblobFileScan(azblob_file_scan) => {
+                azblob_file_scan.functional_dependency()
+            }
         }
     }
 }
@@ -165,6 +224,10 @@ impl FileScanBackend {
         match self {
             FileScanBackend::FileScan(file_scan) => file_scan.file_location.clone(),
             FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.file_location.clone(),
+
+            FileScanBackend::AzblobFileScan(azblob_file_scan) => {
+                azblob_file_scan.file_location.clone()
+            }
         }
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
index 847d5f0f7e47b..80063976fdc60 100644
--- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -94,6 +94,35 @@ impl LogicalFileScan {
         let base = PlanBase::new_logical_with_core(&core);
         LogicalFileScan { base, core }
     }
+
+    pub fn new_azblob_logical_file_scan(
+        ctx: OptimizerContextRef,
+        schema: Schema,
+        file_format: String,
+        storage_type: String,
+        account_name: String,
+        account_key: String,
+        endpoint: String,
+        file_location: Vec<String>,
+    ) -> Self {
+        assert!("parquet".eq_ignore_ascii_case(&file_format));
+        assert!("azblob".eq_ignore_ascii_case(&storage_type));
+
+        let core = generic::FileScanBackend::AzblobFileScan(generic::AzblobFileScan {
+            schema,
+            file_format: generic::FileFormat::Parquet,
+            storage_type: generic::StorageType::Azblob,
+            account_name,
+            account_key,
+            endpoint,
+            file_location,
+            ctx,
+        });
+
+        let base = PlanBase::new_logical_with_core(&core);
+
+        LogicalFileScan { base, core }
+    }
 }
 
 impl_plan_tree_node_for_leaf! {LogicalFileScan}
diff --git a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
index c722bb7c78344..d43ef06f0be76 100644
--- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
+++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
@@ -61,6 +61,7 @@ impl Rule for TableFunctionToFileScanRule {
         assert!(
             ("s3".eq_ignore_ascii_case(&eval_args[1]))
                 || "gcs".eq_ignore_ascii_case(&eval_args[1])
+                || "azblob".eq_ignore_ascii_case(&eval_args[1])
         );
 
         if "s3".eq_ignore_ascii_case(&eval_args[1]) {
@@ -106,6 +107,25 @@ impl Rule for TableFunctionToFileScanRule {
                 )
                 .into(),
             )
+        } else if "azblob".eq_ignore_ascii_case(&eval_args[1]) {
+            let account_name = eval_args[2].clone();
+            let account_key = eval_args[3].clone();
+            let endpoint = eval_args[4].clone();
+            // The rest of the arguments are file locations
+            let file_location = eval_args[5..].iter().cloned().collect_vec();
+            Some(
+                LogicalFileScan::new_azblob_logical_file_scan(
+                    logical_table_function.ctx(),
+                    schema,
+                    "parquet".to_owned(),
+                    "azblob".to_owned(),
+                    account_name,
+                    account_key,
+                    endpoint,
+                    file_location,
+                )
+                .into(),
+            )
         } else {
             unreachable!()
         }
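For reference, a minimal usage sketch of the file_scan table function with the new azblob backend. The call shape (file format, storage type, account name, account key, endpoint, file location) follows the binder checks and comments added in table_function.rs above; the account, key, endpoint, and container values below are placeholders and not part of this change, and the endpoint format is only an assumption. A location ending in '/' is treated as a directory and expanded via list_data_directory.

    -- Hypothetical account, key, endpoint, and container names, shown only to illustrate the call.
    SELECT *
    FROM file_scan(
        'parquet',
        'azblob',
        'my_account_name',                                -- account_name (placeholder)
        'my_account_key',                                 -- account_key (placeholder)
        'https://my_account_name.blob.core.windows.net',  -- endpoint (assumed format)
        'azblob://my-container/path/to/data.parquet'      -- or 'azblob://my-container/path/' to scan a directory
    );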