From 717dcd87bfefb44922ccbdd3dafe70847389cae6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 1 Aug 2019 12:10:34 -0600 Subject: [PATCH] implement mutex for iterator --- .../src/execution/physical_plan/csv.rs | 16 +++++++------- .../src/execution/physical_plan/mod.rs | 6 +++--- .../src/execution/physical_plan/projection.rs | 21 ++++++++++++------- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs index af70b99c887a4..fdb639b7f7b22 100644 --- a/rust/datafusion/src/execution/physical_plan/csv.rs +++ b/rust/datafusion/src/execution/physical_plan/csv.rs @@ -19,7 +19,7 @@ use std::fs; use std::fs::File; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::error::Result; use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition}; @@ -27,6 +27,7 @@ use arrow::csv; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; +/// Execution plan for scanning a CSV file pub struct CsvExec { /// Path to directory containing partitioned CSV files with the same schema path: String, @@ -101,14 +102,14 @@ impl CsvPartition { impl Partition for CsvPartition { /// Execute this partition and return an iterator over RecordBatch - fn execute(&self) -> Result> { - Ok(Arc::new(CsvIterator::try_new( + fn execute(&self) -> Result>> { + Ok(Arc::new(Mutex::new(CsvIterator::try_new( &self.path, self.schema.clone(), true, //TODO: do not hard-code &None, //TODO: do not hard-code - 1024, - )?)) //TODO: do not hard-code + 1024, //TODO: do not hard-code + )?))) } } @@ -157,8 +158,7 @@ impl CsvIterator { impl BatchIterator for CsvIterator { /// Get the next RecordBatch - fn next(&self) -> Result> { - //Ok(self.reader.next()?) - unimplemented!() + fn next(&mut self) -> Result> { + Ok(self.reader.next()?) } } diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs index 7d91b56ca4f63..7dae854b630d7 100644 --- a/rust/datafusion/src/execution/physical_plan/mod.rs +++ b/rust/datafusion/src/execution/physical_plan/mod.rs @@ -17,7 +17,7 @@ //! Traits for physical query plan, supporting parallel execution for partitioned relations. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::error::Result; use arrow::array::ArrayRef; @@ -35,13 +35,13 @@ pub trait ExecutionPlan { /// Represents a partition of an execution plan that can be executed on a thread pub trait Partition: Send + Sync { /// Execute this partition and return an iterator over RecordBatch - fn execute(&self) -> Result>; + fn execute(&self) -> Result>>; } /// Iterator over RecordBatch that can be sent between threads pub trait BatchIterator: Send + Sync { /// Get the next RecordBatch - fn next(&self) -> Result>; + fn next(&mut self) -> Result>; } /// Expression that can be evaluated against a RecordBatch diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs index b43bd61527716..01c63b597e21c 100644 --- a/rust/datafusion/src/execution/physical_plan/projection.rs +++ b/rust/datafusion/src/execution/physical_plan/projection.rs @@ -20,7 +20,7 @@ //! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the //! projection expressions. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::error::Result; use crate::execution::physical_plan::{ @@ -99,12 +99,12 @@ struct ProjectionPartition { impl Partition for ProjectionPartition { /// Execute the projection - fn execute(&self) -> Result> { - Ok(Arc::new(ProjectionIterator { + fn execute(&self) -> Result>> { + Ok(Arc::new(Mutex::new(ProjectionIterator { schema: self.schema.clone(), expr: self.expr.clone(), input: self.input.execute()?, - })) + }))) } } @@ -112,13 +112,14 @@ impl Partition for ProjectionPartition { struct ProjectionIterator { schema: Arc, expr: Vec>, - input: Arc, + input: Arc>, } impl BatchIterator for ProjectionIterator { /// Get the next batch - fn next(&self) -> Result> { - match self.input.next()? { + fn next(&mut self) -> Result> { + let mut input = self.input.lock().unwrap(); + match input.next()? { Some(batch) => { let arrays: Result> = self.expr.iter().map(|expr| expr.evaluate(&batch)).collect(); @@ -163,6 +164,12 @@ mod tests { let projection = ProjectionExec::try_new(vec![], Arc::new(csv))?; + for partition in projection.partitions()? { + let iterator = partition.execute()?; + let mut iterator = iterator.lock().unwrap(); + while let Some(batch) = iterator.next()? {} + } + //TODO assertions Ok(())