From abf6d5eb6bb5ff84fca8ffcf06a28f28ebd9da8b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 1 Aug 2019 12:01:19 -0600
Subject: [PATCH] Save

---
 .../src/execution/physical_plan/csv.rs        | 98 +++++++++++++++++--
 .../src/execution/physical_plan/projection.rs |  7 +-
 2 files changed, 95 insertions(+), 10 deletions(-)

diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs
index da294bfa6a245..af70b99c887a4 100644
--- a/rust/datafusion/src/execution/physical_plan/csv.rs
+++ b/rust/datafusion/src/execution/physical_plan/csv.rs
@@ -17,11 +17,15 @@
 
 //! Execution plan for reading CSV files
 
+use std::fs;
+use std::fs::File;
+use std::sync::Arc;
+
 use crate::error::Result;
 use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
-use arrow::datatypes::Schema;
+use arrow::csv;
+use arrow::datatypes::{Field, Schema};
 use arrow::record_batch::RecordBatch;
-use std::sync::Arc;
 
 pub struct CsvExec {
     /// Path to directory containing partitioned CSV files with the same schema
@@ -38,20 +42,44 @@ impl ExecutionPlan for CsvExec {
 
     /// Get the partitions for this execution plan. Each partition can be executed in parallel.
     fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        // TODO get list of files in the directory and create a partition for each one
-        unimplemented!()
+        let mut filenames: Vec<String> = vec![];
+        self.build_file_list(&self.path, &mut filenames)?;
+        let partitions = filenames
+            .iter()
+            .map(|filename| {
+                Arc::new(CsvPartition::new(&filename, self.schema.clone()))
+                    as Arc<dyn Partition>
+            })
+            .collect();
+        Ok(partitions)
     }
 }
 
 impl CsvExec {
     /// Create a new execution plan for reading a set of CSV files
     pub fn try_new(path: &str, schema: Arc<Schema>) -> Result<Self> {
-        //TODO
         Ok(Self {
             path: path.to_string(),
             schema: schema.clone(),
         })
     }
+
+    /// Recursively build a list of csv files in a directory
+    fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
+        for entry in fs::read_dir(dir)? {
+            let entry = entry?;
+            let path = entry.path();
+            let path_name = path.as_os_str().to_str().unwrap();
+            if path.is_dir() {
+                self.build_file_list(path_name, filenames)?;
+            } else {
+                if path_name.ends_with(".csv") {
+                    filenames.push(path_name.to_string());
+                }
+            }
+        }
+        Ok(())
+    }
 }
 
 /// CSV Partition
@@ -62,19 +90,75 @@ struct CsvPartition {
     schema: Arc<Schema>,
 }
 
+impl CsvPartition {
+    fn new(path: &str, schema: Arc<Schema>) -> Self {
+        Self {
+            path: path.to_string(),
+            schema,
+        }
+    }
+}
+
 impl Partition for CsvPartition {
     /// Execute this partition and return an iterator over RecordBatch
     fn execute(&self) -> Result<Arc<dyn BatchIterator>> {
-        unimplemented!()
+        Ok(Arc::new(CsvIterator::try_new(
+            &self.path,
+            self.schema.clone(),
+            true, //TODO: do not hard-code
+            &None, //TODO: do not hard-code
+            1024,
+        )?)) //TODO: do not hard-code
     }
 }
 
 /// Iterator over batches
-struct CsvIterator {}
+struct CsvIterator {
+    /// Schema for the batches produced by this iterator
+    schema: Arc<Schema>,
+    /// Arrow CSV reader
+    reader: csv::Reader<File>,
+}
+
+impl CsvIterator {
+    /// Create an iterator for a CSV file
+    pub fn try_new(
+        filename: &str,
+        schema: Arc<Schema>,
+        has_header: bool,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let file = File::open(filename)?;
+        let reader = csv::Reader::new(
+            file,
+            schema.clone(),
+            has_header,
+            batch_size,
+            projection.clone(),
+        );
+
+        let projected_schema = match projection {
+            Some(p) => {
+                let projected_fields: Vec<Field> =
+                    p.iter().map(|i| schema.fields()[*i].clone()).collect();
+
+                Arc::new(Schema::new(projected_fields))
+            }
+            None => schema,
+        };
+
+        Ok(Self {
+            schema: projected_schema,
+            reader,
+        })
+    }
+}
 
 impl BatchIterator for CsvIterator {
     /// Get the next RecordBatch
     fn next(&self) -> Result<Option<RecordBatch>> {
+        //Ok(self.reader.next()?)
         unimplemented!()
     }
 }
diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs
index 62582d1945419..b43bd61527716 100644
--- a/rust/datafusion/src/execution/physical_plan/projection.rs
+++ b/rust/datafusion/src/execution/physical_plan/projection.rs
@@ -159,10 +159,11 @@ mod tests {
 
         let path = format!("{}/tbd", testdata);
 
-        //TODO: working on this now ..
+        let csv = CsvExec::try_new(&path, schema)?;
 
-        let projection =
-            ProjectionExec::try_new(vec![], Arc::new(CsvExec::try_new(&path, schema)?))?;
+        let projection = ProjectionExec::try_new(vec![], Arc::new(csv))?;
+
+        //TODO assertions
 
         Ok(())
     }