Sort Merge Join #2

Closed · wants to merge 15 commits
27 changes: 14 additions & 13 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -21,15 +21,8 @@ use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;

use crate::error::BallistaError;
use crate::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::protobuf::ShuffleReaderPartition;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{from_proto_binary_op, proto_error, protobuf};
use crate::{convert_box_required, convert_required, into_required};
use log::debug;

use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
@@ -46,7 +39,8 @@ use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunc
use datafusion::physical_plan::avro::{AvroExec, AvroReadOptions};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
use datafusion::physical_plan::joins::cross_join::CrossJoinExec;
use datafusion::physical_plan::joins::hash_join::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::parquet::ParquetPartition;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
@@ -56,7 +50,6 @@ use datafusion::physical_plan::window_functions::{
use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
use datafusion::physical_plan::{
coalesce_batches::CoalesceBatchesExec,
cross_join::CrossJoinExec,
csv::CsvExec,
empty::EmptyExec,
expressions::{
@@ -65,7 +58,6 @@ },
},
filter::FilterExec,
functions::{self, BuiltinScalarFunction, ScalarFunctionExpr},
hash_join::HashJoinExec,
limit::{GlobalLimitExec, LocalLimitExec},
parquet::ParquetExec,
projection::ProjectionExec,
@@ -78,10 +70,19 @@ use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, PhysicalExpr, Statistics, WindowExpr,
};
use datafusion::prelude::CsvReadOptions;
use log::debug;
use protobuf::physical_expr_node::ExprType;
use protobuf::physical_plan_node::PhysicalPlanType;

use crate::error::BallistaError;
use crate::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::protobuf::ShuffleReaderPartition;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{from_proto_binary_op, proto_error, protobuf};
use crate::{convert_box_required, convert_required, into_required};

impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
type Error = BallistaError;

5 changes: 3 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -22,6 +22,7 @@ pub mod to_proto
mod roundtrip_tests {
use std::{convert::TryInto, sync::Arc};

use datafusion::physical_plan::joins::hash_join::{HashJoinExec, PartitionMode};
use datafusion::{
arrow::{
compute::sort::SortOptions,
@@ -34,7 +35,6 @@ mod roundtrip_tests {
expressions::{Avg, Column, PhysicalSortExpr},
filter::FilterExec,
hash_aggregate::{AggregateMode, HashAggregateExec},
hash_join::{HashJoinExec, PartitionMode},
limit::{GlobalLimitExec, LocalLimitExec},
sorts::sort::SortExec,
AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
@@ -43,9 +43,10 @@
scalar::ScalarValue,
};

use crate::execution_plans::ShuffleWriterExec;

use super::super::super::error::Result;
use super::super::protobuf;
use crate::execution_plans::ShuffleWriterExec;

fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
let proto: protobuf::PhysicalPlanNode = exec_plan.clone().try_into()?;
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -28,15 +28,15 @@ use std::{

use datafusion::logical_plan::JoinType;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::physical_plan::expressions::{
CaseExpr, InListExpr, IsNotNullExpr, IsNullExpr, NegativeExpr, NotExpr,
};
use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::AggregateMode;
use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::joins::cross_join::CrossJoinExec;
use datafusion::physical_plan::joins::hash_join::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::parquet::{ParquetExec, ParquetPartition};
use datafusion::physical_plan::projection::ProjectionExec;
2 changes: 1 addition & 1 deletion ballista/rust/core/src/utils.rs
@@ -56,7 +56,7 @@ use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::joins::hash_join::HashJoinExec;
use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/planner.rs
@@ -251,7 +251,7 @@ mod test {
use ballista_core::serde::protobuf;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::joins::hash_join::HashJoinExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{
coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
149 changes: 149 additions & 0 deletions datafusion/src/arrow_dyn_list_array.rs
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! DynMutableListArray from arrow/io/avro/read/nested.rs

use arrow::array::{Array, ListArray, MutableArray, Offset};
use arrow::bitmap::MutableBitmap;
use arrow::buffer::MutableBuffer;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use std::sync::Arc;

/// Auxiliary struct: a mutable list array whose child values are a boxed, type-erased [`MutableArray`]
#[derive(Debug)]
pub struct DynMutableListArray<O: Offset> {
data_type: DataType,
offsets: MutableBuffer<O>,
values: Box<dyn MutableArray>,
validity: Option<MutableBitmap>,
}

impl<O: Offset> DynMutableListArray<O> {
pub fn new_from(
values: Box<dyn MutableArray>,
data_type: DataType,
capacity: usize,
) -> Self {
let mut offsets = MutableBuffer::<O>::with_capacity(capacity + 1);
offsets.push(O::default());
// The child array must start empty, and `data_type` must be a list type:
// `get_child_field` panics if it is not.
assert_eq!(values.len(), 0);
ListArray::<O>::get_child_field(&data_type);
Self {
data_type,
offsets,
values,
validity: None,
}
}

/// Creates a new [`DynMutableListArray`] from a [`MutableArray`] and capacity.
pub fn new_with_capacity(values: Box<dyn MutableArray>, capacity: usize) -> Self {
let data_type = ListArray::<O>::default_datatype(values.data_type().clone());
Self::new_from(values, data_type, capacity)
}

/// Returns a mutable reference to the child values array.
pub fn mut_values(&mut self) -> &mut dyn MutableArray {
self.values.as_mut()
}

#[inline]
pub fn try_push_valid(&mut self) -> Result<(), ArrowError> {
let size = self.values.len();
let size = O::from_usize(size).ok_or(ArrowError::KeyOverflowError)?; // todo: make this error
assert!(size >= *self.offsets.last().unwrap());

self.offsets.push(size);
if let Some(validity) = &mut self.validity {
validity.push(true)
}
Ok(())
}

#[inline]
fn push_null(&mut self) {
self.offsets.push(self.last_offset());
match &mut self.validity {
Some(validity) => validity.push(false),
None => self.init_validity(),
}
}

#[inline]
fn last_offset(&self) -> O {
*self.offsets.last().unwrap()
}

fn init_validity(&mut self) {
let len = self.offsets.len() - 1;

let mut validity = MutableBitmap::new();
validity.extend_constant(len, true);
validity.set(len - 1, false);
self.validity = Some(validity)
}
}

impl<O: Offset> MutableArray for DynMutableListArray<O> {
fn len(&self) -> usize {
self.offsets.len() - 1
}

fn validity(&self) -> Option<&MutableBitmap> {
self.validity.as_ref()
}

fn as_box(&mut self) -> Box<dyn Array> {
Box::new(ListArray::from_data(
self.data_type.clone(),
std::mem::take(&mut self.offsets).into(),
self.values.as_arc(),
std::mem::take(&mut self.validity).map(|x| x.into()),
))
}

fn as_arc(&mut self) -> Arc<dyn Array> {
Arc::new(ListArray::from_data(
self.data_type.clone(),
std::mem::take(&mut self.offsets).into(),
self.values.as_arc(),
std::mem::take(&mut self.validity).map(|x| x.into()),
))
}

fn data_type(&self) -> &DataType {
&self.data_type
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}

#[inline]
fn push_null(&mut self) {
self.push_null()
}

fn shrink_to_fit(&mut self) {
todo!();
}
}
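A minimal usage sketch of `DynMutableListArray` may help; it assumes the arrow2-style mutable-array API used above (`MutablePrimitiveArray`, `as_mut_any`, `push`) and, since the module is private (see the `lib.rs` change below), would live inside this crate. Everything not shown in this diff is illustrative only.

```rust
use std::sync::Arc;

use arrow::array::{Array, MutableArray, MutablePrimitiveArray};
use arrow::error::ArrowError;

use crate::arrow_dyn_list_array::DynMutableListArray;

/// Builds the list array [[1, 2], None] and freezes it into a `ListArray`.
fn build_example() -> Result<Arc<dyn Array>, ArrowError> {
    let values = Box::new(MutablePrimitiveArray::<i64>::new());
    let mut list = DynMutableListArray::<i32>::new_with_capacity(values, 2);

    // First entry: push the child values [1, 2], then close the entry.
    {
        let vals = list
            .mut_values()
            .as_mut_any()
            .downcast_mut::<MutablePrimitiveArray<i64>>()
            .unwrap();
        vals.push(Some(1));
        vals.push(Some(2));
    }
    list.try_push_valid()?;

    // Second entry: a null list (call through the trait, since the
    // inherent `push_null` is private to the module).
    MutableArray::push_null(&mut list);

    // Freeze the builder into an immutable array.
    Ok(list.as_arc())
}
```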
1 change: 1 addition & 0 deletions datafusion/src/lib.rs
@@ -230,6 +230,7 @@ pub mod variable;
// re-export dependencies from arrow-rs to minimise version maintenance for crate users
pub use arrow;

mod arrow_dyn_list_array;
mod arrow_temporal_util;

#[cfg(test)]
9 changes: 6 additions & 3 deletions datafusion/src/physical_optimizer/coalesce_batches.rs
@@ -18,15 +18,18 @@
//! CoalesceBatches optimizer that groups small batches together into
//! bigger batches to avoid the overhead of processing many small batches

use super::optimizer::PhysicalOptimizerRule;
use std::sync::Arc;

use crate::physical_plan::joins::hash_join::HashJoinExec;
use crate::{
error::Result,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
hash_join::HashJoinExec, repartition::RepartitionExec,
repartition::RepartitionExec,
},
};
use std::sync::Arc;

use super::optimizer::PhysicalOptimizerRule;

/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches
pub struct CoalesceBatches {}
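As a rough sketch of what this rule produces (not its actual traversal logic), eligible operators such as filters, repartitions, and the probe side of hash joins end up wrapped in a `CoalesceBatchesExec` with a target batch size; the wrapper below is a hypothetical helper:

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::ExecutionPlan;

// Re-buffer a plan's output into batches of roughly `target` rows, so that
// the many small batches a selective filter emits are concatenated before
// flowing to downstream operators.
fn coalesce(plan: Arc<dyn ExecutionPlan>, target: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(CoalesceBatchesExec::new(plan, target))
}
```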
17 changes: 8 additions & 9 deletions datafusion/src/physical_optimizer/hash_build_probe_order.rs
@@ -20,17 +20,17 @@ use std::sync::Arc;

use arrow::datatypes::Schema;

use crate::error::Result;
use crate::execution::context::ExecutionConfig;
use crate::logical_plan::JoinType;
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::joins::cross_join::CrossJoinExec;
use crate::physical_plan::joins::hash_join::HashJoinExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{ExecutionPlan, PhysicalExpr};

use super::optimizer::PhysicalOptimizerRule;
use super::utils::optimize_children;
use crate::error::Result;

/// BuildProbeOrder reorders the build and probe phases of hash joins
/// based on the number of rows each data source reports, so that the
/// smaller input is used to build the hash table.
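The core decision can be sketched as follows; `should_swap` is a hypothetical helper (the real rule also rewrites join types and output columns when it swaps inputs):

```rust
use datafusion::physical_plan::ExecutionPlan;

// Build the hash table on the smaller input: swap when the left (build)
// side reports more rows than the right (probe) side. Without row-count
// statistics on both sides, leave the plan unchanged.
fn should_swap(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
    match (left.statistics().num_rows, right.statistics().num_rows) {
        (Some(l), Some(r)) => l > r,
        _ => false,
    }
}
```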
@@ -153,16 +153,15 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {

#[cfg(test)]
mod tests {
use crate::{
physical_plan::{hash_join::PartitionMode, Statistics},
test::exec::StatisticsExec,
};

use super::*;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

use crate::physical_plan::joins::hash_join::PartitionMode;
use crate::{physical_plan::Statistics, test::exec::StatisticsExec};

use super::*;

fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
let big = Arc::new(StatisticsExec::new(
Statistics {
12 changes: 12 additions & 0 deletions datafusion/src/physical_plan/expressions/mod.rs
@@ -142,6 +142,18 @@ impl PhysicalSortExpr {
}
}

/// Convert sort expressions into a `Vec<SortColumn>` that can be passed to the arrow sort kernel
pub fn exprs_to_sort_columns(
batch: &RecordBatch,
expr: &[PhysicalSortExpr],
) -> Result<Vec<SortColumn>> {
expr.iter()
.map(|e| e.evaluate_to_sort_column(batch))
.collect()
}
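A hypothetical usage sketch: evaluate one descending sort key on a column named "a" into `SortColumn`s that arrow's lexicographic sort kernel can consume. The column name, index, and import paths outside this diff are assumptions:

```rust
use std::sync::Arc;

use arrow::compute::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::expressions::{
    exprs_to_sort_columns, Column, PhysicalSortExpr,
};

// Evaluate one sort key ("a" descending, nulls last) against a batch; the
// resulting SortColumns can be handed to a kernel such as lexsort_to_indices.
fn sort_columns_for_a(batch: &RecordBatch) -> Result<Vec<SortColumn>> {
    let sort_exprs = vec![PhysicalSortExpr {
        expr: Arc::new(Column::new("a", 0)),
        options: SortOptions { descending: true, nulls_first: false },
    }];
    exprs_to_sort_columns(batch, &sort_exprs)
}
```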

#[cfg(test)]
mod tests {
use super::*;