Skip to content

Commit

Permalink
ARROW-6088: [Rust] [DataFusion] Projection execution plan
Browse files Browse the repository at this point in the history
This PR implements the projection and CSV execution plans (I can split this into two PRs if necessary - one for CSV then one for projection).

Note that while I implement execution plans for each relational operator (projection, selection, aggregate, etc) there will be duplicate implementations because we already have the existing execution code that directly executes the logical plan. Once the new physical plan is in place, I will remove the original execution logic (and translate the logical plan to a physical plan).

Closes #4988 from andygrove/ARROW-6088 and squashes the following commits:

755365c <Andy Grove> Rebase and remove unwrap
fec84af <Andy Grove> test only delete temp path if exist
8f11c81 <Andy Grove> save
6db609f <Andy Grove> test passes
717dcd8 <Andy Grove> implement mutex for iterator
abf6d5e <Andy Grove> Save
a26575e <Andy Grove> rough out CSV execution plan
e806c76 <Andy Grove> formatting
768a7ae <Andy Grove> Implement Column expression
d1ede3c <Andy Grove> Implement projection logix
1875902 <Andy Grove> Roughing out projection execution plan

Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
  • Loading branch information
andygrove committed Aug 6, 2019
1 parent 42f4f34 commit d9b0ef1
Show file tree
Hide file tree
Showing 4 changed files with 508 additions and 5 deletions.
197 changes: 197 additions & 0 deletions rust/datafusion/src/execution/physical_plan/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading CSV files
use std::fs;
use std::fs::File;
use std::sync::{Arc, Mutex};

use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
use arrow::csv;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;

/// Execution plan for scanning a directory of CSV files that share one schema
pub struct CsvExec {
    /// Path to directory containing partitioned CSV files with the same schema
    path: String,
    /// Schema of the CSV files themselves, before the optional projection is applied
    schema: Arc<Schema>,
    /// Does the CSV file have a header?
    has_header: bool,
    /// Optional projection for which columns to load
    projection: Option<Vec<usize>>,
    /// Schema of the batches produced by this plan, i.e. `schema` after the optional
    /// projection is applied
    projected_schema: Arc<Schema>,
    /// Batch size
    batch_size: usize,
}

impl ExecutionPlan for CsvExec {
    /// Get the schema for this execution plan (the file schema with the optional
    /// projection applied)
    fn schema(&self) -> Arc<Schema> {
        self.projected_schema.clone()
    }

    /// Get the partitions for this execution plan. Each partition can be executed in
    /// parallel.
    ///
    /// One partition is created per `.csv` file found under `path` (searched
    /// recursively). Returns an error if the directory tree cannot be listed.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        let mut filenames: Vec<String> = vec![];
        self.build_file_list(&self.path, &mut filenames)?;
        let partitions = filenames
            .iter()
            .map(|filename| {
                // The partition forwards both the schema and the projection to the
                // CSV reader, which resolves the projection indices against the schema
                // it is given. It therefore needs the full file schema, not the
                // already-projected one.
                Arc::new(CsvPartition::new(
                    filename,
                    self.schema.clone(),
                    self.has_header,
                    self.projection.clone(),
                    self.batch_size,
                )) as Arc<dyn Partition>
            })
            .collect();
        Ok(partitions)
    }
}

impl CsvExec {
    /// Create a new execution plan for reading a set of CSV files
    ///
    /// `schema` is the schema of the files on disk; `projection`, when present, lists
    /// the indices of the columns in `schema` to load.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::General` if a projection index does not refer to a
    /// field of `schema`.
    pub fn try_new(
        path: &str,
        schema: Arc<Schema>,
        has_header: bool,
        projection: Option<Vec<usize>>,
        batch_size: usize,
    ) -> Result<Self> {
        let projected_schema = match &projection {
            Some(indices) => {
                let fields = schema.fields();
                // Checked lookup: a bad index is reported as an error instead of
                // panicking inside a fallible constructor.
                let projected_fields = indices
                    .iter()
                    .map(|&i| {
                        fields.get(i).cloned().ok_or_else(|| {
                            ExecutionError::General(format!(
                                "Projection index {} out of bounds for schema with {} fields",
                                i,
                                fields.len()
                            ))
                        })
                    })
                    .collect::<Result<Vec<Field>>>()?;

                Arc::new(Schema::new(projected_fields))
            }
            None => schema.clone(),
        };

        Ok(Self {
            path: path.to_string(),
            schema,
            has_header,
            projection,
            projected_schema,
            batch_size,
        })
    }

    /// Recursively build a list of csv files in a directory
    ///
    /// Every file whose path ends with `.csv` is appended to `filenames`; directories
    /// are descended into. Fails if a directory cannot be read or if a path is not
    /// valid UTF-8.
    fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
        for entry in fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            let path_name = path
                .to_str()
                .ok_or_else(|| ExecutionError::General("Invalid path".to_string()))?;
            if path.is_dir() {
                self.build_file_list(path_name, filenames)?;
            } else if path_name.ends_with(".csv") {
                filenames.push(path_name.to_string());
            }
        }
        Ok(())
    }
}

/// A single CSV file that is scanned as one partition of a CSV execution plan
struct CsvPartition {
/// Path to the CSV File
path: String,
/// Schema handed to the CSV reader, together with `projection`, when the partition
/// is executed
schema: Arc<Schema>,
/// Does the CSV file have a header?
has_header: bool,
/// Optional projection for which columns to load
projection: Option<Vec<usize>>,
/// Batch size handed to the CSV reader
batch_size: usize,
}

impl CsvPartition {
fn new(
path: &str,
schema: Arc<Schema>,
has_header: bool,
projection: Option<Vec<usize>>,
batch_size: usize,
) -> Self {
Self {
path: path.to_string(),
schema,
has_header,
projection,
batch_size,
}
}
}

impl Partition for CsvPartition {
    /// Open this partition's CSV file and return a mutex-guarded iterator over its
    /// record batches. Fails if the iterator cannot be created.
    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
        let batches = CsvIterator::try_new(
            &self.path,
            self.schema.clone(),
            self.has_header,
            &self.projection,
            self.batch_size,
        )?;
        let shared: Arc<Mutex<dyn BatchIterator>> = Arc::new(Mutex::new(batches));
        Ok(shared)
    }
}

/// Iterator over the record batches of one CSV file
struct CsvIterator {
/// Arrow CSV reader that the batches are pulled from
reader: csv::Reader<File>,
}

impl CsvIterator {
/// Create an iterator for a CSV file
pub fn try_new(
filename: &str,
schema: Arc<Schema>,
has_header: bool,
projection: &Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
let file = File::open(filename)?;
let reader = csv::Reader::new(
file,
schema.clone(),
has_header,
batch_size,
projection.clone(),
);

Ok(Self { reader })
}
}

impl BatchIterator for CsvIterator {
    /// Pull the next RecordBatch from the underlying CSV reader, converting any
    /// reader error into this crate's error type
    fn next(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Ok(maybe_batch) => Ok(maybe_batch),
            Err(e) => Err(e.into()),
        }
    }
}
53 changes: 53 additions & 0 deletions rust/datafusion/src/execution/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can be evaluated at runtime during query execution
use crate::error::Result;
use crate::execution::physical_plan::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;

/// Physical expression that refers to an input column by its position in a RecordBatch
pub struct Column {
    /// Position of the referenced column
    index: usize,
}

impl Column {
    /// Build a column expression for the column at `index`
    pub fn new(index: usize) -> Self {
        Column { index }
    }
}

impl PhysicalExpr for Column {
/// Get the name to use in a schema to represent the result of this expression
fn name(&self) -> String {
format!("c{}", self.index)
}

/// Get the data type of this expression, given the schema of the input
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
Ok(input_schema.field(self.index).data_type().clone())
}

/// Evaluate the expression
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
Ok(batch.column(self.index).clone())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

//! Traits for physical query plan, supporting parallel execution for partitioned relations.
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use crate::error::Result;
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;

/// Partition-aware execution plan for a relation
pub trait ExecutionPlan {
Expand All @@ -34,11 +35,25 @@ pub trait ExecutionPlan {
/// Represents a partition of an execution plan that can be executed on a thread
pub trait Partition: Send + Sync {
/// Execute this partition and return an iterator over RecordBatch
// NOTE(review): two `execute` signatures appear here; this looks like a diff view
// showing the removed line followed by its replacement. The `Arc<Mutex<..>>` form is
// the one `CsvPartition` implements -- confirm against the real file.
fn execute(&self) -> Result<Arc<dyn BatchIterator>>;
fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>>;
}

/// Iterator over RecordBatch that can be sent between threads
pub trait BatchIterator: Send + Sync {
/// Get the next RecordBatch
// NOTE(review): two `next` signatures appear here; this looks like a diff view showing
// the removed line followed by its replacement. The `&mut self` form is the one
// `CsvIterator` implements -- confirm against the real file.
fn next(&self) -> Result<Option<RecordBatch>>;
fn next(&mut self) -> Result<Option<RecordBatch>>;
}

/// Expression that can be evaluated against a RecordBatch
///
/// Implementors must be `Send + Sync`.
pub trait PhysicalExpr: Send + Sync {
/// Get the name to use in a schema to represent the result of this expression
fn name(&self) -> String;
/// Get the data type of this expression, given the schema of the input
fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
/// Evaluate an expression against a RecordBatch, returning the result as an array
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
}

pub mod csv;
pub mod expressions;
pub mod projection;
Loading

0 comments on commit d9b0ef1

Please sign in to comment.