
Commit

rough out CSV execution plan
andygrove committed Aug 6, 2019
1 parent e806c76 commit a26575e
Showing 4 changed files with 161 additions and 3 deletions.
80 changes: 80 additions & 0 deletions rust/datafusion/src/execution/physical_plan/csv.rs
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading CSV files
use crate::error::Result;
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

/// Execution plan for scanning a directory of partitioned CSV files
pub struct CsvExec {
    /// Path to directory containing partitioned CSV files with the same schema
    path: String,
    /// Schema representing the CSV files
    schema: Arc<Schema>,
}

impl ExecutionPlan for CsvExec {
    /// Get the schema for this execution plan
    fn schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }

    /// Get the partitions for this execution plan. Each partition can be executed in parallel.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        // TODO get list of files in the directory and create a partition for each one
        unimplemented!()
    }
}

impl CsvExec {
    /// Create a new execution plan for reading a set of CSV files
    pub fn try_new(path: &str, schema: Arc<Schema>) -> Result<Self> {
        // TODO
        Ok(Self {
            path: path.to_string(),
            schema,
        })
    }
}

/// CSV Partition
struct CsvPartition {
    /// Path to the CSV file
    path: String,
    /// Schema representing the CSV file
    schema: Arc<Schema>,
}

impl Partition for CsvPartition {
    /// Execute this partition and return an iterator over RecordBatch
    fn execute(&self) -> Result<Arc<dyn BatchIterator>> {
        unimplemented!()
    }
}

/// Iterator over batches
struct CsvIterator {}

impl BatchIterator for CsvIterator {
    /// Get the next RecordBatch
    fn next(&self) -> Result<Option<RecordBatch>> {
        unimplemented!()
    }
}
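For orientation, here is a sketch (not part of this commit) of where the `partitions()` TODO is headed: enumerate the files in the directory and wrap each one in a `CsvPartition`. It assumes the crate's error type converts from `std::io::Error`; `CsvPartition::execute` and `CsvIterator` would still need real CSV decoding on top of this.

fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
    let mut partitions: Vec<Arc<dyn Partition>> = vec![];
    // one partition per CSV file found in the directory (assumes io::Error
    // converts into the crate's error type)
    for entry in std::fs::read_dir(&self.path)? {
        let path = entry?.path();
        partitions.push(Arc::new(CsvPartition {
            path: path.to_string_lossy().to_string(),
            schema: self.schema.clone(),
        }));
    }
    Ok(partitions)
}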
11 changes: 11 additions & 0 deletions rust/datafusion/src/execution/physical_plan/expressions.rs
@@ -20,6 +20,7 @@
use crate::error::Result;
use crate::execution::physical_plan::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;

/// Represents the column at a given index in a RecordBatch
@@ -28,6 +29,16 @@ pub struct Column {
}

impl PhysicalExpr for Column {
    /// Get the name to use in a schema to represent the result of this expression
    fn name(&self) -> String {
        format!("c{}", self.index)
    }

    /// Get the data type of this expression, given the schema of the input
    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
        Ok(input_schema.field(self.index).data_type().clone())
    }

    /// Evaluate the expression
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
        Ok(batch.column(self.index).clone())
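As a usage sketch of the two new methods next to `evaluate` (not part of this commit; `Column::new` is a hypothetical constructor for the `index` field, and `RecordBatch::try_new` is assumed from the arrow crate):

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

fn column_contract_demo() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .expect("valid batch");
    let col = Column::new(0); // hypothetical constructor
    assert_eq!("c0", col.name()); // name is derived from the column index
    assert_eq!(DataType::Int32, col.data_type(&schema)?); // type comes from the input schema
    assert_eq!(3, col.evaluate(&batch)?.len()); // zero-copy fetch of column 0
    Ok(())
}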
9 changes: 7 additions & 2 deletions rust/datafusion/src/execution/physical_plan/mod.rs
@@ -17,12 +17,12 @@

//! Traits for physical query plan, supporting parallel execution for partitioned relations.
-use arrow::datatypes::Schema;
-use arrow::record_batch::RecordBatch;
use std::sync::Arc;

use crate::error::Result;
use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;

/// Partition-aware execution plan for a relation
pub trait ExecutionPlan {
@@ -46,9 +46,14 @@ pub trait BatchIterator: Send + Sync {

/// Expression that can be evaluated against a RecordBatch
pub trait PhysicalExpr: Send + Sync {
    /// Get the name to use in a schema to represent the result of this expression
    fn name(&self) -> String;
    /// Get the data type of this expression, given the schema of the input
    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
    /// Evaluate an expression against a RecordBatch
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
}

pub mod csv;
pub mod expressions;
pub mod projection;
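Requiring `name` and `data_type` up front is what lets an operator compute its output schema without evaluating anything, as the projection below does. For illustration only (a hypothetical type, not in this commit), a constant expression would satisfy the trait like this:

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

/// Hypothetical expression that evaluates to a constant Int32 value
struct Int32Literal {
    value: i32,
}

impl PhysicalExpr for Int32Literal {
    /// The name does not depend on any input
    fn name(&self) -> String {
        format!("lit({})", self.value)
    }

    /// A literal's type is fixed, so the input schema goes unused
    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        Ok(DataType::Int32)
    }

    /// Materialize the constant once per row of the input batch
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
        Ok(Arc::new(Int32Array::from(vec![self.value; batch.num_rows()])))
    }
}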
64 changes: 63 additions & 1 deletion rust/datafusion/src/execution/physical_plan/projection.rs
@@ -26,7 +26,7 @@ use crate::error::Result;
use crate::execution::physical_plan::{
    BatchIterator, ExecutionPlan, Partition, PhysicalExpr,
};
-use arrow::datatypes::Schema;
+use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;

/// Execution plan for a projection
@@ -39,6 +39,29 @@ pub struct ProjectionExec {
    input: Arc<dyn ExecutionPlan>,
}

impl ProjectionExec {
    /// Create a projection on an input
    fn try_new(
        expr: Vec<Arc<dyn PhysicalExpr>>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        let input_schema = input.schema();

        // derive the output schema from each expression's name and data type
        let fields: Result<Vec<_>> = expr
            .iter()
            .map(|e| Ok(Field::new(&e.name(), e.data_type(&input_schema)?, true)))
            .collect();

        let schema = Arc::new(Schema::new(fields?));

        Ok(Self {
            expr,
            schema,
            input,
        })
    }
}

impl ExecutionPlan for ProjectionExec {
    /// Get the schema for this execution plan
    fn schema(&self) -> Arc<Schema> {
@@ -105,3 +128,42 @@ impl BatchIterator for ProjectionIterator {
        }
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::execution::physical_plan::csv::CsvExec;
    use arrow::datatypes::{DataType, Field, Schema};
    use std::env;

    #[test]
    fn project_first_column() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]));

        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");

        let path = format!("{}/tbd", testdata);

        // TODO: working on this now ..

        let projection =
            ProjectionExec::try_new(vec![], Arc::new(CsvExec::try_new(&path, schema)?))?;

        Ok(())
    }
}
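Once `CsvExec::partitions()` exists, the test above might continue along these lines (a sketch under the same hypothetical `Column::new` constructor, with `Column` imported from the expressions module; not part of the commit): project column 0, then drain every partition and check the output arity.

let expr: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new(0))];
let projection = ProjectionExec::try_new(expr, Arc::new(CsvExec::try_new(&path, schema)?))?;
for partition in projection.partitions()? {
    let it = partition.execute()?;
    while let Some(batch) = it.next()? {
        assert_eq!(1, batch.num_columns()); // only the projected column remains
    }
}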
