diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs
new file mode 100644
index 0000000000000..2c7ebc2d9ae61
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/csv.rs
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading CSV files
+
+use std::fs;
+use std::fs::File;
+use std::sync::{Arc, Mutex};
+
+use crate::error::{ExecutionError, Result};
+use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
+use arrow::csv;
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+
+/// Execution plan for scanning a CSV file
+pub struct CsvExec {
+    /// Path to directory containing partitioned CSV files with the same schema
+    path: String,
+    /// Schema representing the CSV files after the optional projection is applied
+    schema: Arc<Schema>,
+    /// Does the CSV file have a header?
+    has_header: bool,
+    /// Optional projection for which columns to load
+    projection: Option<Vec<usize>>,
+    /// Batch size
+    batch_size: usize,
+}
+
+impl ExecutionPlan for CsvExec {
+    /// Get the schema for this execution plan
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    /// Get the partitions for this execution plan. Each partition can be executed in parallel.
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        let mut filenames: Vec<String> = vec![];
+        self.build_file_list(&self.path, &mut filenames)?;
+        let partitions = filenames
+            .iter()
+            .map(|filename| {
+                Arc::new(CsvPartition::new(
+                    &filename,
+                    self.schema.clone(),
+                    self.has_header,
+                    self.projection.clone(),
+                    self.batch_size,
+                )) as Arc<dyn Partition>
+            })
+            .collect();
+        Ok(partitions)
+    }
+}
+
+impl CsvExec {
+    /// Create a new execution plan for reading a set of CSV files
+    pub fn try_new(
+        path: &str,
+        schema: Arc<Schema>,
+        has_header: bool,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let projected_schema = match &projection {
+            Some(p) => {
+                let projected_fields: Vec<Field> =
+                    p.iter().map(|i| schema.fields()[*i].clone()).collect();
+
+                Arc::new(Schema::new(projected_fields))
+            }
+            None => schema,
+        };
+
+        Ok(Self {
+            path: path.to_string(),
+            schema: projected_schema,
+            has_header,
+            projection,
+            batch_size,
+        })
+    }
+
+    /// Recursively build a list of CSV files in a directory
+    fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
+        for entry in fs::read_dir(dir)? {
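For illustration, a `CsvExec` can be driven end to end like this. This is a minimal sketch, not part of the patch: the `/tmp/data` directory and two-column schema are placeholders, and the `datafusion::...` import paths assume the crate's public module layout matches the file layout above.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::execution::physical_plan::csv::CsvExec;
use datafusion::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};

fn scan_example() -> Result<()> {
    // Illustrative schema; a real file's schema must match its contents.
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, false),
        Field::new("c2", DataType::UInt32, false),
    ]));

    // Scan every .csv file under the (hypothetical) /tmp/data directory,
    // reading only column index 1 (c2), in batches of 1024 rows.
    let csv = CsvExec::try_new("/tmp/data", schema, true, Some(vec![1]), 1024)?;

    // One partition per file; each could run on its own thread.
    for partition in csv.partitions()? {
        let it = partition.execute()?;
        let mut it = it.lock().unwrap();
        while let Some(batch) = it.next()? {
            println!("read batch with {} rows", batch.num_rows());
        }
    }
    Ok(())
}
```

Note that the projection is applied inside the Arrow CSV reader, so `schema()` on the plan already reflects the projected columns.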
+            let entry = entry?;
+            let path = entry.path();
+            if let Some(path_name) = path.to_str() {
+                if path.is_dir() {
+                    self.build_file_list(path_name, filenames)?;
+                } else if path_name.ends_with(".csv") {
+                    filenames.push(path_name.to_string());
+                }
+            } else {
+                return Err(ExecutionError::General("Invalid path".to_string()));
+            }
+        }
+        Ok(())
+    }
+}
+
+/// CSV Partition
+struct CsvPartition {
+    /// Path to the CSV file
+    path: String,
+    /// Schema representing the CSV file
+    schema: Arc<Schema>,
+    /// Does the CSV file have a header?
+    has_header: bool,
+    /// Optional projection for which columns to load
+    projection: Option<Vec<usize>>,
+    /// Batch size
+    batch_size: usize,
+}
+
+impl CsvPartition {
+    fn new(
+        path: &str,
+        schema: Arc<Schema>,
+        has_header: bool,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Self {
+        Self {
+            path: path.to_string(),
+            schema,
+            has_header,
+            projection,
+            batch_size,
+        }
+    }
+}
+
+impl Partition for CsvPartition {
+    /// Execute this partition and return an iterator over RecordBatch
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        Ok(Arc::new(Mutex::new(CsvIterator::try_new(
+            &self.path,
+            self.schema.clone(),
+            self.has_header,
+            &self.projection,
+            self.batch_size,
+        )?)))
+    }
+}
+
+/// Iterator over batches
+struct CsvIterator {
+    /// Arrow CSV reader
+    reader: csv::Reader<File>,
+}
+
+impl CsvIterator {
+    /// Create an iterator for a CSV file
+    pub fn try_new(
+        filename: &str,
+        schema: Arc<Schema>,
+        has_header: bool,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let file = File::open(filename)?;
+        let reader = csv::Reader::new(
+            file,
+            schema.clone(),
+            has_header,
+            batch_size,
+            projection.clone(),
+        );
+
+        Ok(Self { reader })
+    }
+}
+
+impl BatchIterator for CsvIterator {
+    /// Get the next RecordBatch
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        Ok(self.reader.next()?)
+    }
+}
diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs
new file mode 100644
index 0000000000000..bd888a33f6136
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/expressions.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use crate::error::Result;
+use crate::execution::physical_plan::PhysicalExpr;
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+
+/// Represents the column at a given index in a RecordBatch
+pub struct Column {
+    index: usize,
+}
+
+impl Column {
+    /// Create a new column expression
+    pub fn new(index: usize) -> Self {
+        Self { index }
+    }
+}
+
+impl PhysicalExpr for Column {
+    /// Get the name to use in a schema to represent the result of this expression
+    fn name(&self) -> String {
+        format!("c{}", self.index)
+    }
+
+    /// Get the data type of this expression, given the schema of the input
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
+        Ok(input_schema.field(self.index).data_type().clone())
+    }
+
+    /// Evaluate the expression
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        Ok(batch.column(self.index).clone())
+    }
+}
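`Column` is the first implementation of `PhysicalExpr`; other expressions plug in by implementing the same three methods. As a sketch of the shape, here is a hypothetical constant expression (not part of this patch):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::physical_plan::PhysicalExpr;

/// Hypothetical literal expression: yields the same i64 value for every row.
pub struct Int64Literal {
    value: i64,
}

impl PhysicalExpr for Int64Literal {
    fn name(&self) -> String {
        format!("lit({})", self.value)
    }

    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        // A literal's type does not depend on the input schema.
        Ok(DataType::Int64)
    }

    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
        // Materialize one value per input row so column lengths line up.
        let values = Int64Array::from(vec![self.value; batch.num_rows()]);
        Ok(Arc::new(values))
    }
}
```

Because `evaluate` takes the whole `RecordBatch`, expressions stay vectorized: they produce one Arrow array per batch rather than a value per row.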
diff --git a/rust/datafusion/src/execution/physical_plan.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
similarity index 68%
rename from rust/datafusion/src/execution/physical_plan.rs
rename to rust/datafusion/src/execution/physical_plan/mod.rs
index ee7c62ad2bfe3..7dae854b630d7 100644
--- a/rust/datafusion/src/execution/physical_plan.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -17,11 +17,12 @@
 
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
-use arrow::datatypes::Schema;
-use arrow::record_batch::RecordBatch;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use crate::error::Result;
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
 
 /// Partition-aware execution plan for a relation
 pub trait ExecutionPlan {
@@ -34,11 +35,25 @@ pub trait ExecutionPlan {
 /// Represents a partition of an execution plan that can be executed on a thread
 pub trait Partition: Send + Sync {
     /// Execute this partition and return an iterator over RecordBatch
-    fn execute(&self) -> Result<Arc<dyn BatchIterator>>;
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>>;
 }
 
 /// Iterator over RecordBatch that can be sent between threads
 pub trait BatchIterator: Send + Sync {
     /// Get the next RecordBatch
-    fn next(&self) -> Result<Option<RecordBatch>>;
+    fn next(&mut self) -> Result<Option<RecordBatch>>;
 }
+
+/// Expression that can be evaluated against a RecordBatch
+pub trait PhysicalExpr: Send + Sync {
+    /// Get the name to use in a schema to represent the result of this expression
+    fn name(&self) -> String;
+    /// Get the data type of this expression, given the schema of the input
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+    /// Evaluate an expression against a RecordBatch
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
+}
+
+pub mod csv;
+pub mod expressions;
+pub mod projection;
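Because `Partition` is `Send + Sync` and each partition owns its own iterator behind a `Mutex`, partitions can be drained from separate threads. A sketch of that pattern, assuming `ExecutionError` is `Send` and the same `datafusion::...` paths as above; this helper is illustrative, not an API in this patch:

```rust
use std::thread;

use datafusion::error::Result;
use datafusion::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};

/// Drain every partition of a plan on its own thread and return the total row count.
fn execute_parallel(plan: &dyn ExecutionPlan) -> Result<usize> {
    let mut handles = vec![];
    for partition in plan.partitions()? {
        // Arc<dyn Partition> is Send + Sync, so it can move into the thread.
        handles.push(thread::spawn(move || -> Result<usize> {
            let it = partition.execute()?;
            let mut it = it.lock().unwrap();
            let mut rows = 0;
            while let Some(batch) = it.next()? {
                rows += batch.num_rows();
            }
            Ok(rows)
        }));
    }
    let mut total = 0;
    for h in handles {
        total += h.join().unwrap()?;
    }
    Ok(total)
}
```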
diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs
new file mode 100644
index 0000000000000..c384b864cc14b
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/projection.rs
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the projection execution plan. A projection determines which columns or expressions
+//! are returned from a query. The SQL statement `SELECT a, b, a+b FROM t1` is an example
+//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
+//! projection expressions.
+
+use std::sync::{Arc, Mutex};
+
+use crate::error::Result;
+use crate::execution::physical_plan::{
+    BatchIterator, ExecutionPlan, Partition, PhysicalExpr,
+};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+
+/// Execution plan for a projection
+pub struct ProjectionExec {
+    /// The projection expressions
+    expr: Vec<Arc<dyn PhysicalExpr>>,
+    /// The schema once the projection has been applied to the input
+    schema: Arc<Schema>,
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+}
+
+impl ProjectionExec {
+    /// Create a projection on an input
+    pub fn try_new(
+        expr: Vec<Arc<dyn PhysicalExpr>>,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Self> {
+        let input_schema = input.schema();
+
+        let fields: Result<Vec<Field>> = expr
+            .iter()
+            .map(|e| Ok(Field::new(&e.name(), e.data_type(&input_schema)?, true)))
+            .collect();
+
+        let schema = Arc::new(Schema::new(fields?));
+
+        Ok(Self {
+            expr: expr.clone(),
+            schema,
+            input: input.clone(),
+        })
+    }
+}
+
+impl ExecutionPlan for ProjectionExec {
+    /// Get the schema for this execution plan
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    /// Get the partitions for this execution plan
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        let partitions: Vec<Arc<dyn Partition>> = self
+            .input
+            .partitions()?
+            .iter()
+            .map(|p| {
+                let expr = self.expr.clone();
+                let projection: Arc<dyn Partition> = Arc::new(ProjectionPartition {
+                    schema: self.schema.clone(),
+                    expr,
+                    input: p.clone(),
+                });
+
+                projection
+            })
+            .collect();
+
+        Ok(partitions)
+    }
+}
+
+/// Represents a single partition of a projection execution plan
+struct ProjectionPartition {
+    schema: Arc<Schema>,
+    expr: Vec<Arc<dyn PhysicalExpr>>,
+    input: Arc<dyn Partition>,
+}
+
+impl Partition for ProjectionPartition {
+    /// Execute the projection
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        Ok(Arc::new(Mutex::new(ProjectionIterator {
+            schema: self.schema.clone(),
+            expr: self.expr.clone(),
+            input: self.input.execute()?,
+        })))
+    }
+}
+
+/// Projection iterator
+struct ProjectionIterator {
+    schema: Arc<Schema>,
+    expr: Vec<Arc<dyn PhysicalExpr>>,
+    input: Arc<Mutex<dyn BatchIterator>>,
+}
+
+impl BatchIterator for ProjectionIterator {
+    /// Get the next batch
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        let mut input = self.input.lock().unwrap();
+        match input.next()? {
+            Some(batch) => {
+                let arrays: Result<Vec<ArrayRef>> =
+                    self.expr.iter().map(|expr| expr.evaluate(&batch)).collect();
+                Ok(Some(RecordBatch::try_new(self.schema.clone(), arrays?)?))
+            }
+            None => Ok(None),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use crate::execution::physical_plan::csv::CsvExec;
+    use crate::execution::physical_plan::expressions::Column;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use std::env;
+    use std::fs;
+    use std::fs::File;
+    use std::io::prelude::*;
+    use std::io::{BufReader, BufWriter};
+    use std::path::Path;
+
+    #[test]
+    fn project_first_column() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Utf8, false),
+            Field::new("c2", DataType::UInt32, false),
+            Field::new("c3", DataType::Int8, false),
+            Field::new("c4", DataType::Int16, false),
+            Field::new("c5", DataType::Int32, false),
+            Field::new("c6", DataType::Int64, false),
+            Field::new("c7", DataType::UInt8, false),
+            Field::new("c8", DataType::UInt16, false),
+            Field::new("c9", DataType::UInt32, false),
+            Field::new("c10", DataType::UInt64, false),
+            Field::new("c11", DataType::Float32, false),
+            Field::new("c12", DataType::Float64, false),
+            Field::new("c13", DataType::Utf8, false),
+        ]));
+
+        let partitions = 4;
+        let path = create_partitioned_csv("aggregate_test_100.csv", partitions)?;
+
+        let csv = CsvExec::try_new(&path, schema, true, None, 1024)?;
+
+        let projection =
+            ProjectionExec::try_new(vec![Arc::new(Column::new(0))], Arc::new(csv))?;
+
+        let mut partition_count = 0;
+        let mut row_count = 0;
+        for partition in projection.partitions()? {
+            partition_count += 1;
+            let iterator = partition.execute()?;
+            let mut iterator = iterator.lock().unwrap();
+            while let Some(batch) = iterator.next()? {
+                assert_eq!(1, batch.num_columns());
+                row_count += batch.num_rows();
+            }
+        }
+        assert_eq!(partitions, partition_count);
+        assert_eq!(100, row_count);
+
+        Ok(())
+    }
+
+    /// Generate a partitioned copy of a CSV file
+    fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<String> {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        let path = format!("{}/csv/{}", testdata, filename);
+
+        let mut dir = env::temp_dir();
+        dir.push(&format!("{}-{}", filename, partitions));
+
+        if Path::new(&dir).exists() {
+            fs::remove_dir_all(&dir).unwrap();
+        }
+        fs::create_dir(dir.clone()).unwrap();
+
+        let mut writers = vec![];
+        for i in 0..partitions {
+            let mut filename = dir.clone();
+            filename.push(format!("part{}.csv", i));
+            let writer = BufWriter::new(File::create(&filename).unwrap());
+            writers.push(writer);
+        }
+
+        let f = File::open(&path)?;
+        let f = BufReader::new(f);
+        let mut i = 0;
+        for line in f.lines() {
+            let line = line.unwrap();
+
+            if i == 0 {
+                // write the header to all partitions
+                for w in writers.iter_mut() {
+                    w.write_all(line.as_bytes()).unwrap();
+                    w.write_all(b"\n").unwrap();
+                }
+            } else {
+                // write each data line to a single partition, round-robin
+                let partition = i % partitions;
+                writers[partition].write_all(line.as_bytes()).unwrap();
+                writers[partition].write_all(b"\n").unwrap();
+            }
+
+            i += 1;
+        }
+        for w in writers.iter_mut() {
+            w.flush().unwrap();
+        }
+
+        Ok(dir.as_os_str().to_str().unwrap().to_string())
+    }
+}
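Composing the two operators mirrors the test above. A compact sketch (the schema, column indexes, and `/tmp/scores` directory are illustrative):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::execution::physical_plan::csv::CsvExec;
use datafusion::execution::physical_plan::expressions::Column;
use datafusion::execution::physical_plan::projection::ProjectionExec;
use datafusion::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};

fn project_example() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("score", DataType::Float64, false),
    ]));
    let csv = CsvExec::try_new("/tmp/scores", schema, true, None, 4096)?;

    // SELECT score, id: expressions are evaluated in output order.
    let plan = ProjectionExec::try_new(
        vec![Arc::new(Column::new(2)), Arc::new(Column::new(0))],
        Arc::new(csv),
    )?;

    for partition in plan.partitions()? {
        let it = partition.execute()?;
        let mut it = it.lock().unwrap();
        while let Some(batch) = it.next()? {
            assert_eq!(2, batch.num_columns());
        }
    }
    Ok(())
}
```

The projection preserves the input's partitioning, so downstream operators see the same degree of parallelism as the scan.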