diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index 85db60c899a98..6447d8b48cad5 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -37,23 +37,27 @@ use parquet::file::reader::*; use crate::datasource::{ScanResult, TableProvider}; use crate::error::{ExecutionError, Result}; +use crate::execution::physical_plan::common; use crate::execution::physical_plan::BatchIterator; /// Table-based representation of a `ParquetFile` pub struct ParquetTable { - filename: String, + filenames: Vec, schema: Arc, } impl ParquetTable { /// Attempt to initialize a new `ParquetTable` from a file path - pub fn try_new(filename: &str) -> Result { - let parquet_file = ParquetFile::open(filename, None, 0)?; - let schema = parquet_file.projection_schema.clone(); - Ok(Self { - filename: filename.to_string(), - schema, - }) + pub fn try_new(path: &str) -> Result { + let mut filenames: Vec = vec![]; + common::build_file_list(path, &mut filenames, ".parquet")?; + if filenames.is_empty() { + Err(ExecutionError::General("No files found".to_string())) + } else { + let parquet_file = ParquetFile::open(&filenames[0], None, 0)?; + let schema = parquet_file.projection_schema.clone(); + Ok(Self { filenames, schema }) + } } } @@ -70,17 +74,16 @@ impl TableProvider for ParquetTable { projection: &Option>, batch_size: usize, ) -> Result> { - // note that this code currently assumes the filename is a file rather than a directory - // and therefore only returns a single partition - let parquet_file = match projection { - Some(p) => ParquetScanPartition::try_new( - &self.filename, - Some(p.clone()), - batch_size, - )?, - None => ParquetScanPartition::try_new(&self.filename, None, batch_size)?, - }; - Ok(vec![Arc::new(Mutex::new(parquet_file))]) + Ok(self + .filenames + .iter() + .map(|filename| { + ParquetScanPartition::try_new(filename, projection.clone(), batch_size) + .and_then(|part| { + Ok(Arc::new(Mutex::new(part)) as Arc>) + }) + }) + .collect::>>()?) } } diff --git a/rust/datafusion/src/execution/physical_plan/common.rs b/rust/datafusion/src/execution/physical_plan/common.rs index e6ba826f0e982..60872b06e472a 100644 --- a/rust/datafusion/src/execution/physical_plan/common.rs +++ b/rust/datafusion/src/execution/physical_plan/common.rs @@ -17,9 +17,11 @@ //! Defines common code used in execution plans +use std::fs; +use std::fs::metadata; use std::sync::{Arc, Mutex}; -use crate::error::Result; +use crate::error::{ExecutionError, Result}; use crate::execution::physical_plan::BatchIterator; use arrow::datatypes::Schema; @@ -75,3 +77,30 @@ pub fn collect(it: Arc>) -> Result> { } } } + +/// Recursively build a list of files in a directory with a given extension +pub fn build_file_list(dir: &str, filenames: &mut Vec, ext: &str) -> Result<()> { + let metadata = metadata(dir)?; + if metadata.is_file() { + if dir.ends_with(ext) { + filenames.push(dir.to_string()); + } + } else { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + if let Some(path_name) = path.to_str() { + if path.is_dir() { + build_file_list(path_name, filenames, ext)?; + } else { + if path_name.ends_with(ext) { + filenames.push(path_name.to_string()); + } + } + } else { + return Err(ExecutionError::General("Invalid path".to_string())); + } + } + } + Ok(()) +} diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs index 306718fe5c46e..a07417cf22e03 100644 --- a/rust/datafusion/src/execution/physical_plan/csv.rs +++ b/rust/datafusion/src/execution/physical_plan/csv.rs @@ -17,12 +17,11 @@ //! Execution plan for reading CSV files -use std::fs; -use std::fs::metadata; use std::fs::File; use std::sync::{Arc, Mutex}; -use crate::error::{ExecutionError, Result}; +use crate::error::Result; +use crate::execution::physical_plan::common; use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition}; use arrow::csv; use arrow::datatypes::Schema; @@ -51,7 +50,7 @@ impl ExecutionPlan for CsvExec { /// Get the partitions for this execution plan. Each partition can be executed in parallel. fn partitions(&self) -> Result>> { let mut filenames: Vec = vec![]; - self.build_file_list(&self.path, &mut filenames)?; + common::build_file_list(&self.path, &mut filenames, ".csv")?; let partitions = filenames .iter() .map(|filename| { @@ -85,33 +84,6 @@ impl CsvExec { batch_size, }) } - - /// Recursively build a list of csv files in a directory - fn build_file_list(&self, dir: &str, filenames: &mut Vec) -> Result<()> { - let metadata = metadata(dir)?; - if metadata.is_file() { - if dir.ends_with(".csv") { - filenames.push(dir.to_string()); - } - } else { - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); - if let Some(path_name) = path.to_str() { - if path.is_dir() { - self.build_file_list(path_name, filenames)?; - } else { - if path_name.ends_with(".csv") { - filenames.push(path_name.to_string()); - } - } - } else { - return Err(ExecutionError::General("Invalid path".to_string())); - } - } - } - Ok(()) - } } /// CSV Partition