From 795dda566a520b88db32b24a382f58237320a7a5 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 17 Aug 2019 13:29:03 -0600
Subject: [PATCH] rebase on ARROW-6287

---
 rust/datafusion/src/execution/context.rs      | 77 +++++++++++++++++++
 .../src/execution/physical_plan/datasource.rs | 80 +++++++++++++++++++
 .../src/execution/physical_plan/mod.rs        |  1 +
 3 files changed, 158 insertions(+)
 create mode 100644 rust/datafusion/src/execution/physical_plan/datasource.rs

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 1e6f28e8dcc3c..dfe8200a5f546 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -34,6 +34,9 @@ use crate::execution::aggregate::AggregateRelation;
 use crate::execution::expression::*;
 use crate::execution::filter::FilterRelation;
 use crate::execution::limit::LimitRelation;
+use crate::execution::physical_plan::datasource::DatasourceExec;
+use crate::execution::physical_plan::projection::ProjectionExec;
+use crate::execution::physical_plan::{ExecutionPlan, PhysicalExpr};
 use crate::execution::projection::ProjectRelation;
 use crate::execution::relation::{DataSourceRelation, Relation};
 use crate::execution::scalar_relation::ScalarRelation;
@@ -202,6 +205,80 @@ impl ExecutionContext {
         Ok(plan)
     }
 
+    /// Create a physical plan from a logical plan.
+    ///
+    /// Only `TableScan` and `Projection` are translated so far.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the scanned table is not registered, if its provider's scan
+    /// fails or yields no partitions, if the plan variant is unsupported, or
+    /// if building a projection's input plan or expressions fails.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the first partition's mutex is poisoned. A projection with at
+    /// least one expression also panics once its input plan is built, because
+    /// `create_physical_expr` is still unimplemented.
+    pub fn create_physical_plan(
+        &mut self,
+        logical_plan: &Arc<LogicalPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match logical_plan.as_ref() {
+            LogicalPlan::TableScan {
+                table_name,
+                projection,
+                ..
+            } => match (*self.datasources).borrow().get(table_name) {
+                Some(provider) => {
+                    let partitions = provider.scan(projection, 16 * 1024)?;
+                    if partitions.is_empty() {
+                        Err(ExecutionError::General(
+                            "Table provider returned no partitions".to_string(),
+                        ))
+                    } else {
+                        // Read the schema from the first partition; the lock is
+                        // released at the end of the statement, so the partitions
+                        // can be moved into the plan without cloning them.
+                        let schema = partitions[0].lock().unwrap().schema().clone();
+                        Ok(Arc::new(DatasourceExec::new(schema, partitions)))
+                    }
+                }
+                // An unknown table is a recoverable caller error, not a bug.
+                None => Err(ExecutionError::General(format!(
+                    "No table registered as '{}'",
+                    table_name
+                ))),
+            },
+            LogicalPlan::Projection { input, expr, .. } => {
+                let input = self.create_physical_plan(input)?;
+                let input_schema = input.as_ref().schema().clone();
+                let runtime_expr = expr
+                    .iter()
+                    .map(|e| self.create_physical_expr(e, &input_schema))
+                    .collect::<Result<Vec<_>>>()?;
+                Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
+            }
+            _ => Err(ExecutionError::General(
+                "Unsupported logical plan variant".to_string(),
+            )),
+        }
+    }
+
+    /// Create a physical expression from a logical expression.
+    ///
+    /// # Panics
+    ///
+    /// Always panics: the translation is not implemented yet.
+    pub fn create_physical_expr(
+        &self,
+        _e: &Expr,
+        _input_schema: &Schema,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        //TODO: implement this next
+        unimplemented!()
+    }
+
     /// Execute a logical plan and produce a Relation (a schema-aware iterator over a
     /// series of RecordBatch instances)
     pub fn execute(
diff --git a/rust/datafusion/src/execution/physical_plan/datasource.rs b/rust/datafusion/src/execution/physical_plan/datasource.rs
new file mode 100644
index 0000000000000..a34e96ed64519
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/datasource.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ExecutionPlan implementation for DataFusion data sources
+
+use std::sync::{Arc, Mutex};
+
+use crate::error::Result;
+use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
+use arrow::datatypes::Schema;
+
+/// Execution plan that exposes a data source's batch iterators as partitions
+pub struct DatasourceExec {
+    /// Schema returned by `ExecutionPlan::schema`
+    schema: Arc<Schema>,
+    /// Batch iterators, each exposed as one partition
+    partitions: Vec<Arc<Mutex<dyn BatchIterator>>>,
+}
+
+impl DatasourceExec {
+    /// Create a new data source execution plan
+    pub fn new(
+        schema: Arc<Schema>,
+        partitions: Vec<Arc<Mutex<dyn BatchIterator>>>,
+    ) -> Self {
+        Self { schema, partitions }
+    }
+}
+
+impl ExecutionPlan for DatasourceExec {
+    /// Get the schema of this plan
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    /// Wrap each batch iterator in a `DatasourcePartition`
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        Ok(self
+            .partitions
+            .iter()
+            .map(|it| {
+                Arc::new(DatasourcePartition::new(it.clone())) as Arc<dyn Partition>
+            })
+            .collect::<Vec<_>>())
+    }
+}
+
+/// Wrapper to convert a BatchIterator into a Partition
+pub struct DatasourcePartition {
+    /// The wrapped batch iterator, handed out by `execute`
+    batch_iter: Arc<Mutex<dyn BatchIterator>>,
+}
+
+impl DatasourcePartition {
+    /// Wrap a shared batch iterator
+    fn new(batch_iter: Arc<Mutex<dyn BatchIterator>>) -> Self {
+        Self { batch_iter }
+    }
+}
+
+impl Partition for DatasourcePartition {
+    /// Return a handle to the same underlying batch iterator on every call
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        Ok(self.batch_iter.clone())
+    }
+}
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index a281c95a6232f..31d5142dab187 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -57,5 +57,6 @@ pub trait PhysicalExpr: Send + Sync {
 }
 
 pub mod csv;
+pub mod datasource;
 pub mod expressions;
 pub mod projection;