Skip to content

Commit

Permalink
Add support for partitioned parquet data sources
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 25, 2019
1 parent 25eaf45 commit c751753
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 23 deletions.
46 changes: 27 additions & 19 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,27 @@ use parquet::file::reader::*;

use crate::datasource::{ScanResult, TableProvider};
use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::common;
use crate::execution::physical_plan::BatchIterator;

/// Table-based representation of a Parquet data source, which may be a single
/// `.parquet` file or a directory containing multiple `.parquet` files
/// (one partition per file).
pub struct ParquetTable {
    /// All `.parquet` files discovered under the path given to `try_new`;
    /// each file becomes one scan partition.
    filenames: Vec<String>,
    /// Schema read from the first file; all files are assumed to share it.
    schema: Arc<Schema>,
}

impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path
pub fn try_new(filename: &str) -> Result<Self> {
let parquet_file = ParquetFile::open(filename, None, 0)?;
let schema = parquet_file.projection_schema.clone();
Ok(Self {
filename: filename.to_string(),
schema,
})
pub fn try_new(path: &str) -> Result<Self> {
let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, ".parquet")?;
if filenames.is_empty() {
panic!()
} else {
let parquet_file = ParquetFile::open(&filenames[0], None, 0)?;
let schema = parquet_file.projection_schema.clone();
Ok(Self { filenames, schema })
}
}
}

Expand All @@ -70,17 +74,21 @@ impl TableProvider for ParquetTable {
projection: &Option<Vec<usize>>,
batch_size: usize,
) -> Result<Vec<ScanResult>> {
// note that this code currently assumes the filename is a file rather than a directory
// and therefore only returns a single partition
let parquet_file = match projection {
Some(p) => ParquetScanPartition::try_new(
&self.filename,
Some(p.clone()),
batch_size,
)?,
None => ParquetScanPartition::try_new(&self.filename, None, batch_size)?,
};
Ok(vec![Arc::new(Mutex::new(parquet_file))])
//TODO remove the unwrap
Ok(self
.filenames
.iter()
.map(|filename| {
Arc::new(Mutex::new(
ParquetScanPartition::try_new(
filename,
projection.clone(),
batch_size,
)
.unwrap(),
)) as Arc<Mutex<dyn BatchIterator>>
})
.collect())
}
}

Expand Down
1 change: 0 additions & 1 deletion rust/datafusion/src/execution/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use std::fs;
use std::fs::metadata;
use std::fs::File;
use std::sync::{Arc, Mutex};

use crate::error::{ExecutionError, Result};
Expand Down
4 changes: 1 addition & 3 deletions rust/datafusion/src/execution/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

//! Execution plan for reading CSV files
use std::fs;
use std::fs::metadata;
use std::fs::File;
use std::sync::{Arc, Mutex};

use crate::error::{ExecutionError, Result};
use crate::error::Result;
use crate::execution::physical_plan::common;
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
use arrow::csv;
Expand Down

0 comments on commit c751753

Please sign in to comment.