From 4cd24fce423112502d8e33a4e3585519452b1019 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Fri, 23 Sep 2022 00:33:39 +0300 Subject: [PATCH] Prevent memory overflows (and spills) on sorts with a fixed limit --- datafusion/core/src/physical_plan/metrics/value.rs | 7 +++++++ datafusion/core/src/physical_plan/sorts/sort.rs | 13 +++++++++++++ 2 files changed, 20 insertions(+) diff --git a/datafusion/core/src/physical_plan/metrics/value.rs b/datafusion/core/src/physical_plan/metrics/value.rs index 5360f272c314f..5c3aeb4dcdca2 100644 --- a/datafusion/core/src/physical_plan/metrics/value.rs +++ b/datafusion/core/src/physical_plan/metrics/value.rs @@ -122,6 +122,13 @@ impl Gauge { self.value.fetch_add(n, Ordering::Relaxed); } + /// Subtract `n` from the metric's value + pub fn sub(&self, n: usize) { + // relaxed ordering for operations on `value` poses no issues + // we're purely using atomic ops with no associated memory ops + self.value.fetch_sub(n, Ordering::Relaxed); + } + /// Set the metric's value to `n` and return the previous value pub fn set(&self, n: usize) -> usize { // relaxed ordering for operations on `value` poses no issues diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index fb2ad091900d3..bb307d0f78aec 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -124,6 +124,19 @@ impl ExternalSorter { // calls to `timer.done()` below. let _timer = tracking_metrics.elapsed_compute().timer(); let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?; + // The resulting batch might be smaller than the input batch if there + // is a propagated limit.
+ if self.fetch.is_some() { + let size_delta = size.checked_sub(batch_byte_size(&partial.sorted_batch)).unwrap_or_else(|| { + panic!( + "The size of the sorted batch is larger than the size of the input batch: {} > {}", + size, + batch_byte_size(&partial.sorted_batch) + ) + }); + self.shrink(size_delta); + self.metrics.mem_used().sub(size_delta); + } in_mem_batches.push(partial); } Ok(())