diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 5f644b658bf77..00537c875b587 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -25,7 +25,7 @@ use crate::execution::memory_pool::{
     human_readable_size, MemoryConsumer, MemoryReservation,
 };
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
+use crate::physical_plan::common::{batch_byte_size, IPCWriter};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
     BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet,
@@ -34,28 +34,25 @@ use crate::physical_plan::sorts::merge::streaming_merge;
 use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
 use crate::physical_plan::{
     DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
-    RecordBatchStream, SendableRecordBatchStream, Statistics,
+    SendableRecordBatchStream, Statistics,
 };
 use crate::prelude::SessionConfig;
-use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
-use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
+use arrow::compute::{concat_batches, lexsort_to_indices, take};
 use arrow::datatypes::SchemaRef;
-use arrow::error::ArrowError;
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use datafusion_physical_expr::EquivalenceProperties;
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
 use log::{debug, error};
 use std::any::Any;
-use std::cmp::{min, Ordering};
 use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::task::{Context, Poll};
 use tempfile::NamedTempFile;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::task;
@@ -71,10 +68,11 @@ use tokio::task;
 /// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
 struct ExternalSorter {
     schema: SchemaRef,
-    in_mem_batches: Vec<BatchWithSortArray>,
+    in_mem_batches: Vec<RecordBatch>,
+    in_mem_batches_sorted: bool,
     spills: Vec<NamedTempFile>,
     /// Sort expressions
-    expr: Vec<PhysicalSortExpr>,
+    expr: Arc<[PhysicalSortExpr]>,
     session_config: Arc<SessionConfig>,
     runtime: Arc<RuntimeEnv>,
     metrics_set: CompositeMetricsSet,
@@ -103,8 +101,9 @@ impl ExternalSorter {
         Self {
             schema,
             in_mem_batches: vec![],
+            in_mem_batches_sorted: true,
             spills: vec![],
-            expr,
+            expr: expr.into(),
             session_config,
             runtime,
             metrics_set,
@@ -115,47 +114,37 @@ impl ExternalSorter {
         }
     }
 
-    async fn insert_batch(
-        &mut self,
-        input: RecordBatch,
-        tracking_metrics: &MemTrackingMetrics,
-    ) -> Result<()> {
-        if input.num_rows() > 0 {
-            let size = batch_byte_size(&input);
-            if self.reservation.try_grow(size).is_err() {
+    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
+    ///
+    /// Updates memory usage metrics, and possibly triggers spilling to disk
+    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() == 0 {
+            return Ok(());
+        }
+
+        let size = batch_byte_size(&input);
+        if self.reservation.try_grow(size).is_err() {
+            let before = self.reservation.size();
+            self.in_mem_sort().await?;
+            // Sorting may have freed memory, especially if fetch is not `None`
+            //
+            // As such we check again, and if the memory usage has dropped by
+            // a factor of 2, and we can allocate the necessary capacity,
+            // we don't spill
+            //
+            // The factor of 2 aims to avoid a degenerate case where the
+            // memory required for `fetch` is just under the memory available,
+            // causing repeated resorting of data
+            if self.reservation.size() > before / 2
+                || self.reservation.try_grow(size).is_err()
+            {
                 self.spill().await?;
                 self.reservation.try_grow(size)?
             }
-
-            self.metrics.mem_used().add(size);
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?;
-
-            // The resulting batch might be smaller (or larger, see #3747) than the input
-            // batch due to either a propagated limit or the re-construction of arrays. So
-            // for being reliable, we need to reflect the memory usage of the partial batch.
-            let new_size = batch_byte_size(&partial.sorted_batch);
-            match new_size.cmp(&size) {
-                Ordering::Greater => {
-                    // We don't have to call try_grow here, since we have already used the
-                    // memory (so spilling right here wouldn't help at all for the current
-                    // operation). But we still have to record it so that other requesters
-                    // would know about this unexpected increase in memory consumption.
-                    let new_size_delta = new_size - size;
-                    self.reservation.grow(new_size_delta);
-                    self.metrics.mem_used().add(new_size_delta);
-                }
-                Ordering::Less => {
-                    let size_delta = size - new_size;
-                    self.reservation.shrink(size_delta);
-                    self.metrics.mem_used().sub(size_delta);
-                }
-                Ordering::Equal => {}
-            }
-            self.in_mem_batches.push(partial);
         }
+        self.metrics.mem_used().add(size);
+        self.in_mem_batches.push(input);
+        self.in_mem_batches_sorted = false;
         Ok(())
     }
 
@@ -165,28 +154,18 @@ impl ExternalSorter {
 
     /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
     fn sort(&mut self) -> Result<SendableRecordBatchStream> {
-        let batch_size = self.session_config.batch_size();
-
         if self.spilled_before() {
             let intermediate_metrics = self
                 .metrics_set
                 .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
-            let mut merge_metrics = self
+
+            let merge_metrics = self
                 .metrics_set
                 .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
 
             let mut streams = vec![];
             if !self.in_mem_batches.is_empty() {
-                let in_mem_stream = in_mem_partial_sort(
-                    &mut self.in_mem_batches,
-                    self.schema.clone(),
-                    &self.expr,
-                    batch_size,
-                    intermediate_metrics,
-                    self.fetch,
-                )?;
-                // TODO: More accurate, dynamic memory accounting (#5885)
-                merge_metrics.init_mem_used(self.reservation.free());
+                let in_mem_stream = self.in_mem_sort_stream(intermediate_metrics)?;
                 streams.push(in_mem_stream);
             }
 
@@ -206,14 +185,7 @@ impl ExternalSorter {
             let tracking_metrics = self
                 .metrics_set
                 .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
-            let result = in_mem_partial_sort(
-                &mut self.in_mem_batches,
-                self.schema.clone(),
-                &self.expr,
-                batch_size,
-                tracking_metrics,
-                self.fetch,
-            );
+            let result = self.in_mem_sort_stream(tracking_metrics);
             // Report to the memory manager we are no longer using memory
             self.reservation.free();
             result
@@ -241,331 +213,151 @@ impl ExternalSorter {
         }
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
+        assert!(self.in_mem_batches_sorted);
 
-        let tracking_metrics = self
-            .metrics_set
-            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+        self.in_mem_sort().await?;
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let stream = in_mem_partial_sort(
-            &mut self.in_mem_batches,
-            self.schema.clone(),
-            &self.expr,
-            self.session_config.batch_size(),
-            tracking_metrics,
-            self.fetch,
-        );
-
-        spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
-            .await?;
+        let batches = std::mem::take(&mut self.in_mem_batches);
+        spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
         self.reservation.free();
         let used = self.metrics.mem_used().set(0);
         self.metrics.record_spill(used);
         self.spills.push(spillfile);
         Ok(used)
     }
-}
-
-impl Debug for ExternalSorter {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ExternalSorter")
-            .field("memory_used", &self.used())
-            .field("spilled_bytes", &self.spilled_bytes())
-            .field("spill_count", &self.spill_count())
-            .finish()
-    }
-}
-
-/// consume the non-empty `sorted_batches` and do in_mem_sort
-fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
-    schema: SchemaRef,
-    expressions: &[PhysicalSortExpr],
-    batch_size: usize,
-    tracking_metrics: MemTrackingMetrics,
-    fetch: Option<usize>,
-) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct CompositeIndex {
-    batch_idx: u32,
-    row_idx: u32,
-}
-
-/// Get sorted iterator by sort concatenated `SortColumn`s
-fn get_sorted_iter(
-    sort_arrays: &[Vec<ArrayRef>],
-    expr: &[PhysicalSortExpr],
-    batch_size: usize,
-    fetch: Option<usize>,
-) -> Result<SortedIterator> {
-    let row_indices = sort_arrays
-        .iter()
-        .enumerate()
-        .flat_map(|(i, arrays)| {
-            (0..arrays[0].len()).map(move |r| CompositeIndex {
-                // since we original use UInt32Array to index the combined mono batch,
-                // component record batches won't overflow as well,
-                // use u32 here for space efficiency.
-                batch_idx: i as u32,
-                row_idx: r as u32,
-            })
-        })
-        .collect::<Vec<CompositeIndex>>();
-
-    let sort_columns = expr
-        .iter()
-        .enumerate()
-        .map(|(i, expr)| {
-            let columns_i = sort_arrays
-                .iter()
-                .map(|cs| cs[i].as_ref())
-                .collect::<Vec<&dyn Array>>();
-            Ok(SortColumn {
-                values: concat(columns_i.as_slice())?,
-                options: Some(expr.options),
-            })
-        })
-        .collect::<Result<Vec<_>>>()?;
-    let indices = lexsort_to_indices(&sort_columns, fetch)?;
-
-    // Calculate composite index based on sorted indices
-    let row_indices = indices
-        .values()
-        .iter()
-        .map(|i| row_indices[*i as usize])
-        .collect();
+    /// Sorts the in_mem_batches in place
+    async fn in_mem_sort(&mut self) -> Result<()> {
+        if self.in_mem_batches_sorted {
+            return Ok(());
+        }
 
-    Ok(SortedIterator::new(row_indices, batch_size))
-}
+        let tracking_metrics = self
+            .metrics_set
+            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
 
-struct SortedIterator {
-    /// Current logical position in the iterator
-    pos: usize,
-    /// Sorted composite index of where to find the rows in buffered batches
-    composite: Vec<CompositeIndex>,
-    /// Maximum batch size to produce
-    batch_size: usize,
-}
+        self.in_mem_batches = self
+            .in_mem_sort_stream(tracking_metrics)?
+            .try_collect()
+            .await?;
 
-impl SortedIterator {
-    fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
-        Self {
-            pos: 0,
-            composite,
-            batch_size,
-        }
-    }
+        let size: usize = self
+            .in_mem_batches
+            .iter()
+            .map(|x| x.get_array_memory_size())
+            .sum();
 
-    fn memory_size(&self) -> usize {
-        std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
+        self.metrics.mem_used().set(size);
+        self.reservation.resize(size);
+        self.in_mem_batches_sorted = true;
+        Ok(())
     }
-}
 
-impl Iterator for SortedIterator {
-    type Item = Vec<CompositeSlice>;
-
-    /// Emit a max of `batch_size` positions each time
-    fn next(&mut self) -> Option<Self::Item> {
-        let length = self.composite.len();
-        if self.pos >= length {
-            return None;
+    /// Consumes in_mem_batches returning a sorted stream
+    fn in_mem_sort_stream(
+        &mut self,
+        metrics: MemTrackingMetrics,
+    ) -> Result<SendableRecordBatchStream> {
+        assert_ne!(self.in_mem_batches.len(), 0);
+        if self.in_mem_batches.len() == 1 {
+            let batch = self.in_mem_batches.remove(0);
+            let stream =
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?;
+            self.in_mem_batches.clear();
+            return Ok(stream);
         }
 
-        let current_size = min(self.batch_size, length - self.pos);
-        // Combine adjacent indexes from the same batch to make a slice,
-        // for more efficient `extend` later.
-        let mut last_batch_idx = self.composite[self.pos].batch_idx;
-        let mut indices_in_batch = Vec::with_capacity(current_size);
-
-        let mut slices = vec![];
-        for ci in &self.composite[self.pos..self.pos + current_size] {
-            if ci.batch_idx != last_batch_idx {
-                group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-                last_batch_idx = ci.batch_idx;
-            }
-            indices_in_batch.push(ci.row_idx);
+        // If less than 1MB of in-memory data, concatenate and sort in place
+        //
+        // This is a very rough heuristic and likely could be refined further
+        if self.reservation.size() < 1048576 {
+            // Concatenate memory batches together and sort
+            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            self.in_mem_batches.clear();
+            return sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics);
         }
-        assert!(
-            !indices_in_batch.is_empty(),
-            "There should have at least one record in a sort output slice."
-        );
-        group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
+        let streams = self
+            .in_mem_batches
+            .drain(..)
+            .map(|batch| {
+                let metrics = self.metrics_set.new_intermediate_tracking(
+                    self.partition_id,
+                    &self.runtime.memory_pool,
+                );
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)
+            })
+            .collect::<Result<_>>()?;
 
-        self.pos += current_size;
-        Some(slices)
+        // TODO: Run batch sorts concurrently (#6162)
+        // TODO: Pushdown fetch to streaming merge (#6000)
+
+        streaming_merge(
+            streams,
+            self.schema.clone(),
+            &self.expr,
+            metrics,
+            self.session_config.batch_size(),
+        )
     }
 }
 
-/// Group continuous indices into a slice for better `extend` performance
-fn group_indices(
-    batch_idx: u32,
-    positions: &mut Vec<u32>,
-    output: &mut Vec<CompositeSlice>,
-) {
-    positions.sort_unstable();
-    let mut last_pos = 0;
-    let mut run_length = 0;
-    for pos in positions.iter() {
-        if run_length == 0 {
-            last_pos = *pos;
-            run_length = 1;
-        } else if *pos == last_pos + 1 {
-            run_length += 1;
-            last_pos = *pos;
-        } else {
-            output.push(CompositeSlice {
-                batch_idx,
-                start_row_idx: last_pos + 1 - run_length,
-                len: run_length as usize,
-            });
-            last_pos = *pos;
-            run_length = 1;
-        }
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spill_count", &self.spill_count())
+            .finish()
     }
-    assert!(
-        run_length > 0,
-        "There should have at least one record in a sort output slice."
-    );
-    output.push(CompositeSlice {
-        batch_idx,
-        start_row_idx: last_pos + 1 - run_length,
-        len: run_length as usize,
-    });
-    positions.clear()
 }
 
-/// Stream of sorted record batches
-struct SortedSizedRecordBatchStream {
-    schema: SchemaRef,
-    batches: Vec<RecordBatch>,
-    sorted_iter: SortedIterator,
-    num_cols: usize,
-    metrics: MemTrackingMetrics,
+fn sort_batch_stream(
+    batch: RecordBatch,
+    expressions: Arc<[PhysicalSortExpr]>,
+    fetch: Option<usize>,
+    mut tracking_metrics: MemTrackingMetrics,
+) -> Result<SendableRecordBatchStream> {
+    let schema = batch.schema();
+    tracking_metrics.init_mem_used(batch.get_array_memory_size());
+    let stream = futures::stream::once(futures::future::lazy(move |_| {
+        let sorted = sort_batch(&batch, &expressions, fetch)?;
+        tracking_metrics.record_output(sorted.num_rows());
+        Ok(sorted)
+    }));
+    return Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)));
 }
 
-impl SortedSizedRecordBatchStream {
-    /// new
-    pub fn new(
-        schema: SchemaRef,
-        batches: Vec<RecordBatch>,
-        sorted_iter: SortedIterator,
-        mut metrics: MemTrackingMetrics,
-    ) -> Self {
-        let size = batches.iter().map(batch_byte_size).sum::<usize>()
-            + sorted_iter.memory_size();
-        metrics.init_mem_used(size);
-        let num_cols = batches[0].num_columns();
-        SortedSizedRecordBatchStream {
-            schema,
-            batches,
-            sorted_iter,
-            num_cols,
-            metrics,
-        }
-    }
-}
+fn sort_batch(
+    batch: &RecordBatch,
+    expressions: &[PhysicalSortExpr],
+    fetch: Option<usize>,
+) -> Result<RecordBatch> {
+    let sort_columns = expressions
+        .iter()
+        .map(|expr| expr.evaluate_to_sort_column(batch))
+        .collect::<Result<Vec<_>>>()?;
 
-impl Stream for SortedSizedRecordBatchStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        match self.sorted_iter.next() {
-            None => Poll::Ready(None),
-            Some(slices) => {
-                let num_rows = slices.iter().map(|s| s.len).sum();
-                let output = (0..self.num_cols)
-                    .map(|i| {
-                        let arrays = self
-                            .batches
-                            .iter()
-                            .map(|b| b.column(i).to_data())
-                            .collect::<Vec<_>>();
-                        let arrays = arrays.iter().collect();
-                        let mut mutable = MutableArrayData::new(arrays, false, num_rows);
-                        for x in slices.iter() {
-                            mutable.extend(
-                                x.batch_idx as usize,
-                                x.start_row_idx as usize,
-                                x.start_row_idx as usize + x.len,
-                            );
-                        }
-                        make_array(mutable.freeze())
-                    })
-                    .collect::<Vec<_>>();
-                let batch =
-                    RecordBatch::try_new(self.schema.clone(), output).map_err(Into::into);
-                let poll = Poll::Ready(Some(batch));
-                self.metrics.record_poll(poll)
-            }
-        }
-    }
-}
+    let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-struct CompositeSlice {
-    batch_idx: u32,
-    start_row_idx: u32,
-    len: usize,
-}
+    let columns = batch
+        .columns()
+        .iter()
+        .map(|c| take(c.as_ref(), &indices, None))
+        .collect::<Result<_, _>>()?;
 
-impl RecordBatchStream for SortedSizedRecordBatchStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
+    Ok(RecordBatch::try_new(batch.schema(), columns)?)
 }
 
-async fn spill_partial_sorted_stream(
-    in_mem_stream: &mut SendableRecordBatchStream,
+async fn spill_sorted_batches(
+    batches: Vec<RecordBatch>,
     path: &Path,
     schema: SchemaRef,
 ) -> Result<()> {
-    let (sender, receiver) = tokio::sync::mpsc::channel(2);
     let path: PathBuf = path.into();
-    let handle = task::spawn_blocking(move || write_sorted(receiver, path, schema));
-    while let Some(item) = in_mem_stream.next().await {
-        sender.send(item).await.ok();
-    }
-    drop(sender);
+    let handle = task::spawn_blocking(move || write_sorted(batches, path, schema));
     match handle.await {
         Ok(r) => r,
         Err(e) => Err(DataFusionError::Execution(format!(
@@ -593,13 +385,13 @@ fn read_spill_as_stream(
 }
 
 fn write_sorted(
-    mut receiver: Receiver<Result<RecordBatch>>,
+    batches: Vec<RecordBatch>,
     path: PathBuf,
     schema: SchemaRef,
 ) -> Result<()> {
     let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
-    while let Some(batch) = receiver.blocking_recv() {
-        writer.write(&batch?)?;
+    for batch in batches {
+        writer.write(&batch)?;
     }
     writer.finish()?;
     debug!(
@@ -845,63 +637,6 @@ impl ExecutionPlan for SortExec {
     }
 }
 
-struct BatchWithSortArray {
-    sort_arrays: Vec<ArrayRef>,
-    sorted_batch: RecordBatch,
-}
-
-fn sort_batch(
-    batch: RecordBatch,
-    schema: SchemaRef,
-    expr: &[PhysicalSortExpr],
-    fetch: Option<usize>,
-) -> Result<BatchWithSortArray> {
-    let sort_columns = expr
-        .iter()
-        .map(|e| e.evaluate_to_sort_column(&batch))
-        .collect::<Result<Vec<SortColumn>>>()?;
-
-    let indices = lexsort_to_indices(&sort_columns, fetch)?;
-
-    // reorder all rows based on sorted indices
-    let sorted_batch = RecordBatch::try_new(
-        schema,
-        batch
-            .columns()
-            .iter()
-            .map(|column| {
-                take(
-                    column.as_ref(),
-                    &indices,
-                    // disable bound check overhead since indices are already generated from
-                    // the same record batch
-                    Some(TakeOptions {
-                        check_bounds: false,
-                    }),
-                )
-            })
-            .collect::<Result<Vec<ArrayRef>, ArrowError>>()?,
-    )?;
-
-    let sort_arrays = sort_columns
-        .into_iter()
-        .map(|sc| {
-            Ok(take(
-                sc.values.as_ref(),
-                &indices,
-                Some(TakeOptions {
-                    check_bounds: false,
-                }),
-            )?)
-        })
-        .collect::<Result<Vec<ArrayRef>>>()?;
-
-    Ok(BatchWithSortArray {
-        sort_arrays,
-        sorted_batch,
-    })
-}
-
 async fn do_sort(
     mut input: SendableRecordBatchStream,
     partition_id: usize,
@@ -917,8 +652,6 @@ async fn do_sort(
         context.task_id()
     );
     let schema = input.schema();
-    let tracking_metrics =
-        metrics_set.new_intermediate_tracking(partition_id, context.memory_pool());
     let mut sorter = ExternalSorter::new(
         partition_id,
         schema.clone(),
@@ -930,7 +663,7 @@ async fn do_sort(
     );
     while let Some(batch) = input.next().await {
         let batch = batch?;
-        sorter.insert_batch(batch, &tracking_metrics).await?;
+        sorter.insert_batch(batch).await?;
    }
     let result = sorter.sort();
     debug!(
@@ -1089,7 +822,7 @@ mod tests {
     #[tokio::test]
     async fn test_sort_fetch_memory_calculation() -> Result<()> {
         // This test mirrors down the size from the example above.
-        let avg_batch_size = 5000;
+        let avg_batch_size = 4000;
         let partitions = 4;
 
         // A tuple of (fetch, expect_spillage)