Skip to content

Commit

Permalink
Prevent memory overflows (and spills) on sorts with a fixed limit
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical committed Sep 22, 2022
1 parent add10a6 commit 12ee54a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
7 changes: 7 additions & 0 deletions datafusion/core/src/physical_plan/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ impl Gauge {
self.value.fetch_add(n, Ordering::Relaxed);
}

/// Sub `n` from the metric's value
pub fn sub(&self, n: usize) {
// relaxed ordering for operations on `value` poses no issues
// we're purely using atomic ops with no associated memory ops
self.value.fetch_sub(n, Ordering::Relaxed);
}

/// Set the metric's value to `n` and return the previous value
pub fn set(&self, n: usize) -> usize {
// relaxed ordering for operations on `value` poses no issues
Expand Down
14 changes: 14 additions & 0 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,20 @@ impl ExternalSorter {
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?;
// The resulting batch might be smaller than the input batch if there
// is an propagated limit.
if self.fetch.is_some() {
let new_size = batch_byte_size(&partial.sorted_batch);
let size_delta = size.checked_sub(new_size).unwrap_or_else(|| {
panic!(
"The size of the sorted batch is larger than the size of the input batch: {} > {}",
size,
new_size
)
});
self.shrink(size_delta);
self.metrics.mem_used().sub(size_delta);
}
in_mem_batches.push(partial);
}
Ok(())
Expand Down

0 comments on commit 12ee54a

Please sign in to comment.