Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-6101: [Rust] [DataFusion] Parallel execution of physical query plan #5111

Closed
wants to merge 13 commits into from
2 changes: 1 addition & 1 deletion rust/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ crossbeam = "0.7.1"

[dev-dependencies]
criterion = "0.2.0"

tempdir = "0.3.7"

[[bench]]
name = "aggregate_query_sql"
Expand Down
19 changes: 13 additions & 6 deletions rust/datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
//! CSV Data source

use std::fs::File;
use std::string::String;
use std::sync::{Arc, Mutex};

use arrow::csv;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use std::string::String;
use std::sync::Arc;

use crate::datasource::{ScanResult, TableProvider};
use crate::error::Result;
use crate::execution::physical_plan::BatchIterator;
use crate::execution::physical_plan::csv::CsvExec;
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan};

/// Represents a CSV file with a provided schema
// TODO: usage example (rather than documenting `new()`)
Expand Down Expand Up @@ -58,13 +59,19 @@ impl TableProvider for CsvFile {
projection: &Option<Vec<usize>>,
batch_size: usize,
) -> Result<Vec<ScanResult>> {
Ok(vec![Arc::new(Mutex::new(CsvBatchIterator::new(
let exec = CsvExec::try_new(
&self.filename,
self.schema.clone(),
self.has_header,
projection,
projection.clone(),
batch_size,
)))])
)?;
let partitions = exec.partitions()?;
let iterators = partitions
.iter()
.map(|p| p.execute())
.collect::<Result<Vec<_>>>()?;
Ok(iterators)
}
}

Expand Down
173 changes: 171 additions & 2 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
// specific language governing permissions and limitations
// under the License.

//! ExecutionContext contains methods for registering data sources and executing SQL
//! queries
//! ExecutionContext contains methods for registering data sources and executing queries

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::string::String;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use arrow::datatypes::*;

use crate::arrow::array::{ArrayRef, BooleanBuilder};
use crate::arrow::record_batch::RecordBatch;
use crate::datasource::csv::CsvFile;
use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
Expand All @@ -35,6 +37,10 @@ use crate::execution::aggregate::AggregateRelation;
use crate::execution::expression::*;
use crate::execution::filter::FilterRelation;
use crate::execution::limit::LimitRelation;
use crate::execution::physical_plan::datasource::DatasourceExec;
use crate::execution::physical_plan::expressions::Column;
use crate::execution::physical_plan::projection::ProjectionExec;
use crate::execution::physical_plan::{ExecutionPlan, PhysicalExpr};
use crate::execution::projection::ProjectRelation;
use crate::execution::relation::{DataSourceRelation, Relation};
use crate::execution::scalar_relation::ScalarRelation;
Expand Down Expand Up @@ -210,6 +216,112 @@ impl ExecutionContext {
Ok(plan)
}

/// Create a physical plan from a logical plan.
///
/// Only two logical operators are translated so far: `TableScan` (becomes a
/// `DatasourceExec` over the registered provider's partitions) and
/// `Projection` (becomes a `ProjectionExec`); every other variant returns a
/// `General` error. `batch_size` is forwarded to the table provider's
/// `scan` call.
pub fn create_physical_plan(
&mut self,
logical_plan: &Arc<LogicalPlan>,
batch_size: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
match logical_plan.as_ref() {
LogicalPlan::TableScan {
table_name,
projection,
..
} => match (*self.datasources).borrow().get(table_name) {
Some(provider) => {
// Ask the registered provider for one batch iterator per partition.
let partitions = provider.scan(projection, batch_size)?;
if partitions.is_empty() {
Err(ExecutionError::General(
"Table provider returned no partitions".to_string(),
))
} else {
// Inspect the first partition only to obtain the schema.
// The unwrap on the Mutex is deliberate (see discussion
// below): a poisoned lock means another thread panicked
// while holding it, and propagating the panic is safer
// than reading possibly-invalid state.
let partition = partitions[0].lock().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid the unwrap here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, I think we want the unwrap. From https://doc.rust-lang.org/std/sync/struct.Mutex.html:

Most usage of a mutex will simply unwrap() these results, propagating panics among threads to ensure that a possibly invalid invariant is not witnessed.

These queries are now happening on threads, so although a panic here would fail the query, it would not fail the main thread that launched the thread(s) to execute the query.

let schema = partition.schema();
let exec =
DatasourceExec::new(schema.clone(), partitions.clone());
Ok(Arc::new(exec))
}
}
_ => Err(ExecutionError::General(format!(
"No table named {}",
table_name
))),
},
LogicalPlan::Projection { input, expr, .. } => {
// Recursively translate the input plan, then compile each
// projection expression against the input's schema.
let input = self.create_physical_plan(input, batch_size)?;
let input_schema = input.as_ref().schema().clone();
let runtime_expr = expr
.iter()
.map(|e| self.create_physical_expr(e, &input_schema))
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
}
_ => Err(ExecutionError::General(
"Unsupported logical plan variant".to_string(),
)),
}
}

/// Create a physical expression from a logical expression.
///
/// Currently only column references are supported; any other expression
/// kind yields a `NotImplemented` error. The input schema is accepted for
/// interface stability but is not consulted yet.
pub fn create_physical_expr(
    &self,
    e: &Expr,
    _input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    if let Expr::Column(index) = e {
        Ok(Arc::new(Column::new(*index)))
    } else {
        Err(ExecutionError::NotImplemented(
            "Unsupported expression".to_string(),
        ))
    }
}

/// Execute a physical plan and collect the results in memory.
///
/// Spawns one thread per output partition; each thread drains its
/// partition's batch iterator into a `Vec`, and the per-partition results
/// are concatenated in partition order. Returns the first error produced
/// by any partition, or a `General("Thread failed")` error if a worker
/// thread panicked.
pub fn collect(&self, plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
    let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = plan
        .partitions()?
        .iter()
        .map(|p| {
            let p = p.clone();
            thread::spawn(move || {
                let it = p.execute()?;
                // unwrap is deliberate: a poisoned mutex means another
                // thread panicked mid-iteration, so propagate the panic
                // rather than read suspect state.
                let mut it = it.lock().unwrap();
                let mut results: Vec<RecordBatch> = vec![];
                // Drain the iterator: `?` propagates the first error,
                // `None` marks the end of this partition's result set.
                while let Some(batch) = it.next()? {
                    results.push(batch);
                }
                Ok(results)
            })
        })
        .collect();

    // combine the results from each thread, preserving partition order
    let mut combined_results: Vec<RecordBatch> = vec![];
    for thread in threads {
        let result = thread
            .join()
            .map_err(|_| ExecutionError::General("Thread failed".to_string()))?;
        // move the batches out instead of cloning each one (the previous
        // version cloned every RecordBatch needlessly)
        combined_results.extend(result?);
    }

    Ok(combined_results)
}

/// Execute a logical plan and produce a Relation (a schema-aware iterator over a
/// series of RecordBatch instances)
pub fn execute(
Expand Down Expand Up @@ -402,3 +514,60 @@ impl SchemaProvider for ExecutionContextSchemaProvider {
None
}
}

#[cfg(test)]
mod tests {

use super::*;
use std::fs::File;
use std::io::prelude::*;
use tempdir::TempDir;

// End-to-end check that a projection query over a directory of CSV files
// produces one physical partition (and thus one result batch) per file.
#[test]
fn parallel_projection() -> Result<()> {
let mut ctx = ExecutionContext::new();

// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt32, false),
]));

let tmp_dir = TempDir::new("parallel_projection")?;

// generate a partitioned file
let partition_count = 4;
for partition in 0..partition_count {
let filename = format!("partition-{}.csv", partition);
let file_path = tmp_dir.path().join(&filename);
let mut file = File::create(file_path)?;

// generate some data
// NOTE(review): `0..=10` writes 11 lines per file, yet 10 rows are
// asserted below — presumably because register_csv is called with
// has_header = true, so the reader consumes the first line as a
// header; confirm against the CSV reader's behavior.
for i in 0..=10 {
let data = format!("{},{}\n", partition, i);
file.write_all(data.as_bytes())?;
}
}

// register csv file with the execution context
ctx.register_csv("test", tmp_dir.path().to_str().unwrap(), &schema, true);

let logical_plan = ctx.create_logical_plan("SELECT c1, c2 FROM test")?;

let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;

let results = ctx.collect(physical_plan.as_ref())?;

// there should be one batch per partition
assert_eq!(partition_count, results.len());

// each batch should contain 2 columns and 10 rows
for batch in &results {
assert_eq!(2, batch.num_columns());
assert_eq!(10, batch.num_rows());
}

Ok(())
}

}
44 changes: 21 additions & 23 deletions rust/datafusion/src/execution/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
//! Execution plan for reading CSV files

use std::fs;
use std::fs::metadata;
use std::fs::File;
use std::sync::{Arc, Mutex};

use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
use arrow::csv;
use arrow::datatypes::{Field, Schema};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;

/// Execution plan for scanning a CSV file
Expand Down Expand Up @@ -76,19 +77,9 @@ impl CsvExec {
projection: Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
let projected_schema = match &projection {
Some(p) => {
let projected_fields: Vec<Field> =
p.iter().map(|i| schema.fields()[*i].clone()).collect();

Arc::new(Schema::new(projected_fields))
}
None => schema,
};

Ok(Self {
path: path.to_string(),
schema: projected_schema,
schema,
has_header,
projection,
batch_size,
Expand All @@ -97,19 +88,26 @@ impl CsvExec {

/// Recursively build a list of csv files in a directory
fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if let Some(path_name) = path.to_str() {
if path.is_dir() {
self.build_file_list(path_name, filenames)?;
} else {
if path_name.ends_with(".csv") {
filenames.push(path_name.to_string());
let metadata = metadata(dir)?;
if metadata.is_file() {
if dir.ends_with(".csv") {
filenames.push(dir.to_string());
}
} else {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if let Some(path_name) = path.to_str() {
if path.is_dir() {
self.build_file_list(path_name, filenames)?;
} else {
if path_name.ends_with(".csv") {
filenames.push(path_name.to_string());
}
}
} else {
return Err(ExecutionError::General("Invalid path".to_string()));
}
} else {
return Err(ExecutionError::General("Invalid path".to_string()));
}
}
Ok(())
Expand Down
Loading