Skip to content

Commit

Permalink
implement collect()
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 7, 2019
1 parent c692217 commit 42566c3
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.

//! ExecutionContext contains methods for registering data sources and executing SQL
//! queries
//! ExecutionContext contains methods for registering data sources and executing queries
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::string::String;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use arrow::datatypes::*;

Expand Down Expand Up @@ -268,13 +269,35 @@ impl ExecutionContext {
}

/// Execute a physical plan and collect the results in memory
pub fn collect(&self, _plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
unimplemented!()
}
pub fn collect(&self, plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = plan
.partitions()?
.iter()
.map(|p| {
let p = p.clone();
thread::spawn(move || {
let it = p.execute().unwrap();
let mut it = it.lock().unwrap();
let mut results: Vec<RecordBatch> = vec![];
while let Ok(Some(batch)) = it.next() {
results.push(batch);
}
Ok(results)
})
})
.collect();

// combine the results from each thread
let mut combined_results: Vec<RecordBatch> = vec![];
for thread in threads {
let result = thread.join().unwrap();
let result = result.unwrap();
result
.iter()
.for_each(|batch| combined_results.push(batch.clone()));
}

/// Execute a physical plan and write the results in CSV format
pub fn write_csv(&self, _plan: &dyn ExecutionPlan, _path: &str) -> Result<()> {
unimplemented!()
Ok(combined_results)
}

/// Execute a logical plan and produce a Relation (a schema-aware iterator over a
Expand Down

0 comments on commit 42566c3

Please sign in to comment.