Skip to content

Commit

Permalink
Rework with_preserve_order usage
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 11, 2023
1 parent ee55ec3 commit 0c492e1
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 69 deletions.
20 changes: 9 additions & 11 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,14 +929,12 @@ fn add_roundrobin_on_top(
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
let should_preserve_ordering = input.output_ordering().is_some();

let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition = RepartitionExec::try_new(input, partitioning)?;
let new_plan = Arc::new(repartition.with_preserve_order(should_preserve_ordering))
as Arc<dyn ExecutionPlan>;
let repartition =
RepartitionExec::try_new(input, partitioning)?.with_preserve_order();

// update distribution onward with new operator
let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Ok(new_plan)
} else {
Expand Down Expand Up @@ -999,7 +997,6 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.bounded_order_preserving_variants`).
let should_preserve_ordering = input.output_ordering().is_some();
let mut new_plan = if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
Expand All @@ -1008,9 +1005,10 @@ fn add_hash_on_top(
input
};
let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(new_plan, partitioning)?;
new_plan =
Arc::new(repartition.with_preserve_order(should_preserve_ordering)) as _;
let repartition = RepartitionExec::try_new(new_plan, partitioning)?
// preserve any ordering if possible
.with_preserve_order();
new_plan = Arc::new(repartition) as _;

// update distribution onward with new operator
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Expand Down Expand Up @@ -1159,11 +1157,11 @@ fn replace_order_preserving_variants_helper(
if let Some(repartition) = exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
if repartition.preserve_order() {
return Ok(Arc::new(
// new RepartitionExec don't preserve order
RepartitionExec::try_new(
updated_children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
)?,
));
}
}
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,11 +703,11 @@ fn remove_corresponding_sort_from_sub_plan(
} else if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>()
{
Arc::new(
// By default, RepartitionExec does not preserve order
RepartitionExec::try_new(
children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
)?,
)
} else {
plan.clone().with_new_children(children)?
Expand Down Expand Up @@ -844,7 +844,6 @@ mod tests {
};
}


#[tokio::test]
async fn test_remove_unnecessary_sort() -> Result<()> {
let schema = create_test_schema()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ fn get_updated_plan(
// a `SortPreservingRepartitionExec` if appropriate:
if is_repartition(&plan) && !plan.maintains_input_order()[0] && is_spr_better {
let child = plan.children().swap_remove(0);
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?;
plan = Arc::new(repartition.with_preserve_order(true)) as _
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?
.with_preserve_order();
plan = Arc::new(repartition) as _
}
// When the input of a `CoalescePartitionsExec` has an ordering, replace it
// with a `SortPreservingMergeExec` if appropriate:
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionP
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand All @@ -159,7 +159,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::Hash(hash_expr, 2))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand Down
114 changes: 64 additions & 50 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,34 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, vec};

use arrow::array::{ArrayRef, UInt64Builder};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use futures::stream::Stream;
use futures::{FutureExt, StreamExt};
use hashbrown::HashMap;
use log::trace;
use parking_lot::Mutex;
use tokio::task::JoinHandle;

use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};

use crate::common::transpose;
use crate::hash_utils::create_hashes;
use crate::metrics::BaselineMetrics;
use crate::repartition::distributor_channels::{channels, partition_aware_channels};
use crate::sorts::streaming_merge;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};

use self::distributor_channels::{DistributionReceiver, DistributionSender};

use super::common::{AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation};
use super::expressions::PhysicalSortExpr;
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream};

use arrow::array::{ArrayRef, UInt64Builder};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};

use futures::stream::Stream;
use futures::{FutureExt, StreamExt};
use hashbrown::HashMap;
use log::trace;
use parking_lot::Mutex;
use tokio::task::JoinHandle;
use self::distributor_channels::{DistributionReceiver, DistributionSender};

mod distributor_channels;

Expand Down Expand Up @@ -428,9 +428,12 @@ impl ExecutionPlan for RepartitionExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let repartition =
RepartitionExec::try_new(children.swap_remove(0), self.partitioning.clone());
repartition.map(|r| Arc::new(r.with_preserve_order(self.preserve_order)) as _)
let mut repartition =
RepartitionExec::try_new(children.swap_remove(0), self.partitioning.clone())?;
if self.preserve_order {
repartition = repartition.with_preserve_order();
}
Ok(Arc::new(repartition))
}

/// Specifies whether this plan generates an infinite stream of records.
Expand Down Expand Up @@ -628,7 +631,9 @@ impl ExecutionPlan for RepartitionExec {
}

impl RepartitionExec {
/// Create a new RepartitionExec
/// Create a new RepartitionExec, that produces output `partitioning`, and
/// does not preserve the order of the input (see [`Self::with_preserve_order`]
/// for more details)
pub fn try_new(
input: Arc<dyn ExecutionPlan>,
partitioning: Partitioning,
Expand All @@ -652,8 +657,8 @@ impl RepartitionExec {
///
/// If the input is not ordered, or has only one partition, this is a no op,
/// and the node remains a `RepartitionExec`.
pub fn with_preserve_order(mut self, preserve_order: bool) -> Self {
self.preserve_order = preserve_order &&
pub fn with_preserve_order(mut self) -> Self {
self.preserve_order =
// If the input isn't ordered, there is no ordering to preserve
self.input.output_ordering().is_some() &&
// if there is only one input partition, merging is not required
Expand Down Expand Up @@ -918,7 +923,19 @@ impl RecordBatchStream for PerPartitionStream {

#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::UInt32Array;
use futures::FutureExt;
use tokio::task::JoinHandle;

use datafusion_common::cast::as_string_array;
use datafusion_common::{assert_batches_sorted_eq, exec_err};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

use crate::{
test::{
assert_is_pending,
Expand All @@ -929,16 +946,8 @@ mod tests {
},
{collect, expressions::col, memory::MemoryExec},
};
use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::UInt32Array;
use datafusion_common::cast::as_string_array;
use datafusion_common::{assert_batches_sorted_eq, exec_err};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use futures::FutureExt;
use std::collections::HashSet;
use tokio::task::JoinHandle;

use super::*;

#[tokio::test]
async fn one_to_many_round_robin() -> Result<()> {
Expand Down Expand Up @@ -1443,11 +1452,14 @@ mod tests {
#[cfg(test)]
mod test {
use arrow_schema::{DataType, Field, Schema, SortOptions};

use datafusion_physical_expr::expressions::col;

use crate::memory::MemoryExec;
use super::*;
use crate::union::UnionExec;

use super::*;

/// Asserts that the plan is as expected
///
/// `$EXPECTED_PLAN_LINES`: input plan
Expand Down Expand Up @@ -1477,9 +1489,10 @@ mod test {
let source2 = sorted_memory_exec(&schema, sort_exprs);
// output has multiple partitions, and is sorted
let union = UnionExec::new(vec![source1, source2]);
let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
.unwrap()
.with_preserve_order(true);
let exec =
RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
.unwrap()
.with_preserve_order();

// Repartition should preserve order
let expected_plan = [
Expand All @@ -1498,9 +1511,9 @@ mod test {
let sort_exprs = sort_exprs(&schema);
let source = sorted_memory_exec(&schema, sort_exprs);
// output is sorted, but has only a single partition, so no need to sort
let exec = RepartitionExec::try_new(source, Partitioning::RoundRobinBatch(10))
let exec = RepartitionExec::try_new(source, Partitioning::RoundRobinBatch(10))
.unwrap()
.with_preserve_order(true);
.with_preserve_order();

// Repartition should not preserve order
let expected_plan = [
Expand All @@ -1518,9 +1531,10 @@ mod test {
let source2 = memory_exec(&schema);
// output has multiple partitions, but is not sorted
let union = UnionExec::new(vec![source1, source2]);
let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
.unwrap()
.with_preserve_order(true);
let exec =
RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
.unwrap()
.with_preserve_order();

// Repartition should not preserve order, as there is no order to preserve
let expected_plan = [
Expand All @@ -1545,18 +1559,18 @@ mod test {
}]
}


fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap())
}

fn sorted_memory_exec(schema: &SchemaRef, sort_exprs: Vec<PhysicalSortExpr>) -> Arc<dyn ExecutionPlan> {
fn sorted_memory_exec(
schema: &SchemaRef,
sort_exprs: Vec<PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(
MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap()
.with_sort_information(vec![sort_exprs])
MemoryExec::try_new(&[vec![]], schema.clone(), None)
.unwrap()
.with_sort_information(vec![sort_exprs]),
)
}



}
}

0 comments on commit 0c492e1

Please sign in to comment.