From c69221785efea3bc969b52990280cd6c07627a3b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Aug 2019 08:21:53 -0600 Subject: [PATCH 01/13] rebase --- rust/datafusion/src/execution/context.rs | 67 +++++++++++++++++ .../src/execution/physical_plan/datasource.rs | 73 +++++++++++++++++++ .../src/execution/physical_plan/mod.rs | 1 + 3 files changed, 141 insertions(+) create mode 100644 rust/datafusion/src/execution/physical_plan/datasource.rs diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index dbc91bbfa9391..bbd96632cc47b 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use arrow::datatypes::*; use crate::arrow::array::{ArrayRef, BooleanBuilder}; +use crate::arrow::record_batch::RecordBatch; use crate::datasource::csv::CsvFile; use crate::datasource::parquet::ParquetTable; use crate::datasource::TableProvider; @@ -35,6 +36,9 @@ use crate::execution::aggregate::AggregateRelation; use crate::execution::expression::*; use crate::execution::filter::FilterRelation; use crate::execution::limit::LimitRelation; +use crate::execution::physical_plan::datasource::DatasourceExec; +use crate::execution::physical_plan::projection::ProjectionExec; +use crate::execution::physical_plan::{ExecutionPlan, PhysicalExpr}; use crate::execution::projection::ProjectRelation; use crate::execution::relation::{DataSourceRelation, Relation}; use crate::execution::scalar_relation::ScalarRelation; @@ -210,6 +214,69 @@ impl ExecutionContext { Ok(plan) } + /// Create a physical plan from a logical plan + pub fn create_physical_plan( + &mut self, + logical_plan: &Arc, + ) -> Result> { + match logical_plan.as_ref() { + LogicalPlan::TableScan { + table_name, + projection, + .. + } => match (*self.datasources).borrow().get(table_name) { + Some(provider) => { + let partitions = provider.scan(projection, 16 * 1024)?; + if partitions.is_empty() { + Err(ExecutionError::General( + "Table provider returned no partitions".to_string(), + )) + } else { + let partition = partitions[0].lock().unwrap(); + let schema = partition.schema(); + let exec = + DatasourceExec::new(schema.clone(), partitions.clone()); + Ok(Arc::new(exec)) + } + } + _ => panic!(), + }, + LogicalPlan::Projection { input, expr, .. 
} => { + let input = self.create_physical_plan(input)?; + let input_schema = input.as_ref().schema().clone(); + //let me = self; + let runtime_expr = expr + .iter() + .map(|e| self.create_physical_expr(e, &input_schema)) + .collect::>>()?; + Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?)) + } + _ => Err(ExecutionError::General( + "Unsupported logical plan variant".to_string(), + )), + } + } + + /// Create a physical expression from a logical expression + pub fn create_physical_expr( + &self, + _e: &Expr, + _input_schema: &Schema, + ) -> Result> { + //TODO: implement this next + unimplemented!() + } + + /// Execute a physical plan and collect the results in memory + pub fn collect(&self, _plan: &dyn ExecutionPlan) -> Result> { + unimplemented!() + } + + /// Execute a physical plan and write the results in CSV format + pub fn write_csv(&self, _plan: &dyn ExecutionPlan, _path: &str) -> Result<()> { + unimplemented!() + } + /// Execute a logical plan and produce a Relation (a schema-aware iterator over a /// series of RecordBatch instances) pub fn execute( diff --git a/rust/datafusion/src/execution/physical_plan/datasource.rs b/rust/datafusion/src/execution/physical_plan/datasource.rs new file mode 100644 index 0000000000000..a34e96ed64519 --- /dev/null +++ b/rust/datafusion/src/execution/physical_plan/datasource.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
ExecutionPlan implementation for DataFusion data sources + +use std::sync::{Arc, Mutex}; + +use crate::error::Result; +use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition}; +use arrow::datatypes::Schema; + +/// Datasource execution plan +pub struct DatasourceExec { + schema: Arc, + partitions: Vec>>, +} + +impl DatasourceExec { + /// Create a new data source execution plan + pub fn new( + schema: Arc, + partitions: Vec>>, + ) -> Self { + Self { schema, partitions } + } +} + +impl ExecutionPlan for DatasourceExec { + fn schema(&self) -> Arc { + self.schema.clone() + } + + fn partitions(&self) -> Result>> { + Ok(self + .partitions + .iter() + .map(|it| { + Arc::new(DatasourcePartition::new(it.clone())) as Arc + }) + .collect::>()) + } +} + +/// Wrapper to convert a BatchIterator into a Partition +pub struct DatasourcePartition { + batch_iter: Arc>, +} + +impl DatasourcePartition { + fn new(batch_iter: Arc>) -> Self { + Self { batch_iter } + } +} + +impl Partition for DatasourcePartition { + fn execute(&self) -> Result>> { + Ok(self.batch_iter.clone()) + } +} diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs index 7006ca754afbc..8a83beb58ccd3 100644 --- a/rust/datafusion/src/execution/physical_plan/mod.rs +++ b/rust/datafusion/src/execution/physical_plan/mod.rs @@ -57,5 +57,6 @@ pub trait PhysicalExpr: Send + Sync { } pub mod csv; +pub mod datasource; pub mod expressions; pub mod projection; From 42566c364c5a4939987efdeddc377bebdac37809 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 09:41:23 -0600 Subject: [PATCH 02/13] implement collect() --- rust/datafusion/src/execution/context.rs | 39 +++++++++++++++++++----- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index bbd96632cc47b..763d2564cf518 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -15,14 +15,15 @@ // specific language governing permissions and limitations // under the License. -//! ExecutionContext contains methods for registering data sources and executing SQL -//! queries +//! ExecutionContext contains methods for registering data sources and executing queries use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::string::String; use std::sync::Arc; +use std::thread; +use std::thread::JoinHandle; use arrow::datatypes::*; @@ -268,13 +269,35 @@ impl ExecutionContext { } /// Execute a physical plan and collect the results in memory - pub fn collect(&self, _plan: &dyn ExecutionPlan) -> Result> { - unimplemented!() - } + pub fn collect(&self, plan: &dyn ExecutionPlan) -> Result> { + let threads: Vec>>> = plan + .partitions()? 
+ .iter() + .map(|p| { + let p = p.clone(); + thread::spawn(move || { + let it = p.execute().unwrap(); + let mut it = it.lock().unwrap(); + let mut results: Vec = vec![]; + while let Ok(Some(batch)) = it.next() { + results.push(batch); + } + Ok(results) + }) + }) + .collect(); + + // combine the results from each thread + let mut combined_results: Vec = vec![]; + for thread in threads { + let result = thread.join().unwrap(); + let result = result.unwrap(); + result + .iter() + .for_each(|batch| combined_results.push(batch.clone())); + } - /// Execute a physical plan and write the results in CSV format - pub fn write_csv(&self, _plan: &dyn ExecutionPlan, _path: &str) -> Result<()> { - unimplemented!() + Ok(combined_results) } /// Execute a logical plan and produce a Relation (a schema-aware iterator over a From 63c88d8050307e00a707e97c692570aed7de9b10 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 09:44:13 -0600 Subject: [PATCH 03/13] error handling --- rust/datafusion/src/execution/context.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 763d2564cf518..69ba27582dad3 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -279,10 +279,18 @@ impl ExecutionContext { let it = p.execute().unwrap(); let mut it = it.lock().unwrap(); let mut results: Vec = vec![]; - while let Ok(Some(batch)) = it.next() { - results.push(batch); + loop { + match it.next() { + Ok(Some(batch)) => { + results.push(batch); + } + Ok(None) => { + // end of result set + return Ok(results); + } + Err(e) => return Err(e), + } } - Ok(results) }) }) .collect(); From a292cce37c37478a62e2d8947a28a7fbc15db7cf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 09:51:10 -0600 Subject: [PATCH 04/13] unit test (currently fails) --- rust/datafusion/src/execution/context.rs | 51 ++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 69ba27582dad3..e69adf963e24f 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -500,3 +500,54 @@ impl SchemaProvider for ExecutionContextSchemaProvider { None } } + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn parallel_projection() -> Result<()> { + let mut ctx = ExecutionContext::new(); + + // define schema for data source (csv file) + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, false), + Field::new("c2", DataType::UInt32, false), + Field::new("c3", DataType::Int8, false), + Field::new("c4", DataType::Int16, false), + Field::new("c5", DataType::Int32, false), + Field::new("c6", DataType::Int64, false), + Field::new("c7", DataType::UInt8, false), + Field::new("c8", DataType::UInt16, false), + Field::new("c9", DataType::UInt32, false), + Field::new("c10", DataType::UInt64, false), + Field::new("c11", DataType::Float32, false), + Field::new("c12", DataType::Float64, false), + Field::new("c13", DataType::Utf8, false), + ])); + + let testdata = + ::std::env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + + // register csv file with the execution context + ctx.register_csv( + "aggregate_test_100", + &format!("{}/csv/aggregate_test_100.csv", testdata), + &schema, + true, + ); + + let logical_plan = + ctx.create_logical_plan("SELECT c1, c13 FROM aggregate_test_100")?; + + let physical_plan = 
ctx.create_physical_plan(&logical_plan)?; + + let results = ctx.collect(physical_plan.as_ref())?; + + assert_eq!(123, results.len()); + + Ok(()) + } + +} From 55150335c6f5e2ab2e2584fd681ba424ead5ee1e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 10:05:02 -0600 Subject: [PATCH 05/13] unit test works --- rust/datafusion/src/execution/context.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index e69adf963e24f..56d6ce5c27402 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -39,6 +39,7 @@ use crate::execution::filter::FilterRelation; use crate::execution::limit::LimitRelation; use crate::execution::physical_plan::datasource::DatasourceExec; use crate::execution::physical_plan::projection::ProjectionExec; +use crate::execution::physical_plan::expressions::Column; use crate::execution::physical_plan::{ExecutionPlan, PhysicalExpr}; use crate::execution::projection::ProjectRelation; use crate::execution::relation::{DataSourceRelation, Relation}; @@ -261,11 +262,14 @@ impl ExecutionContext { /// Create a physical expression from a logical expression pub fn create_physical_expr( &self, - _e: &Expr, + e: &Expr, _input_schema: &Schema, ) -> Result> { - //TODO: implement this next - unimplemented!() + + match e { + Expr::Column(i) => Ok(Arc::new(Column::new(*i))), + _ => Err(ExecutionError::NotImplemented("Unsupported expression".to_string())) + } } /// Execute a physical plan and collect the results in memory @@ -545,7 +549,8 @@ mod tests { let results = ctx.collect(physical_plan.as_ref())?; - assert_eq!(123, results.len()); + assert_eq!(1, results.len()); + assert_eq!(2, results[0].num_columns()); Ok(()) } From 7ff8ab71858839c265b0db73a5b63ed6e4e4b7b9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 10:05:11 -0600 Subject: [PATCH 06/13] gmt --- rust/datafusion/src/execution/context.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 56d6ce5c27402..f77f4fd33056a 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -38,8 +38,8 @@ use crate::execution::expression::*; use crate::execution::filter::FilterRelation; use crate::execution::limit::LimitRelation; use crate::execution::physical_plan::datasource::DatasourceExec; -use crate::execution::physical_plan::projection::ProjectionExec; use crate::execution::physical_plan::expressions::Column; +use crate::execution::physical_plan::projection::ProjectionExec; use crate::execution::physical_plan::{ExecutionPlan, PhysicalExpr}; use crate::execution::projection::ProjectRelation; use crate::execution::relation::{DataSourceRelation, Relation}; @@ -265,10 +265,11 @@ impl ExecutionContext { e: &Expr, _input_schema: &Schema, ) -> Result> { - match e { Expr::Column(i) => Ok(Arc::new(Column::new(*i))), - _ => Err(ExecutionError::NotImplemented("Unsupported expression".to_string())) + _ => Err(ExecutionError::NotImplemented( + "Unsupported expression".to_string(), + )), } } From 20216bc6107911f18b6c393a1092240b5c036e34 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 12:14:18 -0600 Subject: [PATCH 07/13] realistic test (failing) --- rust/datafusion/Cargo.toml | 2 +- rust/datafusion/src/execution/context.rs | 53 +++++++++++++----------- 2 files changed, 30 insertions(+), 25 deletions(-) 
diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index 7db5909d013ef..8fff9b3d09674 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -54,7 +54,7 @@ crossbeam = "0.7.1" [dev-dependencies] criterion = "0.2.0" - +tempdir = "0.3.7" [[bench]] name = "aggregate_query_sql" diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index f77f4fd33056a..fc1ec1a4057b8 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -510,6 +510,9 @@ impl SchemaProvider for ExecutionContextSchemaProvider { mod tests { use super::*; + use std::fs::File; + use std::io::prelude::*; + use tempdir::TempDir; #[test] fn parallel_projection() -> Result<()> { @@ -517,41 +520,43 @@ mod tests { // define schema for data source (csv file) let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Utf8, false), + Field::new("c1", DataType::UInt32, false), Field::new("c2", DataType::UInt32, false), - Field::new("c3", DataType::Int8, false), - Field::new("c4", DataType::Int16, false), - Field::new("c5", DataType::Int32, false), - Field::new("c6", DataType::Int64, false), - Field::new("c7", DataType::UInt8, false), - Field::new("c8", DataType::UInt16, false), - Field::new("c9", DataType::UInt32, false), - Field::new("c10", DataType::UInt64, false), - Field::new("c11", DataType::Float32, false), - Field::new("c12", DataType::Float64, false), - Field::new("c13", DataType::Utf8, false), ])); - let testdata = - ::std::env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let tmp_dir = TempDir::new("parallel_projection")?; + + // generate a partitioned file + let partition_count = 4; + for partition in 0..partition_count { + let filename = format!("partition-{}.csv", partition); + let file_path = tmp_dir.path().join(&filename); + let mut file = File::create(file_path)?; + + // generate some data + for i in 0..10 { + let data = format!("{},{}\n", partition, i); + file.write_all(data.as_bytes())?; + } + } // register csv file with the execution context - ctx.register_csv( - "aggregate_test_100", - &format!("{}/csv/aggregate_test_100.csv", testdata), - &schema, - true, - ); + ctx.register_csv("test", tmp_dir.path().to_str().unwrap(), &schema, true); - let logical_plan = - ctx.create_logical_plan("SELECT c1, c13 FROM aggregate_test_100")?; + let logical_plan = ctx.create_logical_plan("SELECT c1, c2 FROM test")?; let physical_plan = ctx.create_physical_plan(&logical_plan)?; let results = ctx.collect(physical_plan.as_ref())?; - assert_eq!(1, results.len()); - assert_eq!(2, results[0].num_columns()); + // there should be one batch per partition + assert_eq!(partition_count, results.len()); + + // each batch should contain 2 columns and 10 rows + for batch in &results { + assert_eq!(2, batch.num_columns()); + assert_eq!(10, batch.num_rows()); + } Ok(()) } From 104f3ad9b04ac1c5e2da93274fe41dd594864747 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 13:04:14 -0600 Subject: [PATCH 08/13] csv provider now supports partitioned csv files --- rust/datafusion/src/datasource/csv.rs | 19 ++++++++---- rust/datafusion/src/execution/context.rs | 4 +-- .../src/execution/physical_plan/csv.rs | 30 ++++++++++++------- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs index c8f128d4ffbed..4690b23610b28 100644 --- a/rust/datafusion/src/datasource/csv.rs +++ b/rust/datafusion/src/datasource/csv.rs 
@@ -18,16 +18,17 @@ //! CSV Data source use std::fs::File; -use std::string::String; -use std::sync::{Arc, Mutex}; +use std::string::String; +use std::sync::Arc; use arrow::csv; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use crate::datasource::{ScanResult, TableProvider}; use crate::error::Result; -use crate::execution::physical_plan::BatchIterator; +use crate::execution::physical_plan::csv::CsvExec; +use crate::execution::physical_plan::{BatchIterator, ExecutionPlan}; /// Represents a CSV file with a provided schema // TODO: usage example (rather than documenting `new()`) @@ -58,13 +59,19 @@ impl TableProvider for CsvFile { projection: &Option>, batch_size: usize, ) -> Result> { - Ok(vec![Arc::new(Mutex::new(CsvBatchIterator::new( + let exec = CsvExec::try_new( &self.filename, self.schema.clone(), self.has_header, - projection, + projection.clone(), batch_size, - )))]) + )?; + let partitions = exec.partitions()?; + let iterators = partitions + .iter() + .map(|p| p.execute()) + .collect::>>()?; + Ok(iterators) } } diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index fc1ec1a4057b8..db3e4ff91e3d7 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -304,7 +304,7 @@ impl ExecutionContext { let mut combined_results: Vec = vec![]; for thread in threads { let result = thread.join().unwrap(); - let result = result.unwrap(); + let result = result?; result .iter() .for_each(|batch| combined_results.push(batch.clone())); @@ -534,7 +534,7 @@ mod tests { let mut file = File::create(file_path)?; // generate some data - for i in 0..10 { + for i in 0..=10 { let data = format!("{},{}\n", partition, i); file.write_all(data.as_bytes())?; } diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs index 82285b0f7da05..eeb7b7462030a 100644 --- a/rust/datafusion/src/execution/physical_plan/csv.rs +++ b/rust/datafusion/src/execution/physical_plan/csv.rs @@ -19,6 +19,7 @@ use std::fs; use std::fs::File; +use std::fs::metadata; use std::sync::{Arc, Mutex}; use crate::error::{ExecutionError, Result}; @@ -97,19 +98,26 @@ impl CsvExec { /// Recursively build a list of csv files in a directory fn build_file_list(&self, dir: &str, filenames: &mut Vec) -> Result<()> { - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); - if let Some(path_name) = path.to_str() { - if path.is_dir() { - self.build_file_list(path_name, filenames)?; - } else { - if path_name.ends_with(".csv") { - filenames.push(path_name.to_string()); + let metadata = metadata(dir)?; + if metadata.is_file() { + if dir.ends_with(".csv") { + filenames.push(dir.to_string()); + } + } else { + for entry in fs::read_dir(dir)? 
{ + let entry = entry?; + let path = entry.path(); + if let Some(path_name) = path.to_str() { + if path.is_dir() { + self.build_file_list(path_name, filenames)?; + } else { + if path_name.ends_with(".csv") { + filenames.push(path_name.to_string()); + } } + } else { + return Err(ExecutionError::General("Invalid path".to_string())); } - } else { - return Err(ExecutionError::General("Invalid path".to_string())); } } Ok(()) From dd3cbe6eb62ab8af188140836d715e3675ce0796 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 13:05:00 -0600 Subject: [PATCH 09/13] format --- rust/datafusion/src/datasource/csv.rs | 4 ++-- rust/datafusion/src/execution/physical_plan/csv.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs index 4690b23610b28..6a8b200f1644e 100644 --- a/rust/datafusion/src/datasource/csv.rs +++ b/rust/datafusion/src/datasource/csv.rs @@ -19,11 +19,11 @@ use std::fs::File; -use std::string::String; -use std::sync::Arc; use arrow::csv; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; +use std::string::String; +use std::sync::Arc; use crate::datasource::{ScanResult, TableProvider}; use crate::error::Result; diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs index eeb7b7462030a..c10dd3a1c5707 100644 --- a/rust/datafusion/src/execution/physical_plan/csv.rs +++ b/rust/datafusion/src/execution/physical_plan/csv.rs @@ -18,8 +18,8 @@ //! Execution plan for reading CSV files use std::fs; -use std::fs::File; use std::fs::metadata; +use std::fs::File; use std::sync::{Arc, Mutex}; use crate::error::{ExecutionError, Result}; From 21e646f50bff3512ae84995b0fb193a7fd57fbe4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 7 Sep 2019 15:24:31 -0600 Subject: [PATCH 10/13] test passes --- rust/datafusion/src/execution/physical_plan/csv.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs index c10dd3a1c5707..306718fe5c46e 100644 --- a/rust/datafusion/src/execution/physical_plan/csv.rs +++ b/rust/datafusion/src/execution/physical_plan/csv.rs @@ -25,7 +25,7 @@ use std::sync::{Arc, Mutex}; use crate::error::{ExecutionError, Result}; use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition}; use arrow::csv; -use arrow::datatypes::{Field, Schema}; +use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; /// Execution plan for scanning a CSV file @@ -77,19 +77,9 @@ impl CsvExec { projection: Option>, batch_size: usize, ) -> Result { - let projected_schema = match &projection { - Some(p) => { - let projected_fields: Vec = - p.iter().map(|i| schema.fields()[*i].clone()).collect(); - - Arc::new(Schema::new(projected_fields)) - } - None => schema, - }; - Ok(Self { path: path.to_string(), - schema: projected_schema, + schema, has_header, projection, batch_size, From d8ff02fd14ec36146c6bdc33091e21213aaacd2d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 10 Sep 2019 07:37:10 -0600 Subject: [PATCH 11/13] remove hard-coded batch size, remove comment, remove panic --- rust/datafusion/src/execution/context.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index db3e4ff91e3d7..85ba5212b05d5 100644 --- 
a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -220,6 +220,7 @@ impl ExecutionContext { pub fn create_physical_plan( &mut self, logical_plan: &Arc, + batch_size: usize, ) -> Result> { match logical_plan.as_ref() { LogicalPlan::TableScan { @@ -228,7 +229,7 @@ impl ExecutionContext { .. } => match (*self.datasources).borrow().get(table_name) { Some(provider) => { - let partitions = provider.scan(projection, 16 * 1024)?; + let partitions = provider.scan(projection, batch_size)?; if partitions.is_empty() { Err(ExecutionError::General( "Table provider returned no partitions".to_string(), @@ -241,12 +242,14 @@ impl ExecutionContext { Ok(Arc::new(exec)) } } - _ => panic!(), + _ => Err(ExecutionError::General(format!( + "No table named {}", + table_name + ))), }, LogicalPlan::Projection { input, expr, .. } => { - let input = self.create_physical_plan(input)?; + let input = self.create_physical_plan(input, batch_size)?; let input_schema = input.as_ref().schema().clone(); - //let me = self; let runtime_expr = expr .iter() .map(|e| self.create_physical_expr(e, &input_schema)) @@ -545,7 +548,7 @@ mod tests { let logical_plan = ctx.create_logical_plan("SELECT c1, c2 FROM test")?; - let physical_plan = ctx.create_physical_plan(&logical_plan)?; + let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?; let results = ctx.collect(physical_plan.as_ref())?; From 0210c70f161e315de6627e2edab0797b49058383 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 10 Sep 2019 07:43:35 -0600 Subject: [PATCH 12/13] remove an unwrap --- rust/datafusion/src/execution/context.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 85ba5212b05d5..52b380964f15e 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -284,7 +284,7 @@ impl ExecutionContext { .map(|p| { let p = p.clone(); thread::spawn(move || { - let it = p.execute().unwrap(); + let it = p.execute()?; let mut it = it.lock().unwrap(); let mut results: Vec = vec![]; loop { @@ -306,11 +306,17 @@ impl ExecutionContext { // combine the results from each thread let mut combined_results: Vec = vec![]; for thread in threads { - let result = thread.join().unwrap(); - let result = result?; - result - .iter() - .for_each(|batch| combined_results.push(batch.clone())); + match thread.join() { + Ok(result) => { + let result = result?; + result + .iter() + .for_each(|batch| combined_results.push(batch.clone())); + } + Err(_) => { + return Err(ExecutionError::General("Thread failed".to_string())) + } + } } Ok(combined_results) From b820e6904008e334b26c3e0e18d51efaea217526 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 10 Sep 2019 17:32:11 -0600 Subject: [PATCH 13/13] trigger build
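
Reviewer note (not part of the patch series): the commits above add ExecutionContext::create_physical_plan, a DatasourceExec physical operator, and a multi-threaded collect() that runs one thread per partition. Below is a minimal usage sketch mirroring the parallel_projection test from patches 07-11; the `use` paths are assumed from the crate layout shown in the diffs, and the CSV directory path and the 1024 batch size are illustrative placeholders rather than values required by the API.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;

fn parallel_projection_example() -> Result<()> {
    let mut ctx = ExecutionContext::new();

    // Schema of the partitioned CSV data (two UInt32 columns, as in the test).
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::UInt32, false),
        Field::new("c2", DataType::UInt32, false),
    ]));

    // Register a directory of CSV files; after patch 08 each .csv file found
    // under the directory becomes one partition of the physical plan.
    ctx.register_csv("test", "/path/to/partitioned/csv", &schema, true);

    // Build the logical plan, lower it to a physical plan with an explicit
    // batch size (patch 11 replaced the hard-coded 16 * 1024 value with a
    // parameter), and execute it.
    let logical_plan = ctx.create_logical_plan("SELECT c1, c2 FROM test")?;
    let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;

    // collect() spawns one thread per partition and merges the resulting
    // RecordBatches into a single Vec.
    let results = ctx.collect(physical_plan.as_ref())?;
    println!("collected {} batches", results.len());

    Ok(())
}

As in the test, the number of batches returned should equal the number of partition files scanned, with one thread draining each partition's BatchIterator.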