Commit

review
2010YOUY01 committed Nov 15, 2024
1 parent 84cee68 commit 951a5f4
Showing 1 changed file with 47 additions and 9 deletions.
56 changes: 47 additions & 9 deletions datafusion/physical-plan/src/spill.rs
@@ -113,7 +113,10 @@ pub fn spill_record_batch_by_size(

/// Calculate total used memory of this batch.
///
-/// This function is used to estimate the physical memory usage of the `RecordBatch`. The implementation will add up all unique `Buffer`'s memory
+/// This function is used to estimate the physical memory usage of the `RecordBatch`.
+/// It only counts the memory of large data `Buffer`s and ignores metadata like
+/// types and pointers.
+/// The implementation will add up all unique `Buffer`s' memory
/// size, due to:
/// - The data pointers inside `Buffer` are memory regions returned by the global memory
///   allocator; those regions can't overlap.
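The doc comment above describes the counting idea: sum each distinct `Buffer` allocation exactly once and ignore Arrow metadata. A minimal sketch of that idea follows. It is an editor's illustration rather than the code in this commit, assuming the arrow-rs `ArrayData`/`Buffer` APIs (`to_data`, `buffers`, `nulls`, `data_ptr`, `capacity`); the function names are made up.

```rust
// Editor's sketch, not the committed implementation: estimate the physical
// memory of a RecordBatch by summing each distinct buffer allocation once.
use std::collections::HashSet;
use std::ptr::NonNull;

use arrow::array::{Array, ArrayData};
use arrow::record_batch::RecordBatch;

fn estimate_batch_memory(batch: &RecordBatch) -> usize {
    let mut seen: HashSet<NonNull<u8>> = HashSet::new();
    let mut total = 0;
    for column in batch.columns() {
        count_array_data(&column.to_data(), &mut seen, &mut total);
    }
    total
}

fn count_array_data(
    data: &ArrayData,
    seen: &mut HashSet<NonNull<u8>>,
    total: &mut usize,
) {
    // Count each distinct allocation once: `data_ptr()` identifies the
    // underlying allocation, so slices into the same region are skipped.
    for buffer in data.buffers() {
        if seen.insert(buffer.data_ptr()) {
            *total += buffer.capacity();
        }
    }
    // The validity (null) bitmap is backed by a buffer as well.
    if let Some(nulls) = data.nulls() {
        let buffer = nulls.inner().inner();
        if seen.insert(buffer.data_ptr()) {
            *total += buffer.capacity();
        }
    }
    // Nested types (struct, list, ...) keep their values in child arrays.
    for child in data.child_data() {
        count_array_data(child, seen, total);
    }
}
```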
@@ -277,6 +280,27 @@ mod tests {
assert_eq!(size, 60);
}

+#[test]
+fn test_get_record_batch_memory_size_with_null() {
+// Create a simple record batch with two columns
+let schema = Arc::new(Schema::new(vec![
+Field::new("ints", DataType::Int32, true),
+Field::new("float64", DataType::Float64, false),
+]));
+
+let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
+let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);
+
+let batch = RecordBatch::try_new(
+schema,
+vec![Arc::new(int_array), Arc::new(float64_array)],
+)
+.unwrap();
+
+let size = get_record_batch_memory_size(&batch);
+assert_eq!(size, 100);
+}
+
#[test]
fn test_get_record_batch_memory_size_empty() {
// Test with empty record batch
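The `test_get_record_batch_memory_size_with_null` test added above exercises a nullable column. In Arrow's layout a nullable primitive array carries a validity bitmap buffer in addition to its values buffer, and that bitmap contributes to the reported size as well. A small illustrative check of that layout (an editor's sketch, not part of the commit):

```rust
// Editor's sketch, not part of the commit: a nullable primitive array holds
// one values buffer plus a separately tracked validity bitmap.
use arrow::array::{Array, Int32Array};

fn main() {
    let array = Int32Array::from(vec![None, Some(2), Some(3)]);
    let data = array.to_data();
    assert_eq!(data.buffers().len(), 1); // the values buffer
    assert!(data.nulls().is_some()); // the validity bitmap for the `None` slot
}
```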
@@ -296,22 +320,36 @@
#[test]
fn test_get_record_batch_memory_size_shared_buffer() {
// Test with slices that share the same underlying buffer
+let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+let slice1 = original.slice(0, 3);
+let slice2 = original.slice(2, 3);
+
+// `RecordBatch` with the `original` array
+// ----
+let schema_origin = Arc::new(Schema::new(vec![Field::new(
+"origin_col",
+DataType::Int32,
+false,
+)]));
+let batch_origin =
+RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();
+
+// `RecordBatch` whose columns are all references to the `original` array
+// ----
let schema = Arc::new(Schema::new(vec![
Field::new("slice1", DataType::Int32, false),
Field::new("slice2", DataType::Int32, false),
]));

-let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
-let slice1 = original.slice(0, 3);
-let slice2 = original.slice(2, 3);
-
-let batch =
+let batch_sliced =
RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
.unwrap();

-let size = get_record_batch_memory_size(&batch);
-// The size should only count the shared buffer once
-assert_eq!(size, 20);
+// Both sizes should count only the buffer in the `original` array
+let size_origin = get_record_batch_memory_size(&batch_origin);
+let size_sliced = get_record_batch_memory_size(&batch_sliced);
+
+assert_eq!(size_origin, size_sliced);
}

#[test]
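The reworked shared-buffer test above now compares a batch built from the `original` array against one built from two slices of it, and expects the two sizes to match because every column points into the same allocation. Slicing in arrow-rs is zero-copy, which the following editor's sketch illustrates; it assumes `Buffer::data_ptr` identifies the underlying allocation and is not part of the commit.

```rust
// Editor's sketch, not part of the commit: slicing is zero-copy, so the slice
// still points into the parent's allocation.
use arrow::array::{Array, Int32Array};

fn main() {
    let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let slice = original.slice(2, 3);
    // Both ArrayData instances reference the same underlying allocation,
    // so a dedup-by-allocation count sees this buffer only once.
    assert_eq!(
        original.to_data().buffers()[0].data_ptr(),
        slice.to_data().buffers()[0].data_ptr(),
    );
}
```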
