Skip to content

Commit

Permalink
implement mutex for iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Aug 6, 2019
1 parent abf6d5e commit 717dcd8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
16 changes: 8 additions & 8 deletions rust/datafusion/src/execution/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
use std::fs;
use std::fs::File;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use crate::error::Result;
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
use arrow::csv;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;

/// Execution plan for scanning a CSV file
pub struct CsvExec {
/// Path to directory containing partitioned CSV files with the same schema
path: String,
Expand Down Expand Up @@ -101,14 +102,14 @@ impl CsvPartition {

// NOTE(review): this is a rendered diff hunk without +/- markers, so the
// removed and added versions of `execute` are interleaved below and the text
// does not compile as-is.
impl Partition for CsvPartition {
/// Execute this partition and return an iterator over RecordBatch
// Removed by this commit: the iterator was returned behind a bare `Arc`.
fn execute(&self) -> Result<Arc<dyn BatchIterator>> {
Ok(Arc::new(CsvIterator::try_new(
// Added by this commit: the iterator is additionally wrapped in a `Mutex`
// before being placed in the `Arc`.
fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
Ok(Arc::new(Mutex::new(CsvIterator::try_new(
// Arguments shared by both versions. The meaning of the three hard-coded
// values is defined by `CsvIterator::try_new`, which is not visible in this
// hunk; any error it returns is propagated to the caller by `?`.
&self.path,
self.schema.clone(),
true, //TODO: do not hard-code
&None, //TODO: do not hard-code
// Removed by this commit: the TODO for `1024` trailed the closing parentheses.
1024,
)?)) //TODO: do not hard-code
// Added by this commit: the TODO now sits on the `1024` argument itself, and
// one more `)` closes the new `Mutex::new(` call.
1024, //TODO: do not hard-code
)?)))
}
}

Expand Down Expand Up @@ -157,8 +158,7 @@ impl CsvIterator {

// NOTE(review): rendered diff hunk without +/- markers; the removed and added
// versions of `next` are interleaved below.
impl BatchIterator for CsvIterator {
/// Get the next RecordBatch
// Removed by this commit: `next` took `&self`, had the real call commented
// out, and panicked via `unimplemented!()` when invoked.
// Presumably `self.reader.next()` needs mutable access, which `&self` could
// not provide; confirm against the reader's API.
fn next(&self) -> Result<Option<RecordBatch>> {
//Ok(self.reader.next()?)
unimplemented!()
// Added by this commit: `next` takes `&mut self` and forwards to
// `self.reader.next()`; `?` propagates the reader's error and the successful
// value is re-wrapped in `Ok`.
fn next(&mut self) -> Result<Option<RecordBatch>> {
Ok(self.reader.next()?)
}
}
6 changes: 3 additions & 3 deletions rust/datafusion/src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Traits for physical query plan, supporting parallel execution for partitioned relations.
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use crate::error::Result;
use arrow::array::ArrayRef;
Expand All @@ -35,13 +35,13 @@ pub trait ExecutionPlan {
/// Represents a partition of an execution plan that can be executed on a thread
// NOTE(review): rendered diff hunk without +/- markers; the two `execute`
// declarations below are the removed and added versions of the same method.
pub trait Partition: Send + Sync {
/// Execute this partition and return an iterator over RecordBatch
// Removed by this commit: the iterator was handed out as `Arc<dyn BatchIterator>`.
fn execute(&self) -> Result<Arc<dyn BatchIterator>>;
// Added by this commit: the iterator is handed out as
// `Arc<Mutex<dyn BatchIterator>>`, so it has to be locked before use.
fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>>;
}

/// Iterator over RecordBatch that can be sent between threads
// NOTE(review): rendered diff hunk without +/- markers; the two `next`
// declarations below are the removed and added versions of the same method.
pub trait BatchIterator: Send + Sync {
/// Get the next RecordBatch
// Removed by this commit: `next` borrowed the iterator immutably.
fn next(&self) -> Result<Option<RecordBatch>>;
// Added by this commit: `next` borrows the iterator mutably, so
// implementations can advance internal state without interior mutability.
fn next(&mut self) -> Result<Option<RecordBatch>>;
}

/// Expression that can be evaluated against a RecordBatch
Expand Down
21 changes: 14 additions & 7 deletions rust/datafusion/src/execution/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
//! projection expressions.
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use crate::error::Result;
use crate::execution::physical_plan::{
Expand Down Expand Up @@ -99,26 +99,27 @@ struct ProjectionPartition {

// NOTE(review): rendered diff hunk without +/- markers; the removed and added
// versions of `execute` are interleaved below and the text does not compile
// as-is.
impl Partition for ProjectionPartition {
/// Execute the projection
// Removed by this commit: the `ProjectionIterator` was returned behind a bare `Arc`.
fn execute(&self) -> Result<Arc<dyn BatchIterator>> {
Ok(Arc::new(ProjectionIterator {
// Added by this commit: the `ProjectionIterator` is additionally wrapped in a `Mutex`.
fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
Ok(Arc::new(Mutex::new(ProjectionIterator {
// Fields shared by both versions: the schema and expression list are cloned
// from the partition, and the input partition is executed here to obtain
// the upstream iterator; `?` returns early if that fails.
schema: self.schema.clone(),
expr: self.expr.clone(),
input: self.input.execute()?,
// Removed by this commit: closes `ProjectionIterator {`, `Arc::new(` and `Ok(`.
}))
// Added by this commit: one extra `)` closes the new `Mutex::new(` call.
})))
}
}

/// Projection iterator
// NOTE(review): rendered diff hunk without +/- markers; the two `input` lines
// below are the removed and added versions of the same field.
struct ProjectionIterator {
// Schema handle cloned from the owning partition when the iterator is built.
schema: Arc<Schema>,
// Expressions evaluated against each input batch.
expr: Vec<Arc<dyn PhysicalExpr>>,
// Removed by this commit: the upstream iterator was held as a bare `Arc`.
input: Arc<dyn BatchIterator>,
// Added by this commit: the upstream iterator sits behind a `Mutex`, matching
// the new return type of `Partition::execute`, and is locked before `next`
// is called on it.
input: Arc<Mutex<dyn BatchIterator>>,
}

impl BatchIterator for ProjectionIterator {
/// Get the next batch
fn next(&self) -> Result<Option<RecordBatch>> {
match self.input.next()? {
fn next(&mut self) -> Result<Option<RecordBatch>> {
let mut input = self.input.lock().unwrap();
match input.next()? {
Some(batch) => {
let arrays: Result<Vec<_>> =
self.expr.iter().map(|expr| expr.evaluate(&batch)).collect();
Expand Down Expand Up @@ -163,6 +164,12 @@ mod tests {

let projection = ProjectionExec::try_new(vec![], Arc::new(csv))?;

for partition in projection.partitions()? {
let iterator = partition.execute()?;
let mut iterator = iterator.lock().unwrap();
while let Some(batch) = iterator.next()? {}
}

//TODO assertions

Ok(())
Expand Down

0 comments on commit 717dcd8

Please sign in to comment.