diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 5e5b96f6ba8c..e7583501f9d9 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -40,7 +40,7 @@ pub type PartitionedFileStream = /// Only scan a subset of Row Groups from the Parquet file whose data "midpoint" /// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping /// sections of a Parquet file in parallel. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)] pub struct FileRange { /// Range start pub start: i64, @@ -70,13 +70,12 @@ pub struct PartitionedFile { /// An optional field for user defined per object metadata pub extensions: Option>, } - impl PartitionedFile { /// Create a simple file without metadata or partition - pub fn new(path: String, size: u64) -> Self { + pub fn new(path: impl Into, size: u64) -> Self { Self { object_meta: ObjectMeta { - location: Path::from(path), + location: Path::from(path.into()), last_modified: chrono::Utc.timestamp_nanos(0), size: size as usize, e_tag: None, @@ -99,9 +98,10 @@ impl PartitionedFile { version: None, }, partition_values: vec![], - range: Some(FileRange { start, end }), + range: None, extensions: None, } + .with_range(start, end) } /// Return a file reference from the given path @@ -114,6 +114,12 @@ impl PartitionedFile { pub fn path(&self) -> &Path { &self.object_meta.location } + + /// Update the file to only scan the specified range (in bytes) + pub fn with_range(mut self, start: i64, end: i64) -> Self { + self.range = Some(FileRange { start, end }); + self + } } impl From for PartitionedFile { diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 816a82543bab..0eca37da139d 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs 
@@ -23,7 +23,7 @@ use std::ops::Range; use std::sync::Arc; use std::task::Poll; -use super::FileScanConfig; +use super::{FileGroupPartitioner, FileScanConfig}; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::listing::{FileRange, ListingTableUrl}; use crate::datasource::physical_plan::file_stream::{ @@ -177,7 +177,7 @@ impl ExecutionPlan for CsvExec { } /// Redistribute files across partitions according to their size - /// See comments on `repartition_file_groups()` for more detail. + /// See comments on [`FileGroupPartitioner`] for more detail. /// /// Return `None` if can't get repartitioned(empty/compressed file). fn repartitioned( @@ -191,11 +191,11 @@ impl ExecutionPlan for CsvExec { return Ok(None); } - let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( - self.base_config.file_groups.clone(), - target_partitions, - repartition_file_min_size, - ); + let repartitioned_file_groups_option = FileGroupPartitioner::new() + .with_target_partitions(target_partitions) + .with_preserve_order_within_groups(self.output_ordering().is_some()) + .with_repartition_file_min_size(repartition_file_min_size) + .repartition_file_groups(&self.base_config.file_groups); if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { let mut new_plan = self.clone(); diff --git a/datafusion/core/src/datasource/physical_plan/file_groups.rs b/datafusion/core/src/datasource/physical_plan/file_groups.rs new file mode 100644 index 000000000000..6456bd5c7276 --- /dev/null +++ b/datafusion/core/src/datasource/physical_plan/file_groups.rs @@ -0,0 +1,826 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Logic for managing groups of [`PartitionedFile`]s in DataFusion + +use crate::datasource::listing::{FileRange, PartitionedFile}; +use itertools::Itertools; +use std::cmp::min; +use std::collections::BinaryHeap; +use std::iter::repeat_with; + +/// Repartition input files into `target_partitions` partitions, if total file size exceed +/// `repartition_file_min_size` +/// +/// This partitions evenly by file byte range, and does not have any knowledge +/// of how data is laid out in specific files. The specific `FileOpener` are +/// responsible for the actual partitioning on specific data source type. (e.g. 
+/// the `CsvOpener` will read lines that overlap with the byte range as well as +/// handle boundaries to ensure all lines will be read exactly once) +/// +/// # Example +/// +/// For example, if there are two files `A` and `B` that we wish to read with 4 +/// partitions (with 4 threads) they will be divided as follows: +/// +/// ```text +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// ┌─────────────────┐ +/// │ │ │ │ +/// │ File A │ +/// │ │ Range: 0-2MB │ │ +/// │ │ +/// │ └─────────────────┘ │ +/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// ┌─────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// │ │ ┌─────────────────┐ +/// │ │ │ │ │ │ +/// │ │ │ File A │ +/// │ │ │ │ Range 2-4MB │ │ +/// │ │ │ │ +/// │ │ │ └─────────────────┘ │ +/// │ File A (7MB) │ ────────▶ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// │ │ ┌─────────────────┐ +/// │ │ │ │ │ │ +/// │ │ │ File A │ +/// │ │ │ │ Range: 4-6MB │ │ +/// │ │ │ │ +/// │ │ │ └─────────────────┘ │ +/// └─────────────────┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// ┌─────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// │ File B (1MB) │ ┌─────────────────┐ +/// │ │ │ │ File A │ │ +/// └─────────────────┘ │ Range: 6-7MB │ +/// │ └─────────────────┘ │ +/// ┌─────────────────┐ +/// │ │ File B (1MB) │ │ +/// │ │ +/// │ └─────────────────┘ │ +/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// +/// If target_partitions = 4, +/// divides into 4 groups +/// ``` +/// +/// # Maintaining Order +/// +/// Within each group files are read sequentially. Thus, if the overall order of +/// tuples must be preserved, multiple files can not be mixed in the same group. +/// +/// In this case, the code will split the largest files evenly into any +/// available empty groups, but the overall distribution may not be as even +/// as if the order did not need to be preserved.
+/// +/// ```text +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// ┌─────────────────┐ +/// │ │ │ │ +/// │ File A │ +/// │ │ Range: 0-2MB │ │ +/// │ │ +/// ┌─────────────────┐ │ └─────────────────┘ │ +/// │ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// │ │ ┌─────────────────┐ +/// │ │ │ │ │ │ +/// │ │ │ File A │ +/// │ │ │ │ Range 2-4MB │ │ +/// │ File A (6MB) │ ────────▶ │ │ +/// │ (ordered) │ │ └─────────────────┘ │ +/// │ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// │ │ ┌─────────────────┐ +/// │ │ │ │ │ │ +/// │ │ │ File A │ +/// │ │ │ │ Range: 4-6MB │ │ +/// └─────────────────┘ │ │ +/// ┌─────────────────┐ │ └─────────────────┘ │ +/// │ File B (1MB) │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// │ (ordered) │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// └─────────────────┘ ┌─────────────────┐ +/// │ │ File B (1MB) │ │ +/// │ │ +/// │ └─────────────────┘ │ +/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// +/// If target_partitions = 4, +/// divides into 4 groups +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct FileGroupPartitioner { + /// how many partitions should be created + target_partitions: usize, + /// the minimum size for a file to be repartitioned. + repartition_file_min_size: usize, + /// if the order when reading the files must be preserved + preserve_order_within_groups: bool, +} + +impl Default for FileGroupPartitioner { + fn default() -> Self { + Self::new() + } +} + +impl FileGroupPartitioner { + /// Creates a new [`FileGroupPartitioner`] with default values: + /// 1. `target_partitions = 1` + /// 2. `repartition_file_min_size = 10MB` + /// 3. 
`preserve_order_within_groups = false` + pub fn new() -> Self { + Self { + target_partitions: 1, + repartition_file_min_size: 10 * 1024 * 1024, + preserve_order_within_groups: false, + } + } + + /// Set the target partitions + pub fn with_target_partitions(mut self, target_partitions: usize) -> Self { + self.target_partitions = target_partitions; + self + } + + /// Set the minimum size at which to repartition a file + pub fn with_repartition_file_min_size( + mut self, + repartition_file_min_size: usize, + ) -> Self { + self.repartition_file_min_size = repartition_file_min_size; + self + } + + /// Set whether the order of tuples within a file must be preserved + pub fn with_preserve_order_within_groups( + mut self, + preserve_order_within_groups: bool, + ) -> Self { + self.preserve_order_within_groups = preserve_order_within_groups; + self + } + + /// Repartition input files according to the settings on this [`FileGroupPartitioner`]. + /// + /// If no repartitioning is needed or possible, return `None`. 
+ pub fn repartition_file_groups( + &self, + file_groups: &[Vec], + ) -> Option>> { + if file_groups.is_empty() { + return None; + } + + // Perform redistribution only in case all files should be read from beginning to end + let has_ranges = file_groups.iter().flatten().any(|f| f.range.is_some()); + if has_ranges { + return None; + } + + // special case when order must be preserved + if self.preserve_order_within_groups { + self.repartition_preserving_order(file_groups) + } else { + self.repartition_evenly_by_size(file_groups) + } + } + + /// Evenly repartition files across partitions by size, ignoring any + /// existing grouping / ordering + fn repartition_evenly_by_size( + &self, + file_groups: &[Vec], + ) -> Option>> { + let target_partitions = self.target_partitions; + let repartition_file_min_size = self.repartition_file_min_size; + let flattened_files = file_groups.iter().flatten().collect::>(); + + let total_size = flattened_files + .iter() + .map(|f| f.object_meta.size as i64) + .sum::(); + if total_size < (repartition_file_min_size as i64) || total_size == 0 { + return None; + } + + let target_partition_size = + (total_size as usize + (target_partitions) - 1) / (target_partitions); + + let current_partition_index: usize = 0; + let current_partition_size: usize = 0; + + // Partition byte range evenly for all `PartitionedFile`s + let repartitioned_files = flattened_files + .into_iter() + .scan( + (current_partition_index, current_partition_size), + |state, source_file| { + let mut produced_files = vec![]; + let mut range_start = 0; + while range_start < source_file.object_meta.size { + let range_end = min( + range_start + (target_partition_size - state.1), + source_file.object_meta.size, + ); + + let mut produced_file = source_file.clone(); + produced_file.range = Some(FileRange { + start: range_start as i64, + end: range_end as i64, + }); + produced_files.push((state.0, produced_file)); + + if state.1 + (range_end - range_start) >= target_partition_size { + 
state.0 += 1; + state.1 = 0; + } else { + state.1 += range_end - range_start; + } + range_start = range_end; + } + Some(produced_files) + }, + ) + .flatten() + .group_by(|(partition_idx, _)| *partition_idx) + .into_iter() + .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec()) + .collect_vec(); + + Some(repartitioned_files) + } + + /// Redistribute file groups across size preserving order + fn repartition_preserving_order( + &self, + file_groups: &[Vec], + ) -> Option>> { + // Can't repartition and preserve order if there are more groups + // than partitions + if file_groups.len() >= self.target_partitions { + return None; + } + let num_new_groups = self.target_partitions - file_groups.len(); + + // If there is only a single file + if file_groups.len() == 1 && file_groups[0].len() == 1 { + return self.repartition_evenly_by_size(file_groups); + } + + // Find which files could be split (single file groups) + let mut heap: BinaryHeap<_> = file_groups + .iter() + .enumerate() + .filter_map(|(group_index, group)| { + // ignore groups that do not have exactly 1 file + if group.len() == 1 { + Some(ToRepartition { + source_index: group_index, + file_size: group[0].object_meta.size, + new_groups: vec![group_index], + }) + } else { + None + } + }) + .collect(); + + // No files can be redistributed + if heap.is_empty() { + return None; + } + + // Add new empty groups to which we will redistribute ranges of existing files + let mut file_groups: Vec<_> = file_groups + .iter() + .cloned() + .chain(repeat_with(Vec::new).take(num_new_groups)) + .collect(); + + // Divide up empty groups + for (group_index, group) in file_groups.iter().enumerate() { + if !group.is_empty() { + continue; + } + // Pick the file that has the largest ranges to read so far + let mut largest_group = heap.pop().unwrap(); + largest_group.new_groups.push(group_index); + heap.push(largest_group); + } + + // Distribute files to their newly assigned groups + while let Some(to_repartition) = heap.pop() { + 
let range_size = to_repartition.range_size() as i64; + let ToRepartition { + source_index, + file_size, + new_groups, + } = to_repartition; + assert_eq!(file_groups[source_index].len(), 1); + let original_file = file_groups[source_index].pop().unwrap(); + + let last_group = new_groups.len() - 1; + let mut range_start: i64 = 0; + let mut range_end: i64 = range_size; + for (i, group_index) in new_groups.into_iter().enumerate() { + let target_group = &mut file_groups[group_index]; + assert!(target_group.is_empty()); + + // adjust last range to include the entire file + if i == last_group { + range_end = file_size as i64; + } + target_group + .push(original_file.clone().with_range(range_start, range_end)); + range_start = range_end; + range_end += range_size; + } + } + + Some(file_groups) + } +} + +/// Tracks how an individual file will be repartitioned +#[derive(Debug, Clone, PartialEq, Eq)] +struct ToRepartition { + /// the index from which the original file will be taken + source_index: usize, + /// the size of the original file + file_size: usize, + /// indexes of which group(s) will this be distributed to (including `source_index`) + new_groups: Vec, +} + +impl ToRepartition { + // how big will each file range be when this file is read in its new groups?
+ fn range_size(&self) -> usize { + self.file_size / self.new_groups.len() + } +} + +impl PartialOrd for ToRepartition { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// Order based on individual range +impl Ord for ToRepartition { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.range_size().cmp(&other.range_size()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + /// Empty file won't get partitioned + #[test] + fn repartition_empty_file_only() { + let partitioned_file_empty = pfile("empty", 0); + let file_group = vec![vec![partitioned_file_empty.clone()]]; + + let partitioned_files = FileGroupPartitioner::new() + .with_target_partitions(4) + .with_repartition_file_min_size(0) + .repartition_file_groups(&file_group); + + assert_partitioned_files(None, partitioned_files); + } + + /// Repartition when there is a empty file in file groups + #[test] + fn repartition_empty_files() { + let pfile_a = pfile("a", 10); + let pfile_b = pfile("b", 10); + let pfile_empty = pfile("empty", 0); + + let empty_first = vec![ + vec![pfile_empty.clone()], + vec![pfile_a.clone()], + vec![pfile_b.clone()], + ]; + let empty_middle = vec![ + vec![pfile_a.clone()], + vec![pfile_empty.clone()], + vec![pfile_b.clone()], + ]; + let empty_last = vec![vec![pfile_a], vec![pfile_b], vec![pfile_empty]]; + + // Repartition file groups into x partitions + let expected_2 = vec![ + vec![pfile("a", 10).with_range(0, 10)], + vec![pfile("b", 10).with_range(0, 10)], + ]; + let expected_3 = vec![ + vec![pfile("a", 10).with_range(0, 7)], + vec![ + pfile("a", 10).with_range(7, 10), + pfile("b", 10).with_range(0, 4), + ], + vec![pfile("b", 10).with_range(4, 10)], + ]; + + let file_groups_tests = [empty_first, empty_middle, empty_last]; + + for fg in file_groups_tests { + let all_expected = [(2, expected_2.clone()), (3, expected_3.clone())]; + for (n_partition, expected) in all_expected { + let actual = FileGroupPartitioner::new() + 
.with_target_partitions(n_partition) + .with_repartition_file_min_size(10) + .repartition_file_groups(&fg); + + assert_partitioned_files(Some(expected), actual); + } + } + } + + #[test] + fn repartition_single_file() { + // Single file, single partition into multiple partitions + let single_partition = vec![vec![pfile("a", 123)]]; + + let actual = FileGroupPartitioner::new() + .with_target_partitions(4) + .with_repartition_file_min_size(10) + .repartition_file_groups(&single_partition); + + let expected = Some(vec![ + vec![pfile("a", 123).with_range(0, 31)], + vec![pfile("a", 123).with_range(31, 62)], + vec![pfile("a", 123).with_range(62, 93)], + vec![pfile("a", 123).with_range(93, 123)], + ]); + assert_partitioned_files(expected, actual); + } + + #[test] + fn repartition_too_much_partitions() { + // Single file, single partition into 96 partitions + let partitioned_file = pfile("a", 8); + let single_partition = vec![vec![partitioned_file]]; + + let actual = FileGroupPartitioner::new() + .with_target_partitions(96) + .with_repartition_file_min_size(5) + .repartition_file_groups(&single_partition); + + let expected = Some(vec![ + vec![pfile("a", 8).with_range(0, 1)], + vec![pfile("a", 8).with_range(1, 2)], + vec![pfile("a", 8).with_range(2, 3)], + vec![pfile("a", 8).with_range(3, 4)], + vec![pfile("a", 8).with_range(4, 5)], + vec![pfile("a", 8).with_range(5, 6)], + vec![pfile("a", 8).with_range(6, 7)], + vec![pfile("a", 8).with_range(7, 8)], + ]); + + assert_partitioned_files(expected, actual); + } + + #[test] + fn repartition_multiple_partitions() { + // Multiple files in single partition after redistribution + let source_partitions = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]]; + + let actual = FileGroupPartitioner::new() + .with_target_partitions(3) + .with_repartition_file_min_size(10) + .repartition_file_groups(&source_partitions); + + let expected = Some(vec![ + vec![pfile("a", 40).with_range(0, 34)], + vec![ + pfile("a", 40).with_range(34, 40), + 
pfile("b", 60).with_range(0, 28), + ], + vec![pfile("b", 60).with_range(28, 60)], + ]); + assert_partitioned_files(expected, actual); + } + + #[test] + fn repartition_same_num_partitions() { + // "Rebalance" files across partitions + let source_partitions = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]]; + + let actual = FileGroupPartitioner::new() + .with_target_partitions(2) + .with_repartition_file_min_size(10) + .repartition_file_groups(&source_partitions); + + let expected = Some(vec![ + vec![ + pfile("a", 40).with_range(0, 40), + pfile("b", 60).with_range(0, 10), + ], + vec![pfile("b", 60).with_range(10, 60)], + ]); + assert_partitioned_files(expected, actual); + } + + #[test] + fn repartition_no_action_ranges() { + // No action due to Some(range) in second file + let source_partitions = vec![ + vec![pfile("a", 123)], + vec![pfile("b", 144).with_range(1, 50)], + ]; + + let actual = FileGroupPartitioner::new() + .with_target_partitions(65) + .with_repartition_file_min_size(10) + .repartition_file_groups(&source_partitions); + + assert_partitioned_files(None, actual) + } + + #[test] + fn repartition_no_action_min_size() { + // No action due to target_partition_size + let single_partition = vec![vec![pfile("a", 123)]]; + + let actual = FileGroupPartitioner::new() + .with_target_partitions(65) + .with_repartition_file_min_size(500) + .repartition_file_groups(&single_partition); + + assert_partitioned_files(None, actual) + } + + #[test] + fn repartition_no_action_zero_files() { + // No action due to no files + let empty_partition = vec![]; + + let partitioner = FileGroupPartitioner::new() + .with_target_partitions(65) + .with_repartition_file_min_size(500); + + assert_partitioned_files(None, repartition_test(partitioner, empty_partition)) + } + + #[test] + fn repartition_ordered_no_action_too_few_partitions() { + // No action as there are no new groups to redistribute to + let input_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 200)]]; + + let actual 
= FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(2) + .with_repartition_file_min_size(10) + .repartition_file_groups(&input_partitions); + + assert_partitioned_files(None, actual) + } + + #[test] + fn repartition_ordered_no_action_file_too_small() { + // No action as there are no new groups to redistribute to + let single_partition = vec![vec![pfile("a", 100)]]; + + let actual = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(2) + // file is too small to repartition + .with_repartition_file_min_size(1000) + .repartition_file_groups(&single_partition); + + assert_partitioned_files(None, actual) + } + + #[test] + fn repartition_ordered_one_large_file() { + // "Rebalance" the single large file across partitions + let source_partitions = vec![vec![pfile("a", 100)]]; + + let actual = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(3) + .with_repartition_file_min_size(10) + .repartition_file_groups(&source_partitions); + + let expected = Some(vec![ + vec![pfile("a", 100).with_range(0, 34)], + vec![pfile("a", 100).with_range(34, 68)], + vec![pfile("a", 100).with_range(68, 100)], + ]); + assert_partitioned_files(expected, actual); + } + + #[test] + fn repartition_ordered_one_large_one_small_file() { + // "Rebalance" the single large file across empty partitions, but can't split + // small file + let source_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 30)]]; + + let actual = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(4) + .with_repartition_file_min_size(10) + .repartition_file_groups(&source_partitions); + + let expected = Some(vec![ + // scan first third of "a" + vec![pfile("a", 100).with_range(0, 33)], + // only b in this group (can't do this) + vec![pfile("b", 30).with_range(0, 30)], + // second third of "a" + vec![pfile("a", 100).with_range(33, 66)], + // 
final third of "a" + vec![pfile("a", 100).with_range(66, 100)], + ]); + assert_partitioned_files(expected, actual); + } + + #[test] + fn repartition_ordered_two_large_files() { + // "Rebalance" two large files across empty partitions, but can't mix them + let source_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 100)]]; + + let actual = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(4) + .with_repartition_file_min_size(10) + .repartition_file_groups(&source_partitions); + + let expected = Some(vec![ + // scan first half of "a" + vec![pfile("a", 100).with_range(0, 50)], + // scan first half of "b" + vec![pfile("b", 100).with_range(0, 50)], + // second half of "a" + vec![pfile("a", 100).with_range(50, 100)], + // second half of "b" + vec![pfile("b", 100).with_range(50, 100)], + ]); + assert_partitioned_files(expected, actual); + } + + #[test] + fn repartition_ordered_two_large_one_small_files() { + // "Rebalance" two large files and one small file across empty partitions + let source_partitions = vec![ + vec![pfile("a", 100)], + vec![pfile("b", 100)], + vec![pfile("c", 30)], + ]; + + let partitioner = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_repartition_file_min_size(10); + + // with 4 partitions, can only split the first large file "a" + let actual = partitioner + .with_target_partitions(4) + .repartition_file_groups(&source_partitions); + + let expected = Some(vec![ + // scan first half of "a" + vec![pfile("a", 100).with_range(0, 50)], + // All of "b" + vec![pfile("b", 100).with_range(0, 100)], + // All of "c" + vec![pfile("c", 30).with_range(0, 30)], + // second half of "a" + vec![pfile("a", 100).with_range(50, 100)], + ]); + assert_partitioned_files(expected, actual); + + // With 5 partitions, we can split both "a" and "b", but they can't be intermixed + let actual = partitioner + .with_target_partitions(5) + .repartition_file_groups(&source_partitions); + + let 
expected = Some(vec![ + // scan first half of "a" + vec![pfile("a", 100).with_range(0, 50)], + // scan first half of "b" + vec![pfile("b", 100).with_range(0, 50)], + // All of "c" + vec![pfile("c", 30).with_range(0, 30)], + // second half of "a" + vec![pfile("a", 100).with_range(50, 100)], + // second half of "b" + vec![pfile("b", 100).with_range(50, 100)], + ]); + assert_partitioned_files(expected, actual); + } + + #[test] + fn repartition_ordered_one_large_one_small_existing_empty() { + // "Rebalance" files using existing empty partition + let source_partitions = + vec![vec![pfile("a", 100)], vec![], vec![pfile("b", 40)], vec![]]; + + let actual = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(5) + .with_repartition_file_min_size(10) + .repartition_file_groups(&source_partitions); + + // Of the three available groups (2 original empty and 1 new from the + // target partitions), assign two to "a" and one to "b" + let expected = Some(vec![ + // Scan of "a" across three groups + vec![pfile("a", 100).with_range(0, 33)], + vec![pfile("a", 100).with_range(33, 66)], + // scan first half of "b" + vec![pfile("b", 40).with_range(0, 20)], + // final third of "a" + vec![pfile("a", 100).with_range(66, 100)], + // second half of "b" + vec![pfile("b", 40).with_range(20, 40)], + ]); + assert_partitioned_files(expected, actual); + } + #[test] + fn repartition_ordered_existing_group_multiple_files() { + // groups with multiple files in a group can not be changed, but can divide others + let source_partitions = vec![ + // two files in an existing partition + vec![pfile("a", 100), pfile("b", 100)], + vec![pfile("c", 40)], + ]; + + let actual = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(3) + .with_repartition_file_min_size(10) + .repartition_file_groups(&source_partitions); + + // Of the three available groups (2 original empty and 1 new from the + // target partitions), assign two 
to "a" and one to "b" + let expected = Some(vec![ + // don't try and rearrange files in the existing partition + // assuming that the caller had a good reason to put them that way. + // (it is technically possible to split off ranges from the files if desired) + vec![pfile("a", 100), pfile("b", 100)], + // first half of "c" + vec![pfile("c", 40).with_range(0, 20)], + // second half of "c" + vec![pfile("c", 40).with_range(20, 40)], + ]); + assert_partitioned_files(expected, actual); + } + + /// Asserts that the two groups of `ParititonedFile` are the same + /// (PartitionedFile doesn't implement PartialEq) + fn assert_partitioned_files( + expected: Option>>, + actual: Option>>, + ) { + match (expected, actual) { + (None, None) => {} + (Some(_), None) => panic!("Expected Some, got None"), + (None, Some(_)) => panic!("Expected None, got Some"), + (Some(expected), Some(actual)) => { + let expected_string = format!("{:#?}", expected); + let actual_string = format!("{:#?}", actual); + assert_eq!(expected_string, actual_string); + } + } + } + + /// returns a partitioned file with the specified path and size + fn pfile(path: impl Into, file_size: u64) -> PartitionedFile { + PartitionedFile::new(path, file_size) + } + + /// repartition the file groups both with and without preserving order + /// asserting they return the same value and returns that value + fn repartition_test( + partitioner: FileGroupPartitioner, + file_groups: Vec>, + ) -> Option>> { + let repartitioned = partitioner.repartition_file_groups(&file_groups); + + let repartitioned_preserving_sort = partitioner + .with_preserve_order_within_groups(true) + .repartition_file_groups(&file_groups); + + assert_partitioned_files( + repartitioned.clone(), + repartitioned_preserving_sort.clone(), + ); + repartitioned + } +} diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index d308397ab6e2..89694ff28500 100644 --- 
a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -19,15 +19,11 @@ //! file sources. use std::{ - borrow::Cow, cmp::min, collections::HashMap, fmt::Debug, marker::PhantomData, - sync::Arc, vec, + borrow::Cow, collections::HashMap, fmt::Debug, marker::PhantomData, sync::Arc, vec, }; -use super::get_projected_output_ordering; -use crate::datasource::{ - listing::{FileRange, PartitionedFile}, - object_store::ObjectStoreUrl, -}; +use super::{get_projected_output_ordering, FileGroupPartitioner}; +use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, @@ -42,7 +38,6 @@ use datafusion_common::stats::Precision; use datafusion_common::{exec_err, ColumnStatistics, Statistics}; use datafusion_physical_expr::LexOrdering; -use itertools::Itertools; use log::warn; /// Convert type to a type suitable for use as a [`ListingTable`] @@ -176,79 +171,17 @@ impl FileScanConfig { }) } - /// Repartition all input files into `target_partitions` partitions, if total file size exceed - /// `repartition_file_min_size` - /// `target_partitions` and `repartition_file_min_size` directly come from configuration. - /// - /// This function only try to partition file byte range evenly, and let specific `FileOpener` to - /// do actual partition on specific data source type. (e.g. 
`CsvOpener` will only read lines - /// overlap with byte range but also handle boundaries to ensure all lines will be read exactly once) + #[allow(missing_docs)] + #[deprecated(since = "33.0.0", note = "Use SessionContext::new_with_config")] pub fn repartition_file_groups( file_groups: Vec>, target_partitions: usize, repartition_file_min_size: usize, ) -> Option>> { - let flattened_files = file_groups.iter().flatten().collect::>(); - - // Perform redistribution only in case all files should be read from beginning to end - let has_ranges = flattened_files.iter().any(|f| f.range.is_some()); - if has_ranges { - return None; - } - - let total_size = flattened_files - .iter() - .map(|f| f.object_meta.size as i64) - .sum::(); - if total_size < (repartition_file_min_size as i64) || total_size == 0 { - return None; - } - - let target_partition_size = - (total_size as usize + (target_partitions) - 1) / (target_partitions); - - let current_partition_index: usize = 0; - let current_partition_size: usize = 0; - - // Partition byte range evenly for all `PartitionedFile`s - let repartitioned_files = flattened_files - .into_iter() - .scan( - (current_partition_index, current_partition_size), - |state, source_file| { - let mut produced_files = vec![]; - let mut range_start = 0; - while range_start < source_file.object_meta.size { - let range_end = min( - range_start + (target_partition_size - state.1), - source_file.object_meta.size, - ); - - let mut produced_file = source_file.clone(); - produced_file.range = Some(FileRange { - start: range_start as i64, - end: range_end as i64, - }); - produced_files.push((state.0, produced_file)); - - if state.1 + (range_end - range_start) >= target_partition_size { - state.0 += 1; - state.1 = 0; - } else { - state.1 += range_end - range_start; - } - range_start = range_end; - } - Some(produced_files) - }, - ) - .flatten() - .group_by(|(partition_idx, _)| *partition_idx) - .into_iter() - .map(|(_, group)| group.map(|(_, vals)| 
vals).collect_vec()) - .collect_vec(); - - Some(repartitioned_files) + FileGroupPartitioner::new() + .with_target_partitions(target_partitions) + .with_repartition_file_min_size(repartition_file_min_size) + .repartition_file_groups(&file_groups) } } diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 14e550eab1d5..8e4dd5400b20 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -20,11 +20,13 @@ mod arrow_file; mod avro; mod csv; +mod file_groups; mod file_scan_config; mod file_stream; mod json; #[cfg(feature = "parquet")] pub mod parquet; +pub use file_groups::FileGroupPartitioner; pub(crate) use self::csv::plan_to_csv; pub use self::csv::{CsvConfig, CsvExec, CsvOpener}; @@ -537,7 +539,6 @@ mod tests { }; use arrow_schema::Field; use chrono::Utc; - use datafusion_common::config::ConfigOptions; use crate::physical_plan::{DefaultDisplay, VerboseDisplay}; @@ -809,345 +810,4 @@ mod tests { extensions: None, } } - - /// Unit tests for `repartition_file_groups()` - #[cfg(feature = "parquet")] - mod repartition_file_groups_test { - use datafusion_common::Statistics; - use itertools::Itertools; - - use super::*; - - /// Empty file won't get partitioned - #[tokio::test] - async fn repartition_empty_file_only() { - let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0); - let file_group = vec![vec![partitioned_file_empty]]; - - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: file_group, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let partitioned_file = repartition_with_size(&parquet_exec, 4, 0); - - 
assert!(partitioned_file[0][0].range.is_none()); - } - - // Repartition when there is a empty file in file groups - #[tokio::test] - async fn repartition_empty_files() { - let partitioned_file_a = PartitionedFile::new("a".to_string(), 10); - let partitioned_file_b = PartitionedFile::new("b".to_string(), 10); - let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0); - - let empty_first = vec![ - vec![partitioned_file_empty.clone()], - vec![partitioned_file_a.clone()], - vec![partitioned_file_b.clone()], - ]; - let empty_middle = vec![ - vec![partitioned_file_a.clone()], - vec![partitioned_file_empty.clone()], - vec![partitioned_file_b.clone()], - ]; - let empty_last = vec![ - vec![partitioned_file_a], - vec![partitioned_file_b], - vec![partitioned_file_empty], - ]; - - // Repartition file groups into x partitions - let expected_2 = - vec![(0, "a".to_string(), 0, 10), (1, "b".to_string(), 0, 10)]; - let expected_3 = vec![ - (0, "a".to_string(), 0, 7), - (1, "a".to_string(), 7, 10), - (1, "b".to_string(), 0, 4), - (2, "b".to_string(), 4, 10), - ]; - - //let file_groups_testset = [empty_first, empty_middle, empty_last]; - let file_groups_testset = [empty_first, empty_middle, empty_last]; - - for fg in file_groups_testset { - for (n_partition, expected) in [(2, &expected_2), (3, &expected_3)] { - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: fg.clone(), - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Arc::new( - Schema::empty(), - )), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = - repartition_with_size_to_vec(&parquet_exec, n_partition, 10); - - assert_eq!(expected, &actual); - } - } - } - - #[tokio::test] - async fn repartition_single_file() { - // Single file, single partition into multiple partitions - let 
partitioned_file = PartitionedFile::new("a".to_string(), 123); - let single_partition = vec![vec![partitioned_file]]; - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: single_partition, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = repartition_with_size_to_vec(&parquet_exec, 4, 10); - let expected = vec![ - (0, "a".to_string(), 0, 31), - (1, "a".to_string(), 31, 62), - (2, "a".to_string(), 62, 93), - (3, "a".to_string(), 93, 123), - ]; - assert_eq!(expected, actual); - } - - #[tokio::test] - async fn repartition_too_much_partitions() { - // Single file, single parittion into 96 partitions - let partitioned_file = PartitionedFile::new("a".to_string(), 8); - let single_partition = vec![vec![partitioned_file]]; - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: single_partition, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = repartition_with_size_to_vec(&parquet_exec, 96, 5); - let expected = vec![ - (0, "a".to_string(), 0, 1), - (1, "a".to_string(), 1, 2), - (2, "a".to_string(), 2, 3), - (3, "a".to_string(), 3, 4), - (4, "a".to_string(), 4, 5), - (5, "a".to_string(), 5, 6), - (6, "a".to_string(), 6, 7), - (7, "a".to_string(), 7, 8), - ]; - assert_eq!(expected, actual); - } - - #[tokio::test] - async fn repartition_multiple_partitions() { - // Multiple files in single partition after redistribution - let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40); - let partitioned_file_2 
= PartitionedFile::new("b".to_string(), 60); - let source_partitions = - vec![vec![partitioned_file_1], vec![partitioned_file_2]]; - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: source_partitions, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = repartition_with_size_to_vec(&parquet_exec, 3, 10); - let expected = vec![ - (0, "a".to_string(), 0, 34), - (1, "a".to_string(), 34, 40), - (1, "b".to_string(), 0, 28), - (2, "b".to_string(), 28, 60), - ]; - assert_eq!(expected, actual); - } - - #[tokio::test] - async fn repartition_same_num_partitions() { - // "Rebalance" files across partitions - let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40); - let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60); - let source_partitions = - vec![vec![partitioned_file_1], vec![partitioned_file_2]]; - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: source_partitions, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = repartition_with_size_to_vec(&parquet_exec, 2, 10); - let expected = vec![ - (0, "a".to_string(), 0, 40), - (0, "b".to_string(), 0, 10), - (1, "b".to_string(), 10, 60), - ]; - assert_eq!(expected, actual); - } - - #[tokio::test] - async fn repartition_no_action_ranges() { - // No action due to Some(range) in second file - let partitioned_file_1 = PartitionedFile::new("a".to_string(), 123); - let mut partitioned_file_2 = PartitionedFile::new("b".to_string(), 144); - 
partitioned_file_2.range = Some(FileRange { start: 1, end: 50 }); - - let source_partitions = - vec![vec![partitioned_file_1], vec![partitioned_file_2]]; - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: source_partitions, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = repartition_with_size(&parquet_exec, 65, 10); - assert_eq!(2, actual.len()); - } - - #[tokio::test] - async fn repartition_no_action_min_size() { - // No action due to target_partition_size - let partitioned_file = PartitionedFile::new("a".to_string(), 123); - let single_partition = vec![vec![partitioned_file]]; - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: single_partition, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = repartition_with_size(&parquet_exec, 65, 500); - assert_eq!(1, actual.len()); - } - - /// Calls `ParquetExec.repartitioned` with the specified - /// `target_partitions` and `repartition_file_min_size`, returning the - /// resulting `PartitionedFile`s - fn repartition_with_size( - parquet_exec: &ParquetExec, - target_partitions: usize, - repartition_file_min_size: usize, - ) -> Vec> { - let mut config = ConfigOptions::new(); - config.optimizer.repartition_file_min_size = repartition_file_min_size; - - parquet_exec - .repartitioned(target_partitions, &config) - .unwrap() // unwrap Result - .unwrap() // unwrap Option - .as_any() - .downcast_ref::() - .unwrap() - .base_config() - .file_groups - .clone() - } 
- - /// Calls `repartition_with_size` and returns a tuple for each output `PartitionedFile`: - /// - /// `(partition index, file path, start, end)` - fn repartition_with_size_to_vec( - parquet_exec: &ParquetExec, - target_partitions: usize, - repartition_file_min_size: usize, - ) -> Vec<(usize, String, i64, i64)> { - let file_groups = repartition_with_size( - parquet_exec, - target_partitions, - repartition_file_min_size, - ); - - file_groups - .iter() - .enumerate() - .flat_map(|(part_idx, files)| { - files - .iter() - .map(|f| { - ( - part_idx, - f.object_meta.location.to_string(), - f.range.as_ref().unwrap().start, - f.range.as_ref().unwrap().end, - ) - }) - .collect_vec() - }) - .collect_vec() - } - } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 847ea6505632..2b10b05a273a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -26,8 +26,8 @@ use crate::datasource::physical_plan::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; use crate::datasource::physical_plan::{ - parquet::page_filter::PagePruningPredicate, DisplayAs, FileMeta, FileScanConfig, - SchemaAdapter, + parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner, + FileMeta, FileScanConfig, SchemaAdapter, }; use crate::{ config::ConfigOptions, @@ -330,18 +330,18 @@ impl ExecutionPlan for ParquetExec { } /// Redistribute files across partitions according to their size - /// See comments on `get_file_groups_repartitioned()` for more detail. + /// See comments on [`FileGroupPartitioner`] for more detail. 
fn repartitioned( &self, target_partitions: usize, config: &ConfigOptions, ) -> Result>> { let repartition_file_min_size = config.optimizer.repartition_file_min_size; - let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( - self.base_config.file_groups.clone(), - target_partitions, - repartition_file_min_size, - ); + let repartitioned_file_groups_option = FileGroupPartitioner::new() + .with_target_partitions(target_partitions) + .with_repartition_file_min_size(repartition_file_min_size) + .with_preserve_order_within_groups(self.output_ordering().is_some()) + .repartition_file_groups(&self.base_config.file_groups); let mut new_plan = self.clone(); if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index f2e04989ef66..099759741a10 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1761,6 +1761,7 @@ pub(crate) mod tests { parquet_exec_with_sort(vec![]) } + /// create a single parquet file that is sorted pub(crate) fn parquet_exec_with_sort( output_ordering: Vec>, ) -> Arc { @@ -1785,7 +1786,7 @@ pub(crate) mod tests { parquet_exec_multiple_sorted(vec![]) } - // Created a sorted parquet exec with multiple files + /// Created a sorted parquet exec with multiple files fn parquet_exec_multiple_sorted( output_ordering: Vec>, ) -> Arc { @@ -3858,6 +3859,56 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn parallelization_multiple_files() -> Result<()> { + let schema = schema(); + let sort_key = vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]; + + let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key])); + let plan = sort_required_exec(plan); + + // The groups must have only contiguous ranges of rows from the same file + // if 
any group has rows from multiple files, the data is no longer sorted destroyed + // https://github.com/apache/arrow-datafusion/issues/8451 + let expected = [ + "SortRequiredExec: [a@0 ASC]", + "FilterExec: c@2 = 0", + "ParquetExec: file_groups={3 groups: [[x:0..50], [y:0..100], [x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", ]; + let target_partitions = 3; + let repartition_size = 1; + assert_optimized!( + expected, + plan, + true, + true, + target_partitions, + true, + repartition_size + ); + + let expected = [ + "SortRequiredExec: [a@0 ASC]", + "FilterExec: c@2 = 0", + "ParquetExec: file_groups={8 groups: [[x:0..25], [y:0..25], [x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + ]; + let target_partitions = 8; + let repartition_size = 1; + assert_optimized!( + expected, + plan, + true, + true, + target_partitions, + true, + repartition_size + ); + + Ok(()) + } + #[test] /// CsvExec on compressed csv file will not be partitioned /// (Not able to decompress chunked csv file) @@ -4529,15 +4580,11 @@ pub(crate) mod tests { assert_plan_txt!(expected, physical_plan); let expected = &[ - "SortRequiredExec: [a@0 ASC]", // Since at the start of the rule ordering requirement is satisfied // EnforceDistribution rule satisfy this requirement also. - // ordering is re-satisfied by introduction of SortExec. 
- "SortExec: expr=[a@0 ASC]", + "SortRequiredExec: [a@0 ASC]", "FilterExec: c@2 = 0", - // ordering is lost here - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + "ParquetExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", ]; let mut config = ConfigOptions::new(); diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index 551d6d9ed48a..5dcdbb504e76 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -118,7 +118,7 @@ physical_plan SortPreservingMergeExec: [column1@0 ASC NULLS LAST] --CoalesceBatchesExec: target_batch_size=8192 ----FilterExec: column1@0 != 42 -------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 +------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 # Cleanup statement ok