GroupedHashAggregateStream breaks spill batch into smaller chunks to decrease memory required for merging.
milenkovicm committed Oct 31, 2023
1 parent d8e413c commit b814058
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -673,7 +673,18 @@ impl GroupedHashAggregateStream
         let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
         let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
         // TODO: slice large `sorted` and write to multiple files in parallel
-        writer.write(&sorted)?;
+        let mut offset = 0;
+        let total_rows = sorted.num_rows();
+
+        while offset < total_rows {
+            // TODO: we could consider smaller batch size as there may be hundreds of batches
+            // loaded at the same time.
+            let length = std::cmp::min(total_rows - offset, self.batch_size);
+            let batch = sorted.slice(offset, length);
+            offset += batch.num_rows();
+            writer.write(&batch)?;
+        }
+
         writer.finish()?;
         self.spill_state.spills.push(spillfile);
         Ok(())
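For context, here is a minimal, self-contained sketch of the same slice-and-write pattern using plain arrow-rs rather than DataFusion's IPCWriter wrapper. The schema, sample data, output path, and batch_size value are illustrative assumptions, not part of the commit.

// A standalone sketch of the chunked spill write, assuming only arrow-rs
// (no DataFusion). Schema, data, path, and batch_size are made up for
// illustration.
use std::fs::File;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<()> {
    // Stand-in for the large sorted batch that would otherwise be spilled whole.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let sorted = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from((0..10i64).collect::<Vec<i64>>()))],
    )?;

    let batch_size = 3; // assumed; DataFusion takes this from its session config
    let file = File::create("/tmp/hash_agg_spill.arrow")?;
    let mut writer = FileWriter::try_new(file, &schema)?;

    // Write `batch_size`-row chunks instead of one large IPC record batch.
    // `RecordBatch::slice` is zero-copy, so slicing itself costs almost nothing.
    let total_rows = sorted.num_rows();
    let mut offset = 0;
    while offset < total_rows {
        let length = std::cmp::min(total_rows - offset, batch_size);
        let chunk = sorted.slice(offset, length);
        offset += chunk.num_rows();
        writer.write(&chunk)?;
    }
    writer.finish()?;
    Ok(())
}

The payoff is on the read side: an Arrow IPC file is read back one record batch at a time, so a streaming merge over several spill files only needs to hold a batch_size-sized batch per file in memory rather than each file's entire spilled output, which is roughly the "memory required for merging" the commit message refers to.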
