[fix] replace enum with trait for extensibility
rdettai committed Sep 21, 2021
1 parent 442023b commit 0264171
Showing 6 changed files with 330 additions and 232 deletions.
6 changes: 3 additions & 3 deletions ballista/rust/core/proto/ballista.proto
@@ -943,14 +943,14 @@ message GetFileMetadataParams {
   FileType file_type = 2;
 }

-message ParquetConfig {
-  // fields of datasource::listing::FormatOptions::Parquet
+message ParquetFormat {
+  // fields of datasource::format::parquet::ParquetFormat
 }

 message ListingConfig {
   string extension = 1;
   oneof format {
-    ParquetConfig parquet = 2;
+    ParquetFormat parquet = 2;
     // csv, json, ...
   }
 }
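To make the proto change concrete on the consuming side, this is roughly the shape prost generates for the `oneof` above. This is only a sketch: derives and prost attributes are omitted, and the generated names are assumptions based on prost's usual conventions, not verified output. Note that the wire format stays a closed list of variants even as the datasource side of the codebase moves to an open trait:

pub struct ParquetFormat {}

pub struct ListingConfig {
    pub extension: String,
    // A oneof becomes an Option of a generated enum, defined in a
    // module named after the enclosing message.
    pub format: Option<listing_config::Format>,
}

pub mod listing_config {
    pub enum Format {
        Parquet(super::ParquetFormat),
        // csv, json, ... would become further variants here.
    }
}

fn main() {
    let cfg = ListingConfig {
        extension: ".parquet".to_string(),
        format: Some(listing_config::Format::Parquet(ParquetFormat {})),
    };
    match cfg.format {
        Some(listing_config::Format::Parquet(_)) => println!("format: parquet"),
        None => println!("format: unspecified"),
    }
    println!("extension = {}", cfg.extension);
}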
66 changes: 66 additions & 0 deletions datafusion/src/datasource/format/csv.rs
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! CSV format abstractions
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;

use super::FileFormat;
use crate::datasource::PartitionedFile;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// Character Separated Value `FileFormat` implementation.
pub struct CsvFormat {
    /// Set to `true` to indicate that the first line is a header.
    pub has_header: bool,
    /// The character separating values within a row.
    pub delimiter: u8,
    /// If no schema was provided for the table, it will be
    /// inferred from the data itself; this limits the number
    /// of lines used in the process.
    pub schema_infer_max_rec: Option<u64>,
}

#[async_trait]
impl FileFormat for CsvFormat {
    async fn infer_schema(&self, _path: &str) -> Result<SchemaRef> {
        todo!()
    }

    async fn infer_stats(&self, _path: &str) -> Result<Statistics> {
        Ok(Statistics::default())
    }

    async fn create_executor(
        &self,
        _schema: SchemaRef,
        _files: Vec<Vec<PartitionedFile>>,
        _statistics: Statistics,
        _projection: &Option<Vec<usize>>,
        _batch_size: usize,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        todo!()
    }
}
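The `infer_schema` body above is still a `todo!()`. As a rough, self-contained sketch of the kind of bounded sampling that `schema_infer_max_rec` is meant to control (a toy, not the eventual DataFusion implementation, which would presumably go through arrow's CSV reader): scan at most N records and demote any column whose sampled values do not all parse as integers:

// Toy schema inference: type names stand in for arrow DataTypes.
fn infer_csv_types(rows: &[&str], delimiter: char, max_rec: Option<u64>) -> Vec<&'static str> {
    let limit = max_rec.map(|m| m as usize).unwrap_or(usize::MAX);
    let mut types: Vec<&'static str> = Vec::new();
    for row in rows.iter().take(limit) {
        for (i, field) in row.split(delimiter).enumerate() {
            let ty = if field.parse::<i64>().is_ok() { "Int64" } else { "Utf8" };
            if i >= types.len() {
                // First time we see this column: take the parsed type.
                types.push(ty);
            } else if ty == "Utf8" {
                // Any non-integer value demotes the column to Utf8.
                types[i] = "Utf8";
            }
        }
    }
    types
}

fn main() {
    let rows = ["1,foo", "2,bar", "x,baz"];
    // Sampling only 2 records misses the "x" that breaks column 0.
    assert_eq!(infer_csv_types(&rows, ',', Some(2)), vec!["Int64", "Utf8"]);
    assert_eq!(infer_csv_types(&rows, ',', Some(3)), vec!["Utf8", "Utf8"]);
    println!("ok");
}

Capping the sample keeps inference cheap on large files, at the cost of possibly missing a type-breaking value further down, which is exactly the trade-off the `Option<u64>` field exposes.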
62 changes: 62 additions & 0 deletions datafusion/src/datasource/format/json.rs
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Line delimited JSON format abstractions
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;

use super::FileFormat;
use crate::datasource::PartitionedFile;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// Newline-delimited JSON `FileFormat` implementation.
pub struct JsonFormat {
    /// If no schema was provided for the table, it will be
    /// inferred from the data itself; this limits the number
    /// of lines used in the process.
    pub schema_infer_max_rec: Option<u64>,
}

#[async_trait]
impl FileFormat for JsonFormat {
    async fn infer_schema(&self, _path: &str) -> Result<SchemaRef> {
        todo!()
    }

    async fn infer_stats(&self, _path: &str) -> Result<Statistics> {
        Ok(Statistics::default())
    }

    async fn create_executor(
        &self,
        _schema: SchemaRef,
        _files: Vec<Vec<PartitionedFile>>,
        _statistics: Statistics,
        _projection: &Option<Vec<usize>>,
        _batch_size: usize,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        todo!()
    }
}
40 changes: 37 additions & 3 deletions datafusion/src/datasource/format/mod.rs
@@ -17,16 +17,50 @@

 //! Module containing helper methods for the various file formats
-// pub mod csv;
-// pub mod json;
+pub mod csv;
+pub mod json;
 pub mod parquet;

+use std::sync::Arc;
+
 use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::error::Result;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use crate::physical_plan::{Accumulator, ColumnStatistics, ExecutionPlan, Statistics};
+
+use super::PartitionedFile;
+
+use async_trait::async_trait;
+
+/// This trait abstracts all the file-format-specific implementations
+/// away from the `TableProvider`. It enables code reuse across
+/// providers that support the same file formats.
+#[async_trait]
+pub trait FileFormat: Send + Sync {
+    /// Open the file at the given path and infer its schema
+    async fn infer_schema(&self, path: &str) -> Result<SchemaRef>;
+
+    /// Open the file at the given path and infer its statistics
+    async fn infer_stats(&self, path: &str) -> Result<Statistics>;
+
+    /// Take a list of files and convert them to the appropriate executor
+    /// for this file format.
+    /// TODO: group params into TableDescription(schema,files,stats) and
+    /// ScanOptions(projection,batch_size,filters) to avoid too_many_arguments
+    #[allow(clippy::too_many_arguments)]
+    async fn create_executor(
+        &self,
+        schema: SchemaRef,
+        files: Vec<Vec<PartitionedFile>>,
+        statistics: Statistics,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>>;
+}
+
 /// Get all files as well as the summary statistic;
 /// if the optional `limit` is provided, include only sufficient files
 /// needed to read up to `limit` number of rows
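The point of the trait, and of the commit title's "replace enum with trait", is that the set of formats is no longer closed: a downstream crate can implement `FileFormat` for a format DataFusion knows nothing about. A minimal, self-contained illustration with simplified synchronous signatures (the stand-in types below are not the real DataFusion ones):

use std::sync::Arc;

// Simplified stand-in for the real SchemaRef: (column name, type name) pairs.
type SchemaRef = Arc<Vec<(String, String)>>;

// Simplified version of the FileFormat trait above: the provider only
// needs the behavior, not a closed list of variants.
trait FileFormat: Send + Sync {
    fn name(&self) -> &str;
    fn infer_schema(&self, path: &str) -> SchemaRef;
}

struct ParquetFormat;
impl FileFormat for ParquetFormat {
    fn name(&self) -> &str { "parquet" }
    fn infer_schema(&self, _path: &str) -> SchemaRef {
        Arc::new(vec![("c1".into(), "Int64".into())])
    }
}

// A format defined in a *downstream* crate: impossible to add to a
// FileType enum owned by DataFusion without patching DataFusion itself.
struct MyCustomFormat;
impl FileFormat for MyCustomFormat {
    fn name(&self) -> &str { "custom" }
    fn infer_schema(&self, _path: &str) -> SchemaRef {
        Arc::new(vec![("raw".into(), "Utf8".into())])
    }
}

fn main() {
    // The listing provider can hold any format behind a trait object.
    let formats: Vec<Arc<dyn FileFormat>> =
        vec![Arc::new(ParquetFormat), Arc::new(MyCustomFormat)];
    for f in formats {
        println!("{} -> {:?}", f.name(), f.infer_schema("some/path"));
    }
}

With the previous enum, adding `MyCustomFormat` would have required a change inside DataFusion; with the trait object it is just another implementation.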
63 changes: 61 additions & 2 deletions datafusion/src/datasource/format/parquet.rs
@@ -15,21 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.

-//! Parquet format helper methods
+//! Parquet format abstractions
 use std::fs::File;
 use std::sync::Arc;

 use arrow::datatypes::Schema;
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
 use parquet::arrow::ArrowReader;
 use parquet::arrow::ParquetFileArrowReader;
 use parquet::file::serialized_reader::SerializedFileReader;
 use parquet::file::statistics::Statistics as ParquetStatistics;

+use super::FileFormat;
 use super::{create_max_min_accs, get_col_stats};
 use crate::arrow::datatypes::{DataType, Field};
+use crate::datasource::PartitionedFile;
 use crate::error::Result;
+use crate::logical_plan::combine_filters;
+use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
+use crate::physical_plan::parquet::ParquetExec;
+use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{Accumulator, Statistics};
 use crate::scalar::ScalarValue;

@@ -156,7 +164,7 @@ fn summarize_min_max(
 }

 /// Read and parse the metadata of the Parquet file at location `path`
-pub fn fetch_metadata(path: &str) -> Result<(Schema, Statistics)> {
+fn fetch_metadata(path: &str) -> Result<(Schema, Statistics)> {
     let file = File::open(path)?;
     let file_reader = Arc::new(SerializedFileReader::new(file)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
@@ -214,6 +222,57 @@ pub fn fetch_metadata(path: &str) -> Result<(Schema, Statistics)> {
     Ok((schema, statistics))
 }

+/// The Apache Parquet `FileFormat` implementation
+pub struct ParquetFormat {
+    /// Activate statistics-based row group pruning
+    pub enable_pruning: bool,
+}
+
+#[async_trait]
+impl FileFormat for ParquetFormat {
+    async fn infer_schema(&self, path: &str) -> Result<SchemaRef> {
+        let (schema, _) = fetch_metadata(path)?;
+        Ok(Arc::new(schema))
+    }
+
+    async fn infer_stats(&self, path: &str) -> Result<Statistics> {
+        let (_, stats) = fetch_metadata(path)?;
+        Ok(stats)
+    }
+
+    async fn create_executor(
+        &self,
+        schema: SchemaRef,
+        files: Vec<Vec<PartitionedFile>>,
+        statistics: Statistics,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // If pruning is enabled, combine the filters to build the predicate.
+        // If pruning is disabled, set the predicate to None so that readers
+        // do not prune data based on the statistics.
+        let predicate = if self.enable_pruning {
+            combine_filters(filters)
+        } else {
+            None
+        };
+
+        Ok(Arc::new(ParquetExec::try_new_refacto(
+            files,
+            statistics,
+            schema,
+            projection.clone(),
+            predicate,
+            limit
+                .map(|l| std::cmp::min(l, batch_size))
+                .unwrap_or(batch_size),
+            limit,
+        )?))
+    }
+}
+
 // #[cfg(test)]
 // mod tests {
 //     use super::*;
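One subtlety in `ParquetFormat::create_executor` above: the batch size handed to the executor is capped by the limit, so a query with a small `LIMIT` does not materialize a full default-size batch. A standalone check of that exact expression:

fn effective_batch_size(limit: Option<usize>, batch_size: usize) -> usize {
    // Same expression as in `create_executor`: shrink the batch when a
    // smaller limit exists, otherwise keep the configured batch size.
    limit
        .map(|l| std::cmp::min(l, batch_size))
        .unwrap_or(batch_size)
}

fn main() {
    assert_eq!(effective_batch_size(None, 8192), 8192); // no limit
    assert_eq!(effective_batch_size(Some(10), 8192), 10); // LIMIT 10
    assert_eq!(effective_batch_size(Some(100_000), 8192), 8192); // limit > batch
    println!("ok");
}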
