Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Aug 25, 2019
1 parent 9fbbc73 commit c692217
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 0 deletions.
67 changes: 67 additions & 0 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::sync::Arc;
use arrow::datatypes::*;

use crate::arrow::array::{ArrayRef, BooleanBuilder};
use crate::arrow::record_batch::RecordBatch;
use crate::datasource::csv::CsvFile;
use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
Expand All @@ -35,6 +36,9 @@ use crate::execution::aggregate::AggregateRelation;
use crate::execution::expression::*;
use crate::execution::filter::FilterRelation;
use crate::execution::limit::LimitRelation;
use crate::execution::physical_plan::datasource::DatasourceExec;
use crate::execution::physical_plan::projection::ProjectionExec;
use crate::execution::physical_plan::{ExecutionPlan, PhysicalExpr};
use crate::execution::projection::ProjectRelation;
use crate::execution::relation::{DataSourceRelation, Relation};
use crate::execution::scalar_relation::ScalarRelation;
Expand Down Expand Up @@ -210,6 +214,69 @@ impl ExecutionContext {
Ok(plan)
}

/// Create a physical plan from a logical plan
pub fn create_physical_plan(
&mut self,
logical_plan: &Arc<LogicalPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
match logical_plan.as_ref() {
LogicalPlan::TableScan {
table_name,
projection,
..
} => match (*self.datasources).borrow().get(table_name) {
Some(provider) => {
let partitions = provider.scan(projection, 16 * 1024)?;
if partitions.is_empty() {
Err(ExecutionError::General(
"Table provider returned no partitions".to_string(),
))
} else {
let partition = partitions[0].lock().unwrap();
let schema = partition.schema();
let exec =
DatasourceExec::new(schema.clone(), partitions.clone());
Ok(Arc::new(exec))
}
}
_ => panic!(),
},
LogicalPlan::Projection { input, expr, .. } => {
let input = self.create_physical_plan(input)?;
let input_schema = input.as_ref().schema().clone();
//let me = self;
let runtime_expr = expr
.iter()
.map(|e| self.create_physical_expr(e, &input_schema))
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
}
_ => Err(ExecutionError::General(
"Unsupported logical plan variant".to_string(),
)),
}
}

/// Create a physical expression from a logical expression
pub fn create_physical_expr(
&self,
_e: &Expr,
_input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
//TODO: implement this next
unimplemented!()
}

/// Execute a physical plan and collect the results in memory
pub fn collect(&self, _plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
unimplemented!()
}

/// Execute a physical plan and write the results in CSV format
pub fn write_csv(&self, _plan: &dyn ExecutionPlan, _path: &str) -> Result<()> {
unimplemented!()
}

/// Execute a logical plan and produce a Relation (a schema-aware iterator over a
/// series of RecordBatch instances)
pub fn execute(
Expand Down
73 changes: 73 additions & 0 deletions rust/datafusion/src/execution/physical_plan/datasource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! ExecutionPlan implementation for DataFusion data sources
use std::sync::{Arc, Mutex};

use crate::error::Result;
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
use arrow::datatypes::Schema;

/// Datasource execution plan
pub struct DatasourceExec {
schema: Arc<Schema>,
partitions: Vec<Arc<Mutex<dyn BatchIterator>>>,
}

impl DatasourceExec {
/// Create a new data source execution plan
pub fn new(
schema: Arc<Schema>,
partitions: Vec<Arc<Mutex<dyn BatchIterator>>>,
) -> Self {
Self { schema, partitions }
}
}

impl ExecutionPlan for DatasourceExec {
fn schema(&self) -> Arc<Schema> {
self.schema.clone()
}

fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
Ok(self
.partitions
.iter()
.map(|it| {
Arc::new(DatasourcePartition::new(it.clone())) as Arc<dyn Partition>
})
.collect::<Vec<_>>())
}
}

/// Wrapper to convert a BatchIterator into a Partition
pub struct DatasourcePartition {
batch_iter: Arc<Mutex<dyn BatchIterator>>,
}

impl DatasourcePartition {
fn new(batch_iter: Arc<Mutex<dyn BatchIterator>>) -> Self {
Self { batch_iter }
}
}

impl Partition for DatasourcePartition {
fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
Ok(self.batch_iter.clone())
}
}
1 change: 1 addition & 0 deletions rust/datafusion/src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ pub trait PhysicalExpr: Send + Sync {
}

pub mod csv;
pub mod datasource;
pub mod expressions;
pub mod projection;

0 comments on commit c692217

Please sign in to comment.