merge
comphead committed Jul 17, 2024
1 parent 186b2ff commit 1d9c7d4
Showing 2 changed files with 105 additions and 4 deletions.
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -57,11 +57,11 @@ use crate::joins::utils::{
     symmetric_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef,
 };
 use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
+use crate::spill::spill_record_batches;
 use crate::{
-    execution_mode_from_children, metrics, spill_record_batches, DisplayAs,
-    DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
-    PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
-    Statistics,
+    execution_mode_from_children, metrics, DisplayAs, DisplayFormatType, Distribution,
+    ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
 };

 /// join execution plan executes partitions in parallel and combines them into a set of
101 changes: 101 additions & 0 deletions datafusion/physical-plan/src/spill.rs
@@ -85,3 +85,104 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
    }
    Ok(())
}

/// Spill the `RecordBatch` to disk as smaller batches,
/// split by `batch_size_rows`.
/// Returns the total number of rows that were spilled.
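///
/// # Example
///
/// A minimal usage sketch (not part of the original commit; it assumes a
/// `batch: RecordBatch` and a `path: PathBuf` are already in scope):
///
/// ```ignore
/// // Spill `batch` in chunks of at most 1024 rows each.
/// let rows = spill_record_batch_by_size(&batch, path, batch.schema(), 1024)?;
/// assert_eq!(rows, batch.num_rows());
/// ```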
pub fn spill_record_batch_by_size(
    batch: &RecordBatch,
    path: PathBuf,
    schema: SchemaRef,
    batch_size_rows: usize,
) -> Result<usize> {
    let mut offset = 0;
    let total_rows = batch.num_rows();
    let mut writer = IPCWriter::new(&path, schema.as_ref())?;

    while offset < total_rows {
        // Take a zero-copy slice of at most `batch_size_rows` rows and
        // write it to the IPC file as its own batch.
        let length = std::cmp::min(total_rows - offset, batch_size_rows);
        let batch = batch.slice(offset, length);
        offset += batch.num_rows();
        writer.write(&batch)?;
    }
    writer.finish()?;

    Ok(total_rows)
}

#[cfg(test)]
mod tests {
    use crate::spill::{spill_record_batch_by_size, spill_record_batches};
    use crate::test::build_table_i32;
    use datafusion_common::Result;
    use datafusion_execution::disk_manager::DiskManagerConfig;
    use datafusion_execution::DiskManager;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    #[test]
    fn test_batch_spill_and_read() -> Result<()> {
        let batch1 = build_table_i32(
            ("a2", &vec![0, 1, 2]),
            ("b2", &vec![3, 4, 5]),
            ("c2", &vec![4, 5, 6]),
        );

        let batch2 = build_table_i32(
            ("a2", &vec![10, 11, 12]),
            ("b2", &vec![13, 14, 15]),
            ("c2", &vec![14, 15, 16]),
        );

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let schema = batch1.schema();
        let num_rows = batch1.num_rows() + batch2.num_rows();
        let cnt = spill_record_batches(
            vec![batch1, batch2],
            spill_file.path().into(),
            Arc::clone(&schema),
        );
        assert_eq!(cnt.unwrap(), num_rows);

        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;

        assert_eq!(reader.num_batches(), 2);
        assert_eq!(reader.schema(), schema);

        Ok(())
    }

    #[test]
    fn test_batch_spill_by_size() -> Result<()> {
        let batch1 = build_table_i32(
            ("a2", &vec![0, 1, 2, 3]),
            ("b2", &vec![3, 4, 5, 6]),
            ("c2", &vec![4, 5, 6, 7]),
        );

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let schema = batch1.schema();
        let num_rows = batch1.num_rows();
        // Spill with at most one row per batch, so the 4 input rows
        // should come back as 4 separate IPC batches.
        let cnt = spill_record_batch_by_size(
            &batch1,
            spill_file.path().into(),
            Arc::clone(&schema),
            1,
        );
        assert_eq!(cnt.unwrap(), num_rows);

        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;

        assert_eq!(reader.num_batches(), 4);
        assert_eq!(reader.schema(), schema);

        Ok(())
    }
}
