Skip to content

Commit

Permalink
Save
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Aug 6, 2019
1 parent a26575e commit abf6d5e
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 10 deletions.
98 changes: 91 additions & 7 deletions rust/datafusion/src/execution/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

//! Execution plan for reading CSV files
use std::fs;
use std::fs::File;
use std::sync::Arc;

use crate::error::Result;
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
use arrow::datatypes::Schema;
use arrow::csv;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

pub struct CsvExec {
/// Path to directory containing partitioned CSV files with the same schema
Expand All @@ -38,20 +42,44 @@ impl ExecutionPlan for CsvExec {

/// Get the partitions for this execution plan. Each partition can be executed in parallel.
///
/// Collects every `.csv` file found under `self.path` (via `build_file_list`) and
/// creates one `CsvPartition` per file, each sharing this plan's schema.
///
/// # Errors
///
/// Propagates any error returned while building the file list.
fn partitions(&self) -> Result<Vec<Arc<Partition>>> {
    let mut filenames: Vec<String> = vec![];
    self.build_file_list(&self.path, &mut filenames)?;
    let partitions = filenames
        .iter()
        .map(|filename| {
            Arc::new(CsvPartition::new(filename, self.schema.clone())) as Arc<Partition>
        })
        .collect();
    Ok(partitions)
}
}

impl CsvExec {
    /// Create a new execution plan for reading a set of CSV files
    ///
    /// `path` is copied into the plan and `schema` is stored as given. No file
    /// system access happens here; the directory is only read when
    /// `build_file_list` runs.
    pub fn try_new(path: &str, schema: Arc<Schema>) -> Result<Self> {
        Ok(Self {
            path: path.to_string(),
            // `schema` is already an owned `Arc`, so it is moved rather than cloned.
            schema,
        })
    }

    /// Recursively build a list of csv files in a directory
    ///
    /// Every entry under `dir` whose path ends with `.csv` is appended to
    /// `filenames`; sub-directories are descended into.
    ///
    /// # Errors
    ///
    /// Returns an error if a directory cannot be read, or if an entry's path is
    /// not valid UTF-8 (it cannot be represented in the `String` list).
    fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
        for entry in fs::read_dir(dir)? {
            let path = entry?.path();
            // Report non-UTF-8 paths as an error instead of panicking.
            let path_name = path.to_str().ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("path is not valid UTF-8: {}", path.display()),
                )
            })?;
            if path.is_dir() {
                self.build_file_list(path_name, filenames)?;
            } else if path_name.ends_with(".csv") {
                filenames.push(path_name.to_string());
            }
        }
        Ok(())
    }
}

/// CSV Partition
Expand All @@ -62,19 +90,75 @@ struct CsvPartition {
schema: Arc<Schema>,
}

impl CsvPartition {
    /// Build a partition for the CSV file at `path`, whose contents are
    /// described by `schema`. The path is copied into an owned `String`.
    fn new(path: &str, schema: Arc<Schema>) -> Self {
        let path = String::from(path);
        Self { path, schema }
    }
}

impl Partition for CsvPartition {
    /// Execute this partition and return an iterator over RecordBatch
    ///
    /// Builds a `CsvIterator` for this partition's file with its schema.
    ///
    /// # Errors
    ///
    /// Propagates any error from `CsvIterator::try_new`.
    fn execute(&self) -> Result<Arc<dyn BatchIterator>> {
        //TODO: do not hard-code the header flag, projection and batch size
        let iterator = CsvIterator::try_new(
            &self.path,
            self.schema.clone(),
            true,  // has_header
            &None, // projection
            1024,  // batch_size
        )?;
        Ok(Arc::new(iterator))
    }
}

/// Iterator over the batches read from a CSV file
struct CsvIterator {
    /// Schema for the batches produced by this iterator
    schema: Arc<Schema>,
    /// Arrow CSV reader
    reader: csv::Reader<File>,
}

impl CsvIterator {
/// Create an iterator for a CSV file
pub fn try_new(
filename: &str,
schema: Arc<Schema>,
has_header: bool,
projection: &Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
let file = File::open(filename)?;
let reader = csv::Reader::new(
file,
schema.clone(),
has_header,
batch_size,
projection.clone(),
);

let projected_schema = match projection {
Some(p) => {
let projected_fields: Vec<Field> =
p.iter().map(|i| schema.fields()[*i].clone()).collect();

Arc::new(Schema::new(projected_fields))
}
None => schema,
};

Ok(Self {
schema: projected_schema,
reader,
})
}
}

impl BatchIterator for CsvIterator {
/// Get the next RecordBatch
///
/// Not implemented yet: calling this currently panics via `unimplemented!()`.
fn next(&self) -> Result<Option<RecordBatch>> {
// TODO: delegate to the underlying reader, i.e. `Ok(self.reader.next()?)`.
// NOTE(review): presumably blocked because the reader needs mutable access
// while this method only receives `&self` — confirm against the Arrow CSV reader API.
unimplemented!()
}
}
7 changes: 4 additions & 3 deletions rust/datafusion/src/execution/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ mod tests {

let path = format!("{}/tbd", testdata);

//TODO: working on this now ..
let csv = CsvExec::try_new(&path, schema)?;

let projection =
ProjectionExec::try_new(vec![], Arc::new(CsvExec::try_new(&path, schema)?))?;
let projection = ProjectionExec::try_new(vec![], Arc::new(csv))?;

//TODO assertions

Ok(())
}
Expand Down

0 comments on commit abf6d5e

Please sign in to comment.