From 5658fc76e50b65be9b0a99b83245e9616f77c819 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 31 Dec 2024 17:38:06 +0800 Subject: [PATCH 01/12] add gcs file scan --- proto/batch_plan.proto | 15 ++ src/batch/executors/src/executor.rs | 3 + .../executors/src/executor/gcs_file_scan.rs | 127 +++++++++++++++++ .../executors/src/executor/s3_file_scan.rs | 4 +- .../source/iceberg/parquet_file_handler.rs | 81 +++++++---- src/frontend/src/expr/table_function.rs | 129 +++++++++++++----- .../optimizer/plan_node/batch_file_scan.rs | 57 +++++--- .../optimizer/plan_node/generic/file_scan.rs | 96 ++++++++++++- .../optimizer/plan_node/logical_file_scan.rs | 35 ++++- .../rule/table_function_to_file_scan_rule.rs | 62 ++++++--- src/frontend/src/scheduler/plan_fragmenter.rs | 2 +- 11 files changed, 507 insertions(+), 104 deletions(-) create mode 100644 src/batch/executors/src/executor/gcs_file_scan.rs diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 2373b7d483e30..8869734a4fd95 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -88,6 +88,7 @@ message FileScanNode { enum StorageType { STORAGE_TYPE_UNSPECIFIED = 0; S3 = 1; + GCS = 2; } repeated plan_common.ColumnDesc columns = 1; @@ -99,6 +100,19 @@ message FileScanNode { repeated string file_location = 7; } +message GcsFileScanNode { + enum FileFormat { + FILE_FORMAT_UNSPECIFIED = 0; + PARQUET = 1; + } + + repeated plan_common.ColumnDesc columns = 1; + FileFormat file_format = 2; + string credential = 3; + string service_account = 4; + repeated string file_location = 5; +} + // NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed. message PostgresQueryNode { repeated plan_common.ColumnDesc columns = 1; @@ -405,6 +419,7 @@ message PlanNode { IcebergScanNode iceberg_scan = 39; PostgresQueryNode postgres_query = 40; MySqlQueryNode mysql_query = 41; + GcsFileScanNode gcs_file_scan = 42; // The following nodes are used for testing. bool block_executor = 100; bool busy_loop_executor = 101; diff --git a/src/batch/executors/src/executor.rs b/src/batch/executors/src/executor.rs index e6835a009b035..dc5dbff8e2c80 100644 --- a/src/batch/executors/src/executor.rs +++ b/src/batch/executors/src/executor.rs @@ -18,6 +18,7 @@ pub mod aggregation; mod delete; mod expand; mod filter; +mod gcs_file_scan; mod generic_exchange; mod group_top_n; mod hash_agg; @@ -52,6 +53,7 @@ mod values; pub use delete::*; pub use expand::*; pub use filter::*; +use gcs_file_scan::GcsFileScanExecutorBuilder; pub use generic_exchange::*; pub use group_top_n::*; pub use hash_agg::*; @@ -112,6 +114,7 @@ register_executor!(Source, SourceExecutor); register_executor!(SortOverWindow, SortOverWindowExecutor); register_executor!(MaxOneRow, MaxOneRowExecutor); register_executor!(FileScan, FileScanExecutorBuilder); +register_executor!(GcsFileScan, GcsFileScanExecutorBuilder); register_executor!(IcebergScan, IcebergScanExecutorBuilder); register_executor!(PostgresQuery, PostgresQueryExecutorBuilder); register_executor!(MysqlQuery, MySqlQueryExecutorBuilder); diff --git a/src/batch/executors/src/executor/gcs_file_scan.rs b/src/batch/executors/src/executor/gcs_file_scan.rs new file mode 100644 index 0000000000000..ac4baa8551b23 --- /dev/null +++ b/src/batch/executors/src/executor/gcs_file_scan.rs @@ -0,0 +1,127 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
    extract_gcs_bucket_and_file_name, extract_s3_bucket_and_file_name, new_gcs_operator,
    read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};

#[derive(PartialEq, Debug)]
pub enum FileFormat {
    Parquet,
}

/// Gcs file scan executor. Currently only supports parquet file format.
pub struct GcsFileScanExecutor {
    file_format: FileFormat,
    file_location: Vec<String>,
    gcs_credential: String,
    service_account: String,
    batch_size: usize,
    schema: Schema,
    identity: String,
}

impl Executor for GcsFileScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl GcsFileScanExecutor {
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        gcs_credential: String,
        service_account: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            gcs_credential,
            service_account,
            batch_size,
            schema,
            identity,
        }
    }

    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert_eq!(self.file_format, FileFormat::Parquet);
        for file in self.file_location {
            let (bucket, file_name) = extract_gcs_bucket_and_file_name(&file)?;
            let op = new_gcs_operator(
                self.gcs_credential.clone(),
                self.service_account.clone(),
                bucket.clone(),
            )?;
            let chunk_stream =
                read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}

pub struct GcsFileScanExecutorBuilder {}

impl BoxedExecutorBuilder for GcsFileScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::GcsFileScan
        )?;

        Ok(Box::new(GcsFileScanExecutor::new(
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.credential.clone(),
            file_scan_node.service_account.clone(),
            source.context().get_config().developer.chunk_size,
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
        )))
    }
}
diff --git a/src/batch/executors/src/executor/s3_file_scan.rs
b/src/batch/executors/src/executor/s3_file_scan.rs index 8140011dfcfce..f051eed017bad 100644 --- a/src/batch/executors/src/executor/s3_file_scan.rs +++ b/src/batch/executors/src/executor/s3_file_scan.rs @@ -17,7 +17,7 @@ use futures_util::stream::StreamExt; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_connector::source::iceberg::{ - extract_bucket_and_file_name, new_s3_operator, read_parquet_file, + extract_s3_bucket_and_file_name, new_s3_operator, read_parquet_file, }; use risingwave_pb::batch_plan::file_scan_node; use risingwave_pb::batch_plan::file_scan_node::StorageType; @@ -84,7 +84,7 @@ impl S3FileScanExecutor { async fn do_execute(self: Box) { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { - let (bucket, file_name) = extract_bucket_and_file_name(&file)?; + let (bucket, file_name) = extract_s3_bucket_and_file_name(&file)?; let op = new_s3_operator( self.s3_region.clone(), self.s3_access_key.clone(), diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index 49b6d9a425276..c966f117c5fb3 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -28,7 +28,7 @@ use iceberg::io::{ use iceberg::{Error, ErrorKind}; use itertools::Itertools; use opendal::layers::{LoggingLayer, RetryLayer}; -use opendal::services::S3; +use opendal::services::{Gcs, S3}; use opendal::Operator; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; @@ -127,7 +127,25 @@ pub fn new_s3_operator( Ok(op) } -pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> { +pub fn new_gcs_operator( + credential: String, + service_account: String, + bucket: String, +) -> ConnectorResult { + // Create gcs builder. + let builder = Gcs::default() + .bucket(&bucket) + .credential(&credential) + .service_account(&service_account); + + let operator: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(operator) +} + +pub fn extract_s3_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> { let url = Url::parse(location)?; let bucket = url .host_str() @@ -143,29 +161,46 @@ pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, Ok((bucket, file_name)) } -pub async fn list_s3_directory( - s3_region: String, - s3_access_key: String, - s3_secret_key: String, - dir: String, -) -> Result, anyhow::Error> { - let (bucket, file_name) = extract_bucket_and_file_name(&dir)?; - let prefix = format!("s3://{}/", bucket); +pub fn extract_gcs_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> { + let url = Url::parse(location)?; + let bucket = url + .host_str() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid gcs url: {}, missing bucket", location), + ) + })? 
+ .to_owned(); + let prefix = format!("gcs://{}/", bucket); + let file_name = location[prefix.len()..].to_string(); + Ok((bucket, file_name)) +} + +pub async fn list_gcs_directory(op: Operator, dir: String) -> Result, anyhow::Error> { + let (bucket, file_name) = extract_gcs_bucket_and_file_name(&dir)?; + let prefix = format!("gcs://{}/", bucket); if dir.starts_with(&prefix) { - let mut builder = S3::default(); - builder = builder - .region(&s3_region) - .access_key_id(&s3_access_key) - .secret_access_key(&s3_secret_key) - .bucket(&bucket); - builder = builder.endpoint(&format!( - "https://{}.s3.{}.amazonaws.com", - bucket, s3_region - )); - let op = Operator::new(builder)? - .layer(RetryLayer::default()) - .finish(); + op.list(&file_name) + .await + .map_err(|e| anyhow!(e)) + .map(|list| { + list.into_iter() + .map(|entry| prefix.clone() + entry.path()) + .collect() + }) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid gcs url: {}, should start with {}", dir, prefix), + ))? + } +} +pub async fn list_s3_directory(op: Operator, dir: String) -> Result, anyhow::Error> { + let (bucket, file_name) = extract_s3_bucket_and_file_name(&dir)?; + let prefix = format!("s3://{}/", bucket); + if dir.starts_with(&prefix) { op.list(&file_name) .await .map_err(|e| anyhow!(e)) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index d49b4332b117f..b59058fbe8fe8 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -21,7 +21,8 @@ use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{ - extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_s3_operator, + extract_gcs_bucket_and_file_name, extract_s3_bucket_and_file_name, get_parquet_fields, + list_gcs_directory, list_s3_directory, new_gcs_operator, new_s3_operator, }; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; @@ -84,7 +85,7 @@ impl TableFunction { // s3 access key // s3 secret key // file location - if args.len() != 6 { + if args.len() != 6 && args.len() != 5 { return Err(BindError("file_scan function only accepts 6 arguments: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location)".to_owned()).into()); } let mut eval_args: Vec = vec![]; @@ -133,9 +134,11 @@ impl TableFunction { .into()); } - if !"s3".eq_ignore_ascii_case(&eval_args[1]) { + if !"s3".eq_ignore_ascii_case(&eval_args[1]) + && !"gcs".eq_ignore_ascii_case(&eval_args[1]) + { return Err(BindError( - "file_scan function only accepts 's3' as storage type".to_owned(), + "file_scan function only accepts 's3' or 'gcs' as storage type".to_owned(), ) .into()); } @@ -148,46 +151,106 @@ impl TableFunction { #[cfg(not(madsim))] { - let files = if eval_args[5].ends_with('/') { - let files = tokio::task::block_in_place(|| { - FRONTEND_RUNTIME.block_on(async { - let files = list_s3_directory( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - eval_args[5].clone(), + let files = if "s3".eq_ignore_ascii_case(&eval_args[1]) { + let (bucket, _) = extract_s3_bucket_and_file_name(&eval_args[5].clone())?; + let op = new_s3_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )?; + let files = if eval_args[5].ends_with('/') { + let files = tokio::task::block_in_place(|| { + 
FRONTEND_RUNTIME.block_on(async { + let files = + list_s3_directory(op.clone(), eval_args[5].clone()).await?; + + Ok::, anyhow::Error>(files) + }) + })?; + + if files.is_empty() { + return Err(BindError( + "file_scan function only accepts non-empty directory".to_owned(), ) - .await?; - - Ok::, anyhow::Error>(files) - }) - })?; + .into()); + } - if files.is_empty() { - return Err(BindError( - "file_scan function only accepts non-empty directory".to_owned(), - ) - .into()); - } + Some(files) + } else { + None + }; + files + } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { + let (bucket, _) = extract_gcs_bucket_and_file_name(&eval_args[4].clone())?; + let op = new_gcs_operator( + eval_args[2].clone(), + eval_args[3].clone(), + bucket.clone(), + )?; + + let files = if eval_args[4].ends_with('/') { + let files = tokio::task::block_in_place(|| { + FRONTEND_RUNTIME.block_on(async { + let files = + list_gcs_directory(op.clone(), eval_args[4].clone()).await?; + + Ok::, anyhow::Error>(files) + }) + })?; + + if files.is_empty() { + return Err(BindError( + "file_scan function only accepts non-empty directory".to_owned(), + ) + .into()); + } - Some(files) + Some(files) + } else { + None + }; + files } else { - None + unreachable!() }; let schema = tokio::task::block_in_place(|| { FRONTEND_RUNTIME.block_on(async { let location = match files.as_ref() { Some(files) => files[0].clone(), - None => eval_args[5].clone(), + None => { + if "s3".eq_ignore_ascii_case(&eval_args[1]) { + eval_args[5].clone() + } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { + eval_args[4].clone() + } else { + unreachable!() + } + } + }; + let (op, file_name) = if "s3".eq_ignore_ascii_case(&eval_args[1]) { + let (bucket, file_name) = extract_s3_bucket_and_file_name(&location)?; + ( + new_s3_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )?, + file_name, + ) + } else { + let (bucket, file_name) = extract_gcs_bucket_and_file_name(&location)?; + ( + new_gcs_operator( + eval_args[2].clone(), + eval_args[3].clone(), + bucket.clone(), + )?, + file_name, + ) }; - let (bucket, file_name) = extract_bucket_and_file_name(&location)?; - let op = new_s3_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )?; let fields = get_parquet_fields(op, file_name).await?; diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs index 649c178855ef9..2705fa584f1c3 100644 --- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs @@ -15,7 +15,7 @@ use pretty_xmlish::XmlNode; use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType}; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::FileScanNode; +use risingwave_pb::batch_plan::{FileScanNode, GcsFileScanNode}; use super::batch::prelude::*; use super::utils::{childless_record, column_names_pretty, Distill}; @@ -29,11 +29,11 @@ use crate::optimizer::property::{Distribution, Order}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchFileScan { pub base: PlanBase, - pub core: generic::FileScan, + pub core: generic::FileScanBackend, } impl BatchFileScan { - pub fn new(core: generic::FileScan) -> Self { + pub fn new(core: generic::FileScanBackend) -> Self { let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any()); Self { base, core } @@ -77,24 +77,39 @@ impl ToDistributedBatch for 
BatchFileScan { impl ToBatchPb for BatchFileScan { fn to_batch_prost_body(&self) -> NodeBody { - NodeBody::FileScan(FileScanNode { - columns: self - .core - .columns() - .into_iter() - .map(|col| col.to_protobuf()) - .collect(), - file_format: match self.core.file_format { - generic::FileFormat::Parquet => FileFormat::Parquet as i32, - }, - storage_type: match self.core.storage_type { - generic::StorageType::S3 => StorageType::S3 as i32, - }, - s3_region: self.core.s3_region.clone(), - s3_access_key: self.core.s3_access_key.clone(), - s3_secret_key: self.core.s3_secret_key.clone(), - file_location: self.core.file_location.clone(), - }) + match &self.core { + generic::FileScanBackend::FileScan(file_scan) => NodeBody::FileScan(FileScanNode { + columns: file_scan + .columns() + .into_iter() + .map(|col| col.to_protobuf()) + .collect(), + file_format: match file_scan.file_format { + generic::FileFormat::Parquet => FileFormat::Parquet as i32, + }, + storage_type: StorageType::S3 as i32, + + s3_region: file_scan.s3_region.clone(), + s3_access_key: file_scan.s3_access_key.clone(), + s3_secret_key: file_scan.s3_secret_key.clone(), + file_location: file_scan.file_location.clone(), + }), + generic::FileScanBackend::GcsFileScan(gcs_file_scan) => { + NodeBody::GcsFileScan(GcsFileScanNode { + columns: gcs_file_scan + .columns() + .into_iter() + .map(|col| col.to_protobuf()) + .collect(), + file_format: match gcs_file_scan.file_format { + generic::FileFormat::Parquet => FileFormat::Parquet as i32, + }, + credential: gcs_file_scan.creditial.clone(), + service_account: gcs_file_scan.service_account.clone(), + file_location: gcs_file_scan.file_location.clone(), + }) + } + } } } diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs index 975151d89c797..ce9643ed530ec 100644 --- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs @@ -27,6 +27,61 @@ pub enum FileFormat { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum StorageType { S3, + Gcs, +} + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub enum FileScanBackend { + FileScan(FileScan), + GcsFileScan(GcsFileScan), // Assume GcsFileScan is defined elsewhere +} + +// Example definition of GcsFileScan (you may need to adjust it) +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct GcsFileScan { + pub schema: Schema, + pub file_format: FileFormat, + pub storage_type: StorageType, + pub creditial: String, + pub service_account: String, + pub file_location: Vec, + + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, +} + +impl GenericPlanNode for GcsFileScan { + fn schema(&self) -> Schema { + self.schema.clone() + } + + fn stream_key(&self) -> Option> { + None + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + FunctionalDependencySet::new(self.schema.len()) + } +} + +impl FileScan { + pub fn columns(&self) -> Vec { + self.schema + .fields + .iter() + .enumerate() + .map(|(i, f)| { + ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone()) + }) + .collect() + } } #[derive(Debug, Clone, Educe)] @@ -63,7 +118,7 @@ impl GenericPlanNode for FileScan { } } -impl FileScan { +impl GcsFileScan { pub fn columns(&self) -> Vec { self.schema .fields @@ -75,3 +130,42 @@ impl FileScan { .collect() } } + +impl GenericPlanNode for FileScanBackend 
{ + fn schema(&self) -> Schema { + match self { + FileScanBackend::FileScan(file_scan) => file_scan.schema(), + FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.schema(), + } + } + + fn stream_key(&self) -> Option> { + match self { + FileScanBackend::FileScan(file_scan) => file_scan.stream_key(), + FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.stream_key(), + } + } + + fn ctx(&self) -> OptimizerContextRef { + match self { + FileScanBackend::FileScan(file_scan) => file_scan.ctx(), + FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.ctx(), + } + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + match self { + FileScanBackend::FileScan(file_scan) => file_scan.functional_dependency(), + FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.functional_dependency(), + } + } +} + +impl FileScanBackend { + pub fn file_location(&self) -> Vec { + match self { + FileScanBackend::FileScan(file_scan) => file_scan.file_location.clone(), + FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.file_location.clone(), + } + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs index abe8e40a8224f..a56241d686adb 100644 --- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs @@ -35,11 +35,11 @@ use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalFileScan { pub base: PlanBase, - pub core: generic::FileScan, + pub core: generic::FileScanBackend, } impl LogicalFileScan { - pub fn new( + pub fn new_s3_logical_file_scan( ctx: OptimizerContextRef, schema: Schema, file_format: String, @@ -52,7 +52,7 @@ impl LogicalFileScan { assert!("parquet".eq_ignore_ascii_case(&file_format)); assert!("s3".eq_ignore_ascii_case(&storage_type)); - let core = generic::FileScan { + let core = generic::FileScanBackend::FileScan(generic::FileScan { schema, file_format: generic::FileFormat::Parquet, storage_type: generic::StorageType::S3, @@ -61,7 +61,34 @@ impl LogicalFileScan { s3_secret_key, file_location, ctx, - }; + }); + + let base = PlanBase::new_logical_with_core(&core); + + LogicalFileScan { base, core } + } + + pub fn new_gcs_logical_file_scan( + ctx: OptimizerContextRef, + schema: Schema, + file_format: String, + storage_type: String, + creditial: String, + service_account: String, + file_location: Vec, + ) -> Self { + assert!("parquet".eq_ignore_ascii_case(&file_format)); + assert!("gcs".eq_ignore_ascii_case(&storage_type)); + + let core = generic::FileScanBackend::GcsFileScan(generic::GcsFileScan { + schema, + file_format: generic::FileFormat::Parquet, + storage_type: generic::StorageType::Gcs, + creditial, + service_account, + file_location, + ctx, + }); let base = PlanBase::new_logical_with_core(&core); diff --git a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs index ab538fb223bd7..6956d83f035a9 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs @@ -43,7 +43,6 @@ impl Rule for TableFunctionToFileScanRule { let schema = Schema::new(fields); - assert!(logical_table_function.table_function().args.len() >= 6); let mut eval_args = vec![]; for arg in &logical_table_function.table_function().args { assert_eq!(arg.return_type(), DataType::Varchar); @@ -58,25 +57,50 @@ impl 
Rule for TableFunctionToFileScanRule { } } assert!("parquet".eq_ignore_ascii_case(&eval_args[0])); - assert!("s3".eq_ignore_ascii_case(&eval_args[1])); - let s3_region = eval_args[2].clone(); - let s3_access_key = eval_args[3].clone(); - let s3_secret_key = eval_args[4].clone(); - // The rest of the arguments are file locations - let file_location = eval_args[5..].iter().cloned().collect_vec(); - Some( - LogicalFileScan::new( - logical_table_function.ctx(), - schema, - "parquet".to_owned(), - "s3".to_owned(), - s3_region, - s3_access_key, - s3_secret_key, - file_location, + assert!( + ("s3".eq_ignore_ascii_case(&eval_args[1])) + || "gcs".eq_ignore_ascii_case(&eval_args[1]) + ); + + if "s3".eq_ignore_ascii_case(&eval_args[1]) { + let s3_region = eval_args[2].clone(); + let s3_access_key = eval_args[3].clone(); + let s3_secret_key = eval_args[4].clone(); + // The rest of the arguments are file locations + let file_location = eval_args[5..].iter().cloned().collect_vec(); + Some( + LogicalFileScan::new_s3_logical_file_scan( + logical_table_function.ctx(), + schema, + "parquet".to_owned(), + "s3".to_owned(), + s3_region, + s3_access_key, + s3_secret_key, + file_location, + ) + .into(), + ) + } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { + let creditial = eval_args[2].clone(); + let service_account = eval_args[3].clone(); + // The rest of the arguments are file locations + let file_location = eval_args[4..].iter().cloned().collect_vec(); + Some( + LogicalFileScan::new_gcs_logical_file_scan( + logical_table_function.ctx(), + schema, + "parquet".to_owned(), + "gcs".to_owned(), + creditial, + service_account, + file_location, + ) + .into(), ) - .into(), - ) + } else { + unreachable!() + } } else { unreachable!("TableFunction return type should be struct") } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 974426b6ed952..f6e6c70a3f8e9 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1188,7 +1188,7 @@ impl BatchPlanFragmenter { if let Some(batch_file_scan) = node.as_batch_file_scan() { return Ok(Some(FileScanInfo { - file_location: batch_file_scan.core.file_location.clone(), + file_location: batch_file_scan.core.file_location().clone(), })); } From 727bde636b6c4626687ddb47b01b2e4228edf084 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 31 Dec 2024 18:14:53 +0800 Subject: [PATCH 02/12] code refactor --- .../executors/src/executor/gcs_file_scan.rs | 5 +- .../executors/src/executor/s3_file_scan.rs | 4 +- .../source/iceberg/parquet_file_handler.rs | 65 +++----- src/frontend/src/expr/table_function.rs | 151 +++++++----------- 4 files changed, 84 insertions(+), 141 deletions(-) diff --git a/src/batch/executors/src/executor/gcs_file_scan.rs b/src/batch/executors/src/executor/gcs_file_scan.rs index ac4baa8551b23..df59b4e8241f3 100644 --- a/src/batch/executors/src/executor/gcs_file_scan.rs +++ b/src/batch/executors/src/executor/gcs_file_scan.rs @@ -17,8 +17,7 @@ use futures_util::stream::StreamExt; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_connector::source::iceberg::{ - extract_gcs_bucket_and_file_name, extract_s3_bucket_and_file_name, new_gcs_operator, - read_parquet_file, + extract_bucket_and_file_name, new_gcs_operator, read_parquet_file, FileScanBackend, }; use risingwave_pb::batch_plan::file_scan_node; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -81,7 +80,7 @@ impl 
GcsFileScanExecutor { async fn do_execute(self: Box) { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { - let (bucket, file_name) = extract_gcs_bucket_and_file_name(&file)?; + let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::Gcs)?; let op = new_gcs_operator( self.gcs_credential.clone(), self.service_account.clone(), diff --git a/src/batch/executors/src/executor/s3_file_scan.rs b/src/batch/executors/src/executor/s3_file_scan.rs index f051eed017bad..157e7f63d6ba5 100644 --- a/src/batch/executors/src/executor/s3_file_scan.rs +++ b/src/batch/executors/src/executor/s3_file_scan.rs @@ -17,7 +17,7 @@ use futures_util::stream::StreamExt; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_connector::source::iceberg::{ - extract_s3_bucket_and_file_name, new_s3_operator, read_parquet_file, + extract_bucket_and_file_name, new_s3_operator, read_parquet_file, FileScanBackend, }; use risingwave_pb::batch_plan::file_scan_node; use risingwave_pb::batch_plan::file_scan_node::StorageType; @@ -84,7 +84,7 @@ impl S3FileScanExecutor { async fn do_execute(self: Box) { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { - let (bucket, file_name) = extract_s3_bucket_and_file_name(&file)?; + let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::S3)?; let op = new_s3_operator( self.s3_region.clone(), self.s3_access_key.clone(), diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index c966f117c5fb3..3c70b3cab3266 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -145,61 +145,44 @@ pub fn new_gcs_operator( Ok(operator) } -pub fn extract_s3_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> { - let url = Url::parse(location)?; - let bucket = url - .host_str() - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid s3 url: {}, missing bucket", location), - ) - })? - .to_owned(); - let prefix = format!("s3://{}/", bucket); - let file_name = location[prefix.len()..].to_string(); - Ok((bucket, file_name)) +#[derive(Debug, Clone)] +pub enum FileScanBackend { + S3, + Gcs, } -pub fn extract_gcs_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> { +pub fn extract_bucket_and_file_name( + location: &str, + file_scan_backend: &FileScanBackend, +) -> ConnectorResult<(String, String)> { let url = Url::parse(location)?; let bucket = url .host_str() .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - format!("Invalid gcs url: {}, missing bucket", location), + format!("Invalid url: {}, missing bucket", location), ) })? 
.to_owned(); - let prefix = format!("gcs://{}/", bucket); + let prefix = match file_scan_backend { + FileScanBackend::S3 => format!("s3://{}/", bucket), + FileScanBackend::Gcs => format!("gcs://{}/", bucket), + }; let file_name = location[prefix.len()..].to_string(); Ok((bucket, file_name)) } -pub async fn list_gcs_directory(op: Operator, dir: String) -> Result, anyhow::Error> { - let (bucket, file_name) = extract_gcs_bucket_and_file_name(&dir)?; - let prefix = format!("gcs://{}/", bucket); - if dir.starts_with(&prefix) { - op.list(&file_name) - .await - .map_err(|e| anyhow!(e)) - .map(|list| { - list.into_iter() - .map(|entry| prefix.clone() + entry.path()) - .collect() - }) - } else { - Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid gcs url: {}, should start with {}", dir, prefix), - ))? - } -} - -pub async fn list_s3_directory(op: Operator, dir: String) -> Result, anyhow::Error> { - let (bucket, file_name) = extract_s3_bucket_and_file_name(&dir)?; - let prefix = format!("s3://{}/", bucket); +pub async fn list_data_directory( + op: Operator, + dir: String, + file_scan_backend: &FileScanBackend, +) -> Result, anyhow::Error> { + let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?; + let prefix = match file_scan_backend { + FileScanBackend::S3 => format!("s3://{}/", bucket), + FileScanBackend::Gcs => format!("gcs://{}/", bucket), + }; if dir.starts_with(&prefix) { op.list(&file_name) .await @@ -212,7 +195,7 @@ pub async fn list_s3_directory(op: Operator, dir: String) -> Result, } else { Err(Error::new( ErrorKind::DataInvalid, - format!("Invalid s3 url: {}, should start with {}", dir, prefix), + format!("Invalid url: {}, should start with {}", dir, prefix), ))? } } diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index b59058fbe8fe8..b97d89995cd80 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -21,8 +21,8 @@ use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{ - extract_gcs_bucket_and_file_name, extract_s3_bucket_and_file_name, get_parquet_fields, - list_gcs_directory, list_s3_directory, new_gcs_operator, new_s3_operator, + extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_gcs_operator, + new_s3_operator, FileScanBackend, }; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; @@ -151,107 +151,68 @@ impl TableFunction { #[cfg(not(madsim))] { - let files = if "s3".eq_ignore_ascii_case(&eval_args[1]) { - let (bucket, _) = extract_s3_bucket_and_file_name(&eval_args[5].clone())?; - let op = new_s3_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )?; - let files = if eval_args[5].ends_with('/') { - let files = tokio::task::block_in_place(|| { - FRONTEND_RUNTIME.block_on(async { - let files = - list_s3_directory(op.clone(), eval_args[5].clone()).await?; - - Ok::, anyhow::Error>(files) - }) - })?; - - if files.is_empty() { - return Err(BindError( - "file_scan function only accepts non-empty directory".to_owned(), - ) - .into()); - } - - Some(files) + let (file_scan_backend, input_file_location) = + if "s3".eq_ignore_ascii_case(&eval_args[1]) { + (FileScanBackend::S3, eval_args[5].clone()) + } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { + (FileScanBackend::Gcs, 
eval_args[4].clone()) } else { - None + unreachable!(); }; - files - } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { - let (bucket, _) = extract_gcs_bucket_and_file_name(&eval_args[4].clone())?; - let op = new_gcs_operator( - eval_args[2].clone(), - eval_args[3].clone(), - bucket.clone(), - )?; - - let files = if eval_args[4].ends_with('/') { - let files = tokio::task::block_in_place(|| { - FRONTEND_RUNTIME.block_on(async { - let files = - list_gcs_directory(op.clone(), eval_args[4].clone()).await?; - - Ok::, anyhow::Error>(files) - }) - })?; - - if files.is_empty() { - return Err(BindError( - "file_scan function only accepts non-empty directory".to_owned(), + let (op, file_name) = match file_scan_backend { + FileScanBackend::S3 => { + let (bucket, file_name) = + extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?; + ( + new_s3_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )?, + file_name, + ) + } + FileScanBackend::Gcs => { + let (bucket, file_name) = + extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?; + ( + new_gcs_operator( + eval_args[2].clone(), + eval_args[3].clone(), + bucket.clone(), + )?, + file_name, + ) + } + }; + let files = if input_file_location.ends_with('/') { + let files = tokio::task::block_in_place(|| { + FRONTEND_RUNTIME.block_on(async { + let files = list_data_directory( + op.clone(), + input_file_location.clone(), + &file_scan_backend, ) - .into()); - } + .await?; - Some(files) - } else { - None - }; - files + Ok::, anyhow::Error>(files) + }) + })?; + + if files.is_empty() { + return Err(BindError( + "file_scan function only accepts non-empty directory".to_owned(), + ) + .into()); + } + + Some(files) } else { - unreachable!() + None }; - let schema = tokio::task::block_in_place(|| { FRONTEND_RUNTIME.block_on(async { - let location = match files.as_ref() { - Some(files) => files[0].clone(), - None => { - if "s3".eq_ignore_ascii_case(&eval_args[1]) { - eval_args[5].clone() - } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { - eval_args[4].clone() - } else { - unreachable!() - } - } - }; - let (op, file_name) = if "s3".eq_ignore_ascii_case(&eval_args[1]) { - let (bucket, file_name) = extract_s3_bucket_and_file_name(&location)?; - ( - new_s3_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )?, - file_name, - ) - } else { - let (bucket, file_name) = extract_gcs_bucket_and_file_name(&location)?; - ( - new_gcs_operator( - eval_args[2].clone(), - eval_args[3].clone(), - bucket.clone(), - )?, - file_name, - ) - }; - let fields = get_parquet_fields(op, file_name).await?; let mut rw_types = vec![]; From a056d7c366e36bcee41f89769ab3c8c90bafd0f4 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 31 Dec 2024 18:50:22 +0800 Subject: [PATCH 03/12] code refactor --- src/frontend/src/expr/table_function.rs | 48 ++++++++++++++----------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index b97d89995cd80..f1c49e91bf954 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -159,31 +159,27 @@ impl TableFunction { } else { unreachable!(); }; - let (op, file_name) = match file_scan_backend { + let op = match file_scan_backend { FileScanBackend::S3 => { - let (bucket, file_name) = + let (bucket, _) = extract_bucket_and_file_name(&input_file_location, 
&file_scan_backend)?; - ( - new_s3_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )?, - file_name, - ) + + new_s3_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )? } FileScanBackend::Gcs => { - let (bucket, file_name) = + let (bucket, _) = extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?; - ( - new_gcs_operator( - eval_args[2].clone(), - eval_args[3].clone(), - bucket.clone(), - )?, - file_name, - ) + + new_gcs_operator( + eval_args[2].clone(), + eval_args[3].clone(), + bucket.clone(), + )? } }; let files = if input_file_location.ends_with('/') { @@ -213,6 +209,13 @@ impl TableFunction { }; let schema = tokio::task::block_in_place(|| { FRONTEND_RUNTIME.block_on(async { + let location = match files.as_ref() { + Some(files) => files[0].clone(), + None => input_file_location.clone(), + }; + let (_, file_name) = + extract_bucket_and_file_name(&location, &file_scan_backend)?; + let fields = get_parquet_fields(op, file_name).await?; let mut rw_types = vec![]; @@ -231,7 +234,10 @@ impl TableFunction { if let Some(files) = files { // if the file location is a directory, we need to remove the last argument and add all files in the directory as arguments - args.remove(5); + match file_scan_backend { + FileScanBackend::S3 => args.remove(5), + FileScanBackend::Gcs => args.remove(4), + }; for file in files { args.push(ExprImpl::Literal(Box::new(Literal::new( Some(ScalarImpl::Utf8(file.into())), From ceefc56495761f953cf2ece0fb78c3abddf3459b Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 2 Jan 2025 15:40:44 +0800 Subject: [PATCH 04/12] support azblob file scan --- proto/batch_plan.proto | 17 +++ src/batch/executors/src/executor.rs | 3 + .../src/executor/azblob_file_scan.rs | 132 ++++++++++++++++++ .../source/iceberg/parquet_file_handler.rs | 26 +++- src/frontend/src/expr/table_function.rs | 25 +++- .../optimizer/plan_node/batch_file_scan.rs | 18 ++- .../optimizer/plan_node/generic/file_scan.rs | 58 +++++++- .../optimizer/plan_node/logical_file_scan.rs | 29 ++++ .../rule/table_function_to_file_scan_rule.rs | 20 +-- 9 files changed, 311 insertions(+), 17 deletions(-) create mode 100644 src/batch/executors/src/executor/azblob_file_scan.rs diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 8869734a4fd95..8aa0ac431d616 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -89,6 +89,7 @@ message FileScanNode { STORAGE_TYPE_UNSPECIFIED = 0; S3 = 1; GCS = 2; + AZBLOB = 3; } repeated plan_common.ColumnDesc columns = 1; @@ -113,6 +114,21 @@ message GcsFileScanNode { repeated string file_location = 5; } +message AzblobFileScanNode { + enum FileFormat { + FILE_FORMAT_UNSPECIFIED = 0; + PARQUET = 1; + } + + repeated plan_common.ColumnDesc columns = 1; + FileFormat file_format = 2; + string account_name = 3; + string account_key = 4; + string endpoint = 5; + + repeated string file_location = 6; +} + // NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed. message PostgresQueryNode { repeated plan_common.ColumnDesc columns = 1; @@ -420,6 +436,7 @@ message PlanNode { PostgresQueryNode postgres_query = 40; MySqlQueryNode mysql_query = 41; GcsFileScanNode gcs_file_scan = 42; + AzblobFileScanNode azblob_file_scan = 43; // The following nodes are used for testing. 
    bool block_executor = 100;
    bool busy_loop_executor = 101;
diff --git a/src/batch/executors/src/executor.rs b/src/batch/executors/src/executor.rs
index dc5dbff8e2c80..26bc26b3c5300 100644
--- a/src/batch/executors/src/executor.rs
+++ b/src/batch/executors/src/executor.rs
@@ -15,6 +15,7 @@
 pub use risingwave_batch::executor::*;
 
 pub mod aggregation;
+mod azblob_file_scan;
 mod delete;
 mod expand;
 mod filter;
@@ -50,6 +51,7 @@ mod update;
 mod utils;
 mod values;
 
+use azblob_file_scan::AzblobFileScanExecutorBuilder;
 pub use delete::*;
 pub use expand::*;
 pub use filter::*;
@@ -115,6 +117,7 @@ register_executor!(SortOverWindow, SortOverWindowExecutor);
 register_executor!(MaxOneRow, MaxOneRowExecutor);
 register_executor!(FileScan, FileScanExecutorBuilder);
 register_executor!(GcsFileScan, GcsFileScanExecutorBuilder);
+register_executor!(AzblobFileScan, AzblobFileScanExecutorBuilder);
 register_executor!(IcebergScan, IcebergScanExecutorBuilder);
 register_executor!(PostgresQuery, PostgresQueryExecutorBuilder);
 register_executor!(MysqlQuery, MySqlQueryExecutorBuilder);
diff --git a/src/batch/executors/src/executor/azblob_file_scan.rs b/src/batch/executors/src/executor/azblob_file_scan.rs
new file mode 100644
index 0000000000000..f0c1e490a139e
--- /dev/null
+++ b/src/batch/executors/src/executor/azblob_file_scan.rs
@@ -0,0 +1,132 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use futures_async_stream::try_stream;
+use futures_util::stream::StreamExt;
+use risingwave_common::array::DataChunk;
+use risingwave_common::catalog::{Field, Schema};
+use risingwave_connector::source::iceberg::{
+    extract_bucket_and_file_name, new_azblob_operator, read_parquet_file, FileScanBackend,
+};
+use risingwave_pb::batch_plan::file_scan_node;
+use risingwave_pb::batch_plan::plan_node::NodeBody;
+
+use crate::error::BatchError;
+use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
+
+#[derive(PartialEq, Debug)]
+pub enum FileFormat {
+    Parquet,
+}
+
+/// Azblob file scan executor. Currently only supports parquet file format.
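+///
+/// For each file location it builds an OpenDAL `Azblob` operator and streams the Parquet rows
+/// back as `DataChunk`s. A hypothetical SQL invocation (all values are placeholders) that the
+/// frontend lowers to this executor:
+/// `SELECT * FROM file_scan('parquet', 'azblob', 'my_account', 'my_key', 'https://my_account.blob.core.windows.net', 'azblob://my-container/path/');`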
+pub struct AzblobFileScanExecutor {
+    file_format: FileFormat,
+    file_location: Vec<String>,
+    account_name: String,
+    account_key: String,
+    endpoint: String,
+    batch_size: usize,
+    schema: Schema,
+    identity: String,
+}
+
+impl Executor for AzblobFileScanExecutor {
+    fn schema(&self) -> &risingwave_common::catalog::Schema {
+        &self.schema
+    }
+
+    fn identity(&self) -> &str {
+        &self.identity
+    }
+
+    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
+        self.do_execute().boxed()
+    }
+}
+
+impl AzblobFileScanExecutor {
+    pub fn new(
+        file_format: FileFormat,
+        file_location: Vec<String>,
+        account_name: String,
+        account_key: String,
+        endpoint: String,
+        batch_size: usize,
+        schema: Schema,
+        identity: String,
+    ) -> Self {
+        Self {
+            file_format,
+            file_location,
+            account_name,
+            account_key,
+            endpoint,
+            batch_size,
+            schema,
+            identity,
+        }
+    }
+
+    #[try_stream(ok = DataChunk, error = BatchError)]
+    async fn do_execute(self: Box<Self>) {
+        assert_eq!(self.file_format, FileFormat::Parquet);
+        for file in self.file_location {
+            let (bucket, file_name) =
+                extract_bucket_and_file_name(&file, &FileScanBackend::Azblob)?;
+            let op = new_azblob_operator(
+                self.account_name.clone(),
+                self.account_key.clone(),
+                self.endpoint.clone(),
+                bucket.clone(),
+            )?;
+            let chunk_stream =
+                read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
+            #[for_await]
+            for stream_chunk in chunk_stream {
+                let stream_chunk = stream_chunk?;
+                let (data_chunk, _) = stream_chunk.into_parts();
+                yield data_chunk;
+            }
+        }
+    }
+}
+
+pub struct AzblobFileScanExecutorBuilder {}
+
+impl BoxedExecutorBuilder for AzblobFileScanExecutorBuilder {
+    async fn new_boxed_executor(
+        source: &ExecutorBuilder<'_>,
+        _inputs: Vec<BoxedExecutor>,
+    ) -> crate::error::Result<BoxedExecutor> {
+        let file_scan_node = try_match_expand!(
+            source.plan_node().get_node_body().unwrap(),
+            NodeBody::AzblobFileScan
+        )?;
+
+        Ok(Box::new(AzblobFileScanExecutor::new(
+            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
+                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
+                file_scan_node::FileFormat::Unspecified => unreachable!(),
+            },
+            file_scan_node.file_location.clone(),
+            file_scan_node.account_name.clone(),
+            file_scan_node.account_key.clone(),
+            file_scan_node.endpoint.clone(),
+            source.context().get_config().developer.chunk_size,
+            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
+            source.plan_node().get_identity().clone(),
+        )))
+    }
+}
diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs
index 3c70b3cab3266..1be6192238a4c 100644
--- a/src/connector/src/source/iceberg/parquet_file_handler.rs
+++ b/src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -27,7 +27,7 @@ use iceberg::io::{
 };
 use iceberg::{Error, ErrorKind};
 use itertools::Itertools;
-use opendal::layers::{LoggingLayer, RetryLayer};
+use opendal::{layers::{LoggingLayer, RetryLayer}, services::Azblob};
 use opendal::services::{Gcs, S3};
 use opendal::Operator;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -145,10 +145,32 @@ pub fn new_gcs_operator(
     Ok(operator)
 }
 
+pub fn new_azblob_operator(
+    account_name: String,
+    account_key: String,
+    endpoint: String,
+    container_name: String,
+) -> ConnectorResult<Operator> {
+    // Create azblob builder.
+    let mut builder = Azblob::default();
+    builder = builder
+        .container(&container_name)
+        .endpoint(&endpoint)
+        .account_name(&account_name)
+        .account_key(&account_key);
+
+    let operator: Operator = Operator::new(builder)?
+ .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(operator) +} + #[derive(Debug, Clone)] pub enum FileScanBackend { S3, Gcs, + Azblob, } pub fn extract_bucket_and_file_name( @@ -168,6 +190,7 @@ pub fn extract_bucket_and_file_name( let prefix = match file_scan_backend { FileScanBackend::S3 => format!("s3://{}/", bucket), FileScanBackend::Gcs => format!("gcs://{}/", bucket), + FileScanBackend::Azblob => format!("azblob://{}/", bucket), }; let file_name = location[prefix.len()..].to_string(); Ok((bucket, file_name)) @@ -182,6 +205,7 @@ pub async fn list_data_directory( let prefix = match file_scan_backend { FileScanBackend::S3 => format!("s3://{}/", bucket), FileScanBackend::Gcs => format!("gcs://{}/", bucket), + FileScanBackend::Azblob => format!("azblob://{}/", bucket), }; if dir.starts_with(&prefix) { op.list(&file_name) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index f1c49e91bf954..09f76996d8c15 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -21,8 +21,7 @@ use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{ - extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_gcs_operator, - new_s3_operator, FileScanBackend, + extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_azblob_operator, new_gcs_operator, new_s3_operator, FileScanBackend }; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; @@ -135,10 +134,10 @@ impl TableFunction { } if !"s3".eq_ignore_ascii_case(&eval_args[1]) - && !"gcs".eq_ignore_ascii_case(&eval_args[1]) + && !"gcs".eq_ignore_ascii_case(&eval_args[1]) && !"azblob".eq_ignore_ascii_case(&eval_args[1]) { return Err(BindError( - "file_scan function only accepts 's3' or 'gcs' as storage type".to_owned(), + "file_scan function only accepts 's3', 'gcs' or 'azblob' as storage type".to_owned(), ) .into()); } @@ -156,7 +155,11 @@ impl TableFunction { (FileScanBackend::S3, eval_args[5].clone()) } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { (FileScanBackend::Gcs, eval_args[4].clone()) - } else { + } + else if "azblob".eq_ignore_ascii_case(&eval_args[1]) { + (FileScanBackend::Azblob, eval_args[5].clone()) + } + else { unreachable!(); }; let op = match file_scan_backend { @@ -181,6 +184,17 @@ impl TableFunction { bucket.clone(), )? } + FileScanBackend::Azblob =>{ + let (bucket, _) = + extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?; + + new_azblob_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )? + }, }; let files = if input_file_location.ends_with('/') { let files = tokio::task::block_in_place(|| { @@ -237,6 +251,7 @@ impl TableFunction { match file_scan_backend { FileScanBackend::S3 => args.remove(5), FileScanBackend::Gcs => args.remove(4), + FileScanBackend::Azblob => args.remove(5), }; for file in files { args.push(ExprImpl::Literal(Box::new(Literal::new( diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs index 2705fa584f1c3..1e43f9ca6aa44 100644 --- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use pretty_xmlish::XmlNode; -use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType}; +use risingwave_pb::batch_plan::{file_scan_node::{FileFormat, StorageType}, AzblobFileScanNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{FileScanNode, GcsFileScanNode}; @@ -109,6 +109,22 @@ impl ToBatchPb for BatchFileScan { file_location: gcs_file_scan.file_location.clone(), }) } + generic::FileScanBackend::AzblobFileScan(azblob_file_scan) => { + NodeBody::AzblobFileScan(AzblobFileScanNode { + columns: azblob_file_scan + .columns() + .into_iter() + .map(|col| col.to_protobuf()) + .collect(), + file_format: match azblob_file_scan.file_format { + generic::FileFormat::Parquet => FileFormat::Parquet as i32, + }, + account_name: azblob_file_scan.account_name.clone(), + account_key: azblob_file_scan.account_key.clone(), + endpoint: azblob_file_scan.endpoint.clone(), + file_location: azblob_file_scan.file_location.clone(), + }) + } } } } diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs index ce9643ed530ec..653e444636864 100644 --- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs @@ -28,16 +28,18 @@ pub enum FileFormat { pub enum StorageType { S3, Gcs, + Azblob, } +#[allow(clippy::enum_variant_names)] #[derive(Debug, Clone, Educe)] #[educe(PartialEq, Eq, Hash)] pub enum FileScanBackend { FileScan(FileScan), - GcsFileScan(GcsFileScan), // Assume GcsFileScan is defined elsewhere + GcsFileScan(GcsFileScan), + AzblobFileScan(AzblobFileScan), } -// Example definition of GcsFileScan (you may need to adjust it) #[derive(Debug, Clone, Educe)] #[educe(PartialEq, Eq, Hash)] pub struct GcsFileScan { @@ -71,6 +73,40 @@ impl GenericPlanNode for GcsFileScan { } } +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct AzblobFileScan { + pub schema: Schema, + pub file_format: FileFormat, + pub storage_type: StorageType, + pub account_name: String, + pub account_key: String, + pub endpoint: String, + pub file_location: Vec, + + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, +} + +impl GenericPlanNode for AzblobFileScan { + fn schema(&self) -> Schema { + self.schema.clone() + } + + fn stream_key(&self) -> Option> { + None + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + FunctionalDependencySet::new(self.schema.len()) + } +} + impl FileScan { pub fn columns(&self) -> Vec { self.schema @@ -131,11 +167,25 @@ impl GcsFileScan { } } +impl AzblobFileScan { + pub fn columns(&self) -> Vec { + self.schema + .fields + .iter() + .enumerate() + .map(|(i, f)| { + ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone()) + }) + .collect() + } +} + impl GenericPlanNode for FileScanBackend { fn schema(&self) -> Schema { match self { FileScanBackend::FileScan(file_scan) => file_scan.schema(), FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.schema(), + FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.schema(), } } @@ -143,6 +193,7 @@ impl GenericPlanNode for FileScanBackend { match self { FileScanBackend::FileScan(file_scan) => file_scan.stream_key(), FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.stream_key(), + FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.stream_key(), } } @@ -150,6 +201,7 @@ 
impl GenericPlanNode for FileScanBackend { match self { FileScanBackend::FileScan(file_scan) => file_scan.ctx(), FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.ctx(), + FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.ctx(), } } @@ -157,6 +209,7 @@ impl GenericPlanNode for FileScanBackend { match self { FileScanBackend::FileScan(file_scan) => file_scan.functional_dependency(), FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.functional_dependency(), + FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.functional_dependency(), } } } @@ -166,6 +219,7 @@ impl FileScanBackend { match self { FileScanBackend::FileScan(file_scan) => file_scan.file_location.clone(), FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.file_location.clone(), + FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.file_location.clone(), } } } diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs index a56241d686adb..8407b05157cd6 100644 --- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs @@ -94,6 +94,35 @@ impl LogicalFileScan { LogicalFileScan { base, core } } + pub fn new_azblob_logical_file_scan( + ctx: OptimizerContextRef, + schema: Schema, + file_format: String, + storage_type: String, + account_name: String, + account_key: String, + endpoint: String, + file_location: Vec, + ) -> Self { + assert!("parquet".eq_ignore_ascii_case(&file_format)); + assert!("azblob".eq_ignore_ascii_case(&storage_type)); + + let core = generic::FileScanBackend::AzblobFileScan(generic::AzblobFileScan { + schema, + file_format: generic::FileFormat::Parquet, + storage_type: generic::StorageType::Azblob, + account_name, + account_key, + endpoint, + file_location, + ctx, + }); + + let base = PlanBase::new_logical_with_core(&core); + + LogicalFileScan { base, core } + } + } impl_plan_tree_node_for_leaf! 
{LogicalFileScan} diff --git a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs index 6956d83f035a9..40d3ff0d47c0d 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs @@ -60,6 +60,7 @@ impl Rule for TableFunctionToFileScanRule { assert!( ("s3".eq_ignore_ascii_case(&eval_args[1])) || "gcs".eq_ignore_ascii_case(&eval_args[1]) + || "azblob".eq_ignore_ascii_case(&eval_args[1]) ); if "s3".eq_ignore_ascii_case(&eval_args[1]) { @@ -81,19 +82,22 @@ impl Rule for TableFunctionToFileScanRule { ) .into(), ) - } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { - let creditial = eval_args[2].clone(); - let service_account = eval_args[3].clone(); + } + else if "azblob".eq_ignore_ascii_case(&eval_args[1]) { + let account_name = eval_args[2].clone(); + let account_key = eval_args[3].clone(); + let endpoint = eval_args[4].clone(); // The rest of the arguments are file locations - let file_location = eval_args[4..].iter().cloned().collect_vec(); + let file_location = eval_args[5..].iter().cloned().collect_vec(); Some( - LogicalFileScan::new_gcs_logical_file_scan( + LogicalFileScan::new_azblob_logical_file_scan( logical_table_function.ctx(), schema, "parquet".to_owned(), - "gcs".to_owned(), - creditial, - service_account, + "azblob".to_owned(), + account_name, + account_key, + endpoint, file_location, ) .into(), From bf78a288348b9f3caff479f79b650e0c8d4d1d3c Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 2 Jan 2025 16:44:11 +0800 Subject: [PATCH 05/12] minor refactor --- src/frontend/src/expr/table_function.rs | 10 ++++------ .../src/optimizer/plan_node/batch_file_scan.rs | 2 +- .../src/optimizer/plan_node/generic/file_scan.rs | 5 ++--- .../src/optimizer/plan_node/logical_file_scan.rs | 4 ++-- .../optimizer/rule/table_function_to_file_scan_rule.rs | 4 ++-- 5 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index f1c49e91bf954..8327fce055047 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -80,13 +80,11 @@ impl TableFunction { let return_type = { // arguments: // file format e.g. parquet - // storage type e.g. s3 - // s3 region - // s3 access key - // s3 secret key - // file location + // storage type e.g. 
s3, gcs + // For s3: file_scan(file_format, s3, s3_region, s3_access_key, s3_secret_key, file_location_or_directory) + // For gcs: file_scan(file_format, gcs, credential, service_account, file_location_or_directory) if args.len() != 6 && args.len() != 5 { - return Err(BindError("file_scan function only accepts 6 arguments: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location)".to_owned()).into()); + return Err(BindError("file_scan function only accepts: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location) or file_scan('parquet', 'gcs', credential, service_account, file location)".to_owned()).into()); } let mut eval_args: Vec = vec![]; for arg in &args { diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs index 2705fa584f1c3..f3aa27a7af945 100644 --- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs @@ -104,7 +104,7 @@ impl ToBatchPb for BatchFileScan { file_format: match gcs_file_scan.file_format { generic::FileFormat::Parquet => FileFormat::Parquet as i32, }, - credential: gcs_file_scan.creditial.clone(), + credential: gcs_file_scan.credential.clone(), service_account: gcs_file_scan.service_account.clone(), file_location: gcs_file_scan.file_location.clone(), }) diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs index ce9643ed530ec..cca2f77116368 100644 --- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs @@ -34,17 +34,16 @@ pub enum StorageType { #[educe(PartialEq, Eq, Hash)] pub enum FileScanBackend { FileScan(FileScan), - GcsFileScan(GcsFileScan), // Assume GcsFileScan is defined elsewhere + GcsFileScan(GcsFileScan), } -// Example definition of GcsFileScan (you may need to adjust it) #[derive(Debug, Clone, Educe)] #[educe(PartialEq, Eq, Hash)] pub struct GcsFileScan { pub schema: Schema, pub file_format: FileFormat, pub storage_type: StorageType, - pub creditial: String, + pub credential: String, pub service_account: String, pub file_location: Vec, diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs index a56241d686adb..ea73677171ea1 100644 --- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs @@ -73,7 +73,7 @@ impl LogicalFileScan { schema: Schema, file_format: String, storage_type: String, - creditial: String, + credential: String, service_account: String, file_location: Vec, ) -> Self { @@ -84,7 +84,7 @@ impl LogicalFileScan { schema, file_format: generic::FileFormat::Parquet, storage_type: generic::StorageType::Gcs, - creditial, + credential, service_account, file_location, ctx, diff --git a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs index 6956d83f035a9..de5a94e9e3a59 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs @@ -82,7 +82,7 @@ impl Rule for TableFunctionToFileScanRule { .into(), ) } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { - let creditial = eval_args[2].clone(); + let credential = eval_args[2].clone(); let service_account = eval_args[3].clone(); // The rest of the 
arguments are file locations let file_location = eval_args[4..].iter().cloned().collect_vec(); @@ -92,7 +92,7 @@ impl Rule for TableFunctionToFileScanRule { schema, "parquet".to_owned(), "gcs".to_owned(), - creditial, + credential, service_account, file_location, ) From 092ddf29ef32bb314d91476499352c5ebdffbd76 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 6 Jan 2025 13:52:25 +0800 Subject: [PATCH 06/12] remove service account --- proto/batch_plan.proto | 3 +-- src/batch/executors/src/executor/gcs_file_scan.rs | 10 +--------- .../src/source/iceberg/parquet_file_handler.rs | 11 ++--------- src/frontend/src/expr/table_function.rs | 12 ++++-------- .../src/optimizer/plan_node/batch_file_scan.rs | 1 - .../src/optimizer/plan_node/generic/file_scan.rs | 1 - .../src/optimizer/plan_node/logical_file_scan.rs | 2 -- .../rule/table_function_to_file_scan_rule.rs | 4 +--- 8 files changed, 9 insertions(+), 35 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 8869734a4fd95..4540fde03204e 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -109,8 +109,7 @@ message GcsFileScanNode { repeated plan_common.ColumnDesc columns = 1; FileFormat file_format = 2; string credential = 3; - string service_account = 4; - repeated string file_location = 5; + repeated string file_location = 4; } // NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed. diff --git a/src/batch/executors/src/executor/gcs_file_scan.rs b/src/batch/executors/src/executor/gcs_file_scan.rs index df59b4e8241f3..d96f975180200 100644 --- a/src/batch/executors/src/executor/gcs_file_scan.rs +++ b/src/batch/executors/src/executor/gcs_file_scan.rs @@ -35,7 +35,6 @@ pub struct GcsFileScanExecutor { file_format: FileFormat, file_location: Vec, gcs_credential: String, - service_account: String, batch_size: usize, schema: Schema, identity: String, @@ -60,7 +59,6 @@ impl GcsFileScanExecutor { file_format: FileFormat, file_location: Vec, gcs_credential: String, - service_account: String, batch_size: usize, schema: Schema, identity: String, @@ -69,7 +67,6 @@ impl GcsFileScanExecutor { file_format, file_location, gcs_credential, - service_account, batch_size, schema, identity, @@ -81,11 +78,7 @@ impl GcsFileScanExecutor { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::Gcs)?; - let op = new_gcs_operator( - self.gcs_credential.clone(), - self.service_account.clone(), - bucket.clone(), - )?; + let op = new_gcs_operator(self.gcs_credential.clone(), bucket.clone())?; let chunk_stream = read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?; #[for_await] @@ -117,7 +110,6 @@ impl BoxedExecutorBuilder for GcsFileScanExecutorBuilder { }, file_scan_node.file_location.clone(), file_scan_node.credential.clone(), - file_scan_node.service_account.clone(), source.context().get_config().developer.chunk_size, Schema::from_iter(file_scan_node.columns.iter().map(Field::from)), source.plan_node().get_identity().clone(), diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index 3334772673d9f..2cae369aa6a22 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -127,16 +127,9 @@ pub fn new_s3_operator( Ok(op) } -pub fn new_gcs_operator( - credential: String, - service_account: String, - 
bucket: String, -) -> ConnectorResult { +pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult { // Create gcs builder. - let builder = Gcs::default() - .bucket(&bucket) - .credential(&credential) - .service_account(&service_account); + let builder = Gcs::default().bucket(&bucket).credential(&credential); let operator: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 0c53fb446ce63..59d3a981cf332 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -83,7 +83,7 @@ impl TableFunction { // storage type e.g. s3, gcs // For s3: file_scan(file_format, s3, s3_region, s3_access_key, s3_secret_key, file_location_or_directory) // For gcs: file_scan(file_format, gcs, credential, service_account, file_location_or_directory) - if args.len() != 6 && args.len() != 5 { + if args.len() != 6 && args.len() != 4 { return Err(BindError("file_scan function only accepts: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location) or file_scan('parquet', 'gcs', credential, service_account, file location)".to_owned()).into()); } let mut eval_args: Vec = vec![]; @@ -153,7 +153,7 @@ impl TableFunction { if "s3".eq_ignore_ascii_case(&eval_args[1]) { (FileScanBackend::S3, eval_args[5].clone()) } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { - (FileScanBackend::Gcs, eval_args[4].clone()) + (FileScanBackend::Gcs, eval_args[3].clone()) } else { unreachable!(); }; @@ -173,11 +173,7 @@ impl TableFunction { let (bucket, _) = extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?; - new_gcs_operator( - eval_args[2].clone(), - eval_args[3].clone(), - bucket.clone(), - )? + new_gcs_operator(eval_args[2].clone(), bucket.clone())? 
} }; let files = if input_file_location.ends_with('/') { @@ -234,7 +230,7 @@ impl TableFunction { // if the file location is a directory, we need to remove the last argument and add all files in the directory as arguments match file_scan_backend { FileScanBackend::S3 => args.remove(5), - FileScanBackend::Gcs => args.remove(4), + FileScanBackend::Gcs => args.remove(3), }; for file in files { args.push(ExprImpl::Literal(Box::new(Literal::new( diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs index 7e84a427512e3..0de65f4bd8555 100644 --- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs @@ -105,7 +105,6 @@ impl ToBatchPb for BatchFileScan { generic::FileFormat::Parquet => FileFormat::Parquet as i32, }, credential: gcs_file_scan.credential.clone(), - service_account: gcs_file_scan.service_account.clone(), file_location: gcs_file_scan.file_location.clone(), }) } diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs index 1c573a8cca404..95ea6e1f4fcce 100644 --- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs @@ -44,7 +44,6 @@ pub struct GcsFileScan { pub file_format: FileFormat, pub storage_type: StorageType, pub credential: String, - pub service_account: String, pub file_location: Vec, #[educe(PartialEq(ignore))] diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs index cf7e731a72c20..429e1974d8e84 100644 --- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs @@ -74,7 +74,6 @@ impl LogicalFileScan { file_format: String, storage_type: String, credential: String, - service_account: String, file_location: Vec, ) -> Self { assert!("parquet".eq_ignore_ascii_case(&file_format)); @@ -85,7 +84,6 @@ impl LogicalFileScan { file_format: generic::FileFormat::Parquet, storage_type: generic::StorageType::Gcs, credential, - service_account, file_location, ctx, }); diff --git a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs index 6826e1af45970..1c081796bbe22 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs @@ -83,9 +83,8 @@ impl Rule for TableFunctionToFileScanRule { ) } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { let credential = eval_args[2].clone(); - let service_account = eval_args[3].clone(); // The rest of the arguments are file locations - let file_location = eval_args[4..].iter().cloned().collect_vec(); + let file_location = eval_args[3..].iter().cloned().collect_vec(); Some( LogicalFileScan::new_gcs_logical_file_scan( logical_table_function.ctx(), @@ -93,7 +92,6 @@ impl Rule for TableFunctionToFileScanRule { "parquet".to_owned(), "gcs".to_owned(), credential, - service_account, file_location, ) .into(), From 2f9b1d959a45fce9805939af2950710a2f2bf5eb Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 6 Jan 2025 14:00:17 +0800 Subject: [PATCH 07/12] update License header --- src/batch/executors/src/executor/gcs_file_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/batch/executors/src/executor/gcs_file_scan.rs 
b/src/batch/executors/src/executor/gcs_file_scan.rs index d96f975180200..aa0e9c6d4a0b0 100644 --- a/src/batch/executors/src/executor/gcs_file_scan.rs +++ b/src/batch/executors/src/executor/gcs_file_scan.rs @@ -1,4 +1,4 @@ -// Copyright 2024 RisingWave Labs +// Copyright 2025 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 6d790648e58db94b1284e8411da24bd7bca3191e Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 7 Jan 2025 15:38:03 +0800 Subject: [PATCH 08/12] update License header --- src/batch/executors/src/executor/azblob_file_scan.rs | 2 +- src/connector/src/source/iceberg/parquet_file_handler.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/batch/executors/src/executor/azblob_file_scan.rs b/src/batch/executors/src/executor/azblob_file_scan.rs index f0c1e490a139e..aa6561f61f233 100644 --- a/src/batch/executors/src/executor/azblob_file_scan.rs +++ b/src/batch/executors/src/executor/azblob_file_scan.rs @@ -1,4 +1,4 @@ -// Copyright 2024 RisingWave Labs +// Copyright 2025 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index df1e1a62b5b2b..0ccf2eb7a6989 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -144,7 +144,7 @@ pub fn new_azblob_operator( endpoint: String, container_name: String, ) -> ConnectorResult { - // Create gcs builder. + // Create azblob builder. let mut builder = Azblob::default(); builder = builder .container(&container_name) From ea663af7c149543597137e52d2af0b5a5093cae5 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 7 Jan 2025 15:59:24 +0800 Subject: [PATCH 09/12] minor --- src/frontend/src/expr/table_function.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 8888e54f788d6..54ecdf92b871a 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -80,11 +80,15 @@ impl TableFunction { let return_type = { // arguments: // file format e.g. parquet - // storage type e.g. s3, gcs - // For s3: file_scan(file_format, s3, s3_region, s3_access_key, s3_secret_key, file_location_or_directory) - // For gcs: file_scan(file_format, gcs, credential, file_location_or_directory) + // storage type e.g. 
s3, gcs, azblob + // For s3: file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location_or_directory) + // For gcs: file_scan('parquet', 'gcs', credential, file_location_or_directory) + // For azblob: file_scan('parquet', 'azblob', account_name, account_key, endpoint, file_location) if args.len() != 6 && args.len() != 4 { - return Err(BindError("file_scan function only accepts: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location) or file_scan('parquet', 'gcs', credential, service_account, file location)".to_owned()).into()); + return Err(BindError("file_scan function only accepts: \ + file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location) or \ + file_scan('parquet', 'gcs', credential, service_account, file_location) or \ + file_scan('parquet', 'azblob', account_name, account_key, endpoint, file_location)".to_owned()).into()); } let mut eval_args: Vec = vec![]; for arg in &args { From 531e1304c38dba6ca0cb4c0f969283e490aba69f Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 7 Jan 2025 18:00:20 +0800 Subject: [PATCH 10/12] revert change in Cargo.lock --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c396cb52dfa6b..446b640d2ccdb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8864,18 +8864,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.8" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.8" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", From 14ef860806a57124f59f2c655db1cbffa51d4789 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 9 Jan 2025 11:35:42 +0800 Subject: [PATCH 11/12] more strict table function parameter length judgment --- src/frontend/src/expr/table_function.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 54ecdf92b871a..b67d14b0347d0 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -84,12 +84,6 @@ impl TableFunction { // For s3: file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location_or_directory) // For gcs: file_scan('parquet', 'gcs', credential, file_location_or_directory) // For azblob: file_scan('parquet', 'azblob', account_name, account_key, endpoint, file_location) - if args.len() != 6 && args.len() != 4 { - return Err(BindError("file_scan function only accepts: \ - file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location) or \ - file_scan('parquet', 'gcs', credential, service_account, file_location) or \ - file_scan('parquet', 'azblob', account_name, account_key, endpoint, file_location)".to_owned()).into()); - } let mut eval_args: Vec = vec![]; for arg in &args { if arg.return_type() != DataType::Varchar { @@ -129,6 +123,22 @@ impl TableFunction { } } } + + if (eval_args.len() != 4 && eval_args.len() != 6) + 
|| (eval_args.len() == 4 && !"gcs".eq_ignore_ascii_case(&eval_args[1])) + || (eval_args.len() == 6 + && !"s3".eq_ignore_ascii_case(&eval_args[1]) + && !"azblob".eq_ignore_ascii_case(&eval_args[1])) + { + return Err(BindError( + "file_scan function supports three backends: s3, gcs, and azblob. Their formats are as follows:: \n + file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location) \n + file_scan('parquet', 'gcs', credential, service_account, file_location) \n + file_scan('parquet', 'azblob', account_name, account_key, endpoint, file_location)" + .to_owned(), + ) + .into()); + } if !"parquet".eq_ignore_ascii_case(&eval_args[0]) { return Err(BindError( "file_scan function only accepts 'parquet' as file format".to_owned(), From 0cc006d44610d6a3967421a16ae89c3676b867d0 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 9 Jan 2025 12:57:20 +0800 Subject: [PATCH 12/12] fix typr --- src/frontend/src/expr/table_function.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 3e1585f97b17a..d9577c1270f2e 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -131,7 +131,7 @@ impl TableFunction { && !"azblob".eq_ignore_ascii_case(&eval_args[1])) { return Err(BindError( - "file_scan function supports three backends: s3, gcs, and azblob. Their formats are as follows:: \n + "file_scan function supports three backends: s3, gcs, and azblob. Their formats are as follows: \n file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location) \n file_scan('parquet', 'gcs', credential, service_account, file_location) \n file_scan('parquet', 'azblob', account_name, account_key, endpoint, file_location)"
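[Editor's usage sketch, not part of the patch series: the argument layouts enforced by the length check above correspond to the calls below. Credentials, bucket/container names, the Azure endpoint, and the URI schemes in the file locations are placeholders and assumptions, not taken from these patches; the GCS call uses the four-argument shape that the validation accepts.]

    -- hypothetical examples; substitute real credentials, endpoints, and paths
    SELECT * FROM file_scan('parquet', 's3', 'us-east-1', 'my_access_key', 'my_secret_key', 's3://my-bucket/data/');
    SELECT * FROM file_scan('parquet', 'gcs', 'my_credential', 'gcs://my-bucket/data/');
    SELECT * FROM file_scan('parquet', 'azblob', 'my_account_name', 'my_account_key', 'https://my_account_name.blob.core.windows.net', 'azblob://my-container/data/');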