From 104f3ad9b04ac1c5e2da93274fe41dd594864747 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 13:04:14 -0600 Subject: [PATCH] csv provider now supports partitioned csv files --- rust/datafusion/src/datasource/csv.rs | 19 ++++++++---- rust/datafusion/src/execution/context.rs | 4 +-- .../src/execution/physical_plan/csv.rs | 30 ++++++++++++------- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs index c8f128d4ffbed..4690b23610b28 100644 --- a/rust/datafusion/src/datasource/csv.rs +++ b/rust/datafusion/src/datasource/csv.rs @@ -18,16 +18,17 @@ //! CSV Data source use std::fs::File; -use std::string::String; -use std::sync::{Arc, Mutex}; +use std::string::String; +use std::sync::Arc; use arrow::csv; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use crate::datasource::{ScanResult, TableProvider}; use crate::error::Result; -use crate::execution::physical_plan::BatchIterator; +use crate::execution::physical_plan::csv::CsvExec; +use crate::execution::physical_plan::{BatchIterator, ExecutionPlan}; /// Represents a CSV file with a provided schema // TODO: usage example (rather than documenting `new()`) @@ -58,13 +59,19 @@ impl TableProvider for CsvFile { projection: &Option>, batch_size: usize, ) -> Result> { - Ok(vec![Arc::new(Mutex::new(CsvBatchIterator::new( + let exec = CsvExec::try_new( &self.filename, self.schema.clone(), self.has_header, - projection, + projection.clone(), batch_size, - )))]) + )?; + let partitions = exec.partitions()?; + let iterators = partitions + .iter() + .map(|p| p.execute()) + .collect::>>()?; + Ok(iterators) } } diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index fc1ec1a4057b8..db3e4ff91e3d7 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -304,7 +304,7 @@ impl ExecutionContext { let mut combined_results: Vec = vec![]; for thread in threads { let result = thread.join().unwrap(); - let result = result.unwrap(); + let result = result?; result .iter() .for_each(|batch| combined_results.push(batch.clone())); @@ -534,7 +534,7 @@ mod tests { let mut file = File::create(file_path)?; // generate some data - for i in 0..10 { + for i in 0..=10 { let data = format!("{},{}\n", partition, i); file.write_all(data.as_bytes())?; } diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs index 82285b0f7da05..eeb7b7462030a 100644 --- a/rust/datafusion/src/execution/physical_plan/csv.rs +++ b/rust/datafusion/src/execution/physical_plan/csv.rs @@ -19,6 +19,7 @@ use std::fs; use std::fs::File; +use std::fs::metadata; use std::sync::{Arc, Mutex}; use crate::error::{ExecutionError, Result}; @@ -97,19 +98,26 @@ impl CsvExec { /// Recursively build a list of csv files in a directory fn build_file_list(&self, dir: &str, filenames: &mut Vec) -> Result<()> { - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); - if let Some(path_name) = path.to_str() { - if path.is_dir() { - self.build_file_list(path_name, filenames)?; - } else { - if path_name.ends_with(".csv") { - filenames.push(path_name.to_string()); + let metadata = metadata(dir)?; + if metadata.is_file() { + if dir.ends_with(".csv") { + filenames.push(dir.to_string()); + } + } else { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + if let Some(path_name) = path.to_str() { + if path.is_dir() { + self.build_file_list(path_name, filenames)?; + } else { + if path_name.ends_with(".csv") { + filenames.push(path_name.to_string()); + } } + } else { + return Err(ExecutionError::General("Invalid path".to_string())); } - } else { - return Err(ExecutionError::General("Invalid path".to_string())); } } Ok(())