diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index 7db5909d013ef..8fff9b3d09674 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -54,7 +54,7 @@ crossbeam = "0.7.1" [dev-dependencies] criterion = "0.2.0" - +tempdir = "0.3.7" [[bench]] name = "aggregate_query_sql" diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs index c8f128d4ffbed..6a8b200f1644e 100644 --- a/rust/datafusion/src/datasource/csv.rs +++ b/rust/datafusion/src/datasource/csv.rs @@ -18,16 +18,17 @@ //! CSV Data source use std::fs::File; -use std::string::String; -use std::sync::{Arc, Mutex}; use arrow::csv; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; +use std::string::String; +use std::sync::Arc; use crate::datasource::{ScanResult, TableProvider}; use crate::error::Result; -use crate::execution::physical_plan::BatchIterator; +use crate::execution::physical_plan::csv::CsvExec; +use crate::execution::physical_plan::{BatchIterator, ExecutionPlan}; /// Represents a CSV file with a provided schema // TODO: usage example (rather than documenting `new()`) @@ -58,13 +59,19 @@ impl TableProvider for CsvFile { projection: &Option>, batch_size: usize, ) -> Result> { - Ok(vec![Arc::new(Mutex::new(CsvBatchIterator::new( + let exec = CsvExec::try_new( &self.filename, self.schema.clone(), self.has_header, - projection, + projection.clone(), batch_size, - )))]) + )?; + let partitions = exec.partitions()?; + let iterators = partitions + .iter() + .map(|p| p.execute()) + .collect::>>()?; + Ok(iterators) } } diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index dbc91bbfa9391..52b380964f15e 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -15,18 +15,20 @@ // specific language governing permissions and limitations // under the License. -//! ExecutionContext contains methods for registering data sources and executing SQL -//! queries +//! ExecutionContext contains methods for registering data sources and executing queries use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::string::String; use std::sync::Arc; +use std::thread; +use std::thread::JoinHandle; use arrow::datatypes::*; use crate::arrow::array::{ArrayRef, BooleanBuilder}; +use crate::arrow::record_batch::RecordBatch; use crate::datasource::csv::CsvFile; use crate::datasource::parquet::ParquetTable; use crate::datasource::TableProvider; @@ -35,6 +37,10 @@ use crate::execution::aggregate::AggregateRelation; use crate::execution::expression::*; use crate::execution::filter::FilterRelation; use crate::execution::limit::LimitRelation; +use crate::execution::physical_plan::datasource::DatasourceExec; +use crate::execution::physical_plan::expressions::Column; +use crate::execution::physical_plan::projection::ProjectionExec; +use crate::execution::physical_plan::{ExecutionPlan, PhysicalExpr}; use crate::execution::projection::ProjectRelation; use crate::execution::relation::{DataSourceRelation, Relation}; use crate::execution::scalar_relation::ScalarRelation; @@ -210,6 +216,112 @@ impl ExecutionContext { Ok(plan) } + /// Create a physical plan from a logical plan + pub fn create_physical_plan( + &mut self, + logical_plan: &Arc, + batch_size: usize, + ) -> Result> { + match logical_plan.as_ref() { + LogicalPlan::TableScan { + table_name, + projection, + .. + } => match (*self.datasources).borrow().get(table_name) { + Some(provider) => { + let partitions = provider.scan(projection, batch_size)?; + if partitions.is_empty() { + Err(ExecutionError::General( + "Table provider returned no partitions".to_string(), + )) + } else { + let partition = partitions[0].lock().unwrap(); + let schema = partition.schema(); + let exec = + DatasourceExec::new(schema.clone(), partitions.clone()); + Ok(Arc::new(exec)) + } + } + _ => Err(ExecutionError::General(format!( + "No table named {}", + table_name + ))), + }, + LogicalPlan::Projection { input, expr, .. } => { + let input = self.create_physical_plan(input, batch_size)?; + let input_schema = input.as_ref().schema().clone(); + let runtime_expr = expr + .iter() + .map(|e| self.create_physical_expr(e, &input_schema)) + .collect::>>()?; + Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?)) + } + _ => Err(ExecutionError::General( + "Unsupported logical plan variant".to_string(), + )), + } + } + + /// Create a physical expression from a logical expression + pub fn create_physical_expr( + &self, + e: &Expr, + _input_schema: &Schema, + ) -> Result> { + match e { + Expr::Column(i) => Ok(Arc::new(Column::new(*i))), + _ => Err(ExecutionError::NotImplemented( + "Unsupported expression".to_string(), + )), + } + } + + /// Execute a physical plan and collect the results in memory + pub fn collect(&self, plan: &dyn ExecutionPlan) -> Result> { + let threads: Vec>>> = plan + .partitions()? + .iter() + .map(|p| { + let p = p.clone(); + thread::spawn(move || { + let it = p.execute()?; + let mut it = it.lock().unwrap(); + let mut results: Vec = vec![]; + loop { + match it.next() { + Ok(Some(batch)) => { + results.push(batch); + } + Ok(None) => { + // end of result set + return Ok(results); + } + Err(e) => return Err(e), + } + } + }) + }) + .collect(); + + // combine the results from each thread + let mut combined_results: Vec = vec![]; + for thread in threads { + match thread.join() { + Ok(result) => { + let result = result?; + result + .iter() + .for_each(|batch| combined_results.push(batch.clone())); + } + Err(_) => { + return Err(ExecutionError::General("Thread failed".to_string())) + } + } + } + + Ok(combined_results) + } + /// Execute a logical plan and produce a Relation (a schema-aware iterator over a /// series of RecordBatch instances) pub fn execute( @@ -402,3 +514,60 @@ impl SchemaProvider for ExecutionContextSchemaProvider { None } } + +#[cfg(test)] +mod tests { + + use super::*; + use std::fs::File; + use std::io::prelude::*; + use tempdir::TempDir; + + #[test] + fn parallel_projection() -> Result<()> { + let mut ctx = ExecutionContext::new(); + + // define schema for data source (csv file) + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::UInt32, false), + Field::new("c2", DataType::UInt32, false), + ])); + + let tmp_dir = TempDir::new("parallel_projection")?; + + // generate a partitioned file + let partition_count = 4; + for partition in 0..partition_count { + let filename = format!("partition-{}.csv", partition); + let file_path = tmp_dir.path().join(&filename); + let mut file = File::create(file_path)?; + + // generate some data + for i in 0..=10 { + let data = format!("{},{}\n", partition, i); + file.write_all(data.as_bytes())?; + } + } + + // register csv file with the execution context + ctx.register_csv("test", tmp_dir.path().to_str().unwrap(), &schema, true); + + let logical_plan = ctx.create_logical_plan("SELECT c1, c2 FROM test")?; + + let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?; + + let results = ctx.collect(physical_plan.as_ref())?; + + // there should be one batch per partition + assert_eq!(partition_count, results.len()); + + // each batch should contain 2 columns and 10 rows + for batch in &results { + assert_eq!(2, batch.num_columns()); + assert_eq!(10, batch.num_rows()); + } + + Ok(()) + } + +} diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs index 82285b0f7da05..306718fe5c46e 100644 --- a/rust/datafusion/src/execution/physical_plan/csv.rs +++ b/rust/datafusion/src/execution/physical_plan/csv.rs @@ -18,13 +18,14 @@ //! Execution plan for reading CSV files use std::fs; +use std::fs::metadata; use std::fs::File; use std::sync::{Arc, Mutex}; use crate::error::{ExecutionError, Result}; use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition}; use arrow::csv; -use arrow::datatypes::{Field, Schema}; +use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; /// Execution plan for scanning a CSV file @@ -76,19 +77,9 @@ impl CsvExec { projection: Option>, batch_size: usize, ) -> Result { - let projected_schema = match &projection { - Some(p) => { - let projected_fields: Vec = - p.iter().map(|i| schema.fields()[*i].clone()).collect(); - - Arc::new(Schema::new(projected_fields)) - } - None => schema, - }; - Ok(Self { path: path.to_string(), - schema: projected_schema, + schema, has_header, projection, batch_size, @@ -97,19 +88,26 @@ impl CsvExec { /// Recursively build a list of csv files in a directory fn build_file_list(&self, dir: &str, filenames: &mut Vec) -> Result<()> { - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); - if let Some(path_name) = path.to_str() { - if path.is_dir() { - self.build_file_list(path_name, filenames)?; - } else { - if path_name.ends_with(".csv") { - filenames.push(path_name.to_string()); + let metadata = metadata(dir)?; + if metadata.is_file() { + if dir.ends_with(".csv") { + filenames.push(dir.to_string()); + } + } else { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + if let Some(path_name) = path.to_str() { + if path.is_dir() { + self.build_file_list(path_name, filenames)?; + } else { + if path_name.ends_with(".csv") { + filenames.push(path_name.to_string()); + } } + } else { + return Err(ExecutionError::General("Invalid path".to_string())); } - } else { - return Err(ExecutionError::General("Invalid path".to_string())); } } Ok(()) diff --git a/rust/datafusion/src/execution/physical_plan/datasource.rs b/rust/datafusion/src/execution/physical_plan/datasource.rs new file mode 100644 index 0000000000000..a34e96ed64519 --- /dev/null +++ b/rust/datafusion/src/execution/physical_plan/datasource.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ExecutionPlan implementation for DataFusion data sources + +use std::sync::{Arc, Mutex}; + +use crate::error::Result; +use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition}; +use arrow::datatypes::Schema; + +/// Datasource execution plan +pub struct DatasourceExec { + schema: Arc, + partitions: Vec>>, +} + +impl DatasourceExec { + /// Create a new data source execution plan + pub fn new( + schema: Arc, + partitions: Vec>>, + ) -> Self { + Self { schema, partitions } + } +} + +impl ExecutionPlan for DatasourceExec { + fn schema(&self) -> Arc { + self.schema.clone() + } + + fn partitions(&self) -> Result>> { + Ok(self + .partitions + .iter() + .map(|it| { + Arc::new(DatasourcePartition::new(it.clone())) as Arc + }) + .collect::>()) + } +} + +/// Wrapper to convert a BatchIterator into a Partition +pub struct DatasourcePartition { + batch_iter: Arc>, +} + +impl DatasourcePartition { + fn new(batch_iter: Arc>) -> Self { + Self { batch_iter } + } +} + +impl Partition for DatasourcePartition { + fn execute(&self) -> Result>> { + Ok(self.batch_iter.clone()) + } +} diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs index 7006ca754afbc..8a83beb58ccd3 100644 --- a/rust/datafusion/src/execution/physical_plan/mod.rs +++ b/rust/datafusion/src/execution/physical_plan/mod.rs @@ -57,5 +57,6 @@ pub trait PhysicalExpr: Send + Sync { } pub mod csv; +pub mod datasource; pub mod expressions; pub mod projection;