Skip to content

Commit

Permalink
csv provider now supports partitioned csv files
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 7, 2019
1 parent 20216bc commit 104f3ad
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
19 changes: 13 additions & 6 deletions rust/datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
//! CSV Data source
use std::fs::File;
use std::string::String;
use std::sync::{Arc, Mutex};

use std::string::String;
use std::sync::Arc;
use arrow::csv;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;

use crate::datasource::{ScanResult, TableProvider};
use crate::error::Result;
use crate::execution::physical_plan::BatchIterator;
use crate::execution::physical_plan::csv::CsvExec;
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan};

/// Represents a CSV file with a provided schema
// TODO: usage example (rather than documenting `new()`)
Expand Down Expand Up @@ -58,13 +59,19 @@ impl TableProvider for CsvFile {
projection: &Option<Vec<usize>>,
batch_size: usize,
) -> Result<Vec<ScanResult>> {
Ok(vec![Arc::new(Mutex::new(CsvBatchIterator::new(
let exec = CsvExec::try_new(
&self.filename,
self.schema.clone(),
self.has_header,
projection,
projection.clone(),
batch_size,
)))])
)?;
let partitions = exec.partitions()?;
let iterators = partitions
.iter()
.map(|p| p.execute())
.collect::<Result<Vec<_>>>()?;
Ok(iterators)
}
}

Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ impl ExecutionContext {
let mut combined_results: Vec<RecordBatch> = vec![];
for thread in threads {
let result = thread.join().unwrap();
let result = result.unwrap();
let result = result?;
result
.iter()
.for_each(|batch| combined_results.push(batch.clone()));
Expand Down Expand Up @@ -534,7 +534,7 @@ mod tests {
let mut file = File::create(file_path)?;

// generate some data
for i in 0..10 {
for i in 0..=10 {
let data = format!("{},{}\n", partition, i);
file.write_all(data.as_bytes())?;
}
Expand Down
30 changes: 19 additions & 11 deletions rust/datafusion/src/execution/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::fs;
use std::fs::File;
use std::fs::metadata;
use std::sync::{Arc, Mutex};

use crate::error::{ExecutionError, Result};
Expand Down Expand Up @@ -97,19 +98,26 @@ impl CsvExec {

/// Recursively build a list of csv files in a directory
fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if let Some(path_name) = path.to_str() {
if path.is_dir() {
self.build_file_list(path_name, filenames)?;
} else {
if path_name.ends_with(".csv") {
filenames.push(path_name.to_string());
let metadata = metadata(dir)?;
if metadata.is_file() {
if dir.ends_with(".csv") {
filenames.push(dir.to_string());
}
} else {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if let Some(path_name) = path.to_str() {
if path.is_dir() {
self.build_file_list(path_name, filenames)?;
} else {
if path_name.ends_with(".csv") {
filenames.push(path_name.to_string());
}
}
} else {
return Err(ExecutionError::General("Invalid path".to_string()));
}
} else {
return Err(ExecutionError::General("Invalid path".to_string()));
}
}
Ok(())
Expand Down

0 comments on commit 104f3ad

Please sign in to comment.