ARROW-6086: [Rust] [DataFusion] Add support for partitioned Parquet data sources

I discovered this gap at the last minute while running manual tests: the Parquet data source assumed its path was a single file and therefore only ever produced one partition. Using this branch as a dependency, I have been able to run parallel queries against directories of Parquet files.

Closes apache#5494 from andygrove/ARROW-6086 and squashes the following commits:

77bee15 <Andy Grove> Replace unwrap with Result combinator
cd11b97 <Andy Grove> don't panic
c751753 <Andy Grove> Add support for partitioned parquet data sources
25eaf45 <Andy Grove> Move build_file_list into common module

Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
andygrove authored and alippai committed Sep 26, 2019
1 parent 8640533 commit 95f7ec6
Showing 3 changed files with 55 additions and 51 deletions.
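Before the per-file diffs, a hypothetical usage sketch of what this change enables. The directory path and batch size are invented; `ParquetTable`, `TableProvider`, and the `scan` signature are the items touched by this diff, and the sketch assumes they are reachable at these module paths. Pointing `ParquetTable` at a directory now yields one scan partition per `.parquet` file, which callers can execute in parallel.

```rust
use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::TableProvider;

fn main() -> datafusion::error::Result<()> {
    // Hypothetical directory containing part-0.parquet, part-1.parquet, ...
    let table = ParquetTable::try_new("/data/my_table")?;

    // No column projection; each file becomes one independently executable partition.
    let partitions = table.scan(&None, 65536)?;
    println!("query can run across {} partitions in parallel", partitions.len());
    Ok(())
}
```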
rust/datafusion/src/datasource/parquet.rs (22 additions, 19 deletions)
@@ -37,23 +37,27 @@ use parquet::file::reader::*;
 
 use crate::datasource::{ScanResult, TableProvider};
 use crate::error::{ExecutionError, Result};
+use crate::execution::physical_plan::common;
 use crate::execution::physical_plan::BatchIterator;
 
 /// Table-based representation of a `ParquetFile`
 pub struct ParquetTable {
-    filename: String,
+    filenames: Vec<String>,
     schema: Arc<Schema>,
 }
 
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path
-    pub fn try_new(filename: &str) -> Result<Self> {
-        let parquet_file = ParquetFile::open(filename, None, 0)?;
-        let schema = parquet_file.projection_schema.clone();
-        Ok(Self {
-            filename: filename.to_string(),
-            schema,
-        })
+    pub fn try_new(path: &str) -> Result<Self> {
+        let mut filenames: Vec<String> = vec![];
+        common::build_file_list(path, &mut filenames, ".parquet")?;
+        if filenames.is_empty() {
+            Err(ExecutionError::General("No files found".to_string()))
+        } else {
+            let parquet_file = ParquetFile::open(&filenames[0], None, 0)?;
+            let schema = parquet_file.projection_schema.clone();
+            Ok(Self { filenames, schema })
+        }
     }
 }

@@ -70,17 +74,16 @@ impl TableProvider for ParquetTable {
         projection: &Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Vec<ScanResult>> {
-        // note that this code currently assumes the filename is a file rather than a directory
-        // and therefore only returns a single partition
-        let parquet_file = match projection {
-            Some(p) => ParquetScanPartition::try_new(
-                &self.filename,
-                Some(p.clone()),
-                batch_size,
-            )?,
-            None => ParquetScanPartition::try_new(&self.filename, None, batch_size)?,
-        };
-        Ok(vec![Arc::new(Mutex::new(parquet_file))])
+        Ok(self
+            .filenames
+            .iter()
+            .map(|filename| {
+                ParquetScanPartition::try_new(filename, projection.clone(), batch_size)
+                    .and_then(|part| {
+                        Ok(Arc::new(Mutex::new(part)) as Arc<Mutex<dyn BatchIterator>>)
+                    })
+            })
+            .collect::<Result<Vec<_>>>()?)
     }
 }
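The new `scan` body relies on collecting an iterator of `Result`s into a single `Result`, which short-circuits on the first error; this is the "Replace unwrap with Result combinator" commit listed above. A minimal, self-contained illustration of that pattern (nothing here is DataFusion-specific):

```rust
use std::num::ParseIntError;

/// Map a fallible conversion over every input and collect into one Result:
/// the first Err stops the iteration and becomes the function's return value.
fn parse_all(inputs: &[&str]) -> Result<Vec<i32>, ParseIntError> {
    inputs.iter().map(|s| s.parse::<i32>()).collect()
}

fn main() {
    assert_eq!(parse_all(&["1", "2", "3"]), Ok(vec![1, 2, 3]));
    assert!(parse_all(&["1", "oops"]).is_err());
}
```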

rust/datafusion/src/execution/physical_plan/common.rs (30 additions, 1 deletion)
@@ -17,9 +17,11 @@
 
 //! Defines common code used in execution plans
 
 use std::fs;
+use std::fs::metadata;
 use std::sync::{Arc, Mutex};
 
-use crate::error::Result;
+use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::BatchIterator;

@@ -75,3 +77,30 @@ pub fn collect(it: Arc<Mutex<dyn BatchIterator>>) -> Result<Vec<RecordBatch>> {
         }
     }
 }
+
+/// Recursively build a list of files in a directory with a given extension
+pub fn build_file_list(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
+    let metadata = metadata(dir)?;
+    if metadata.is_file() {
+        if dir.ends_with(ext) {
+            filenames.push(dir.to_string());
+        }
+    } else {
+        for entry in fs::read_dir(dir)? {
+            let entry = entry?;
+            let path = entry.path();
+            if let Some(path_name) = path.to_str() {
+                if path.is_dir() {
+                    build_file_list(path_name, filenames, ext)?;
+                } else {
+                    if path_name.ends_with(ext) {
+                        filenames.push(path_name.to_string());
+                    }
+                }
+            } else {
+                return Err(ExecutionError::General("Invalid path".to_string()));
+            }
+        }
+    }
+    Ok(())
+}
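A hypothetical call site for the shared helper (the path is invented, and this assumes the `common` module is publicly reachable at this path; within the crate, the Parquet and CSV sources call it exactly this way):

```rust
use datafusion::error::Result;
use datafusion::execution::physical_plan::common;

fn main() -> Result<()> {
    let mut filenames: Vec<String> = vec![];
    // Recursively collects every file ending in ".parquet" under /data/events;
    // passing a single file path also works if its name matches the extension.
    common::build_file_list("/data/events", &mut filenames, ".parquet")?;
    println!("found {} parquet files", filenames.len());
    Ok(())
}
```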
rust/datafusion/src/execution/physical_plan/csv.rs (3 additions, 31 deletions)
@@ -17,12 +17,11 @@
 
 //! Execution plan for reading CSV files
 
-use std::fs;
-use std::fs::metadata;
 use std::fs::File;
 use std::sync::{Arc, Mutex};
 
-use crate::error::{ExecutionError, Result};
+use crate::error::Result;
+use crate::execution::physical_plan::common;
 use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
 use arrow::csv;
 use arrow::datatypes::Schema;
@@ -51,7 +50,7 @@ impl ExecutionPlan for CsvExec {
     /// Get the partitions for this execution plan. Each partition can be executed in parallel.
     fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
         let mut filenames: Vec<String> = vec![];
-        self.build_file_list(&self.path, &mut filenames)?;
+        common::build_file_list(&self.path, &mut filenames, ".csv")?;
         let partitions = filenames
             .iter()
             .map(|filename| {
@@ -85,33 +84,6 @@ impl CsvExec {
             batch_size,
         })
     }
-
-    /// Recursively build a list of csv files in a directory
-    fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
-        let metadata = metadata(dir)?;
-        if metadata.is_file() {
-            if dir.ends_with(".csv") {
-                filenames.push(dir.to_string());
-            }
-        } else {
-            for entry in fs::read_dir(dir)? {
-                let entry = entry?;
-                let path = entry.path();
-                if let Some(path_name) = path.to_str() {
-                    if path.is_dir() {
-                        self.build_file_list(path_name, filenames)?;
-                    } else {
-                        if path_name.ends_with(".csv") {
-                            filenames.push(path_name.to_string());
-                        }
-                    }
-                } else {
-                    return Err(ExecutionError::General("Invalid path".to_string()));
-                }
-            }
-        }
-        Ok(())
-    }
 }
 
 /// CSV Partition
