Minor: move batch spilling methods to lib.rs to make it reusable (a…
comphead authored and findepi committed Jul 16, 2024
1 parent 4b21fbc · commit 324d2ac
Showing 3 changed files with 78 additions and 87 deletions.
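The practical effect of the move is that `spill_record_batches` and `read_spill_as_stream` become public at the crate root of `datafusion-physical-plan`, so operators other than sort can share one spill path. As a rough usage sketch, not part of the commit itself: the Tokio runtime, the default `RuntimeEnv`, and the toy one-column schema below are illustrative assumptions.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::{read_spill_as_stream, spill_record_batches};
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    // A toy batch to spill.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![col])?;

    // The disk manager hands out a ref-counted temp file; it is deleted
    // once the last reference is dropped.
    let env = RuntimeEnv::default();
    let spill_file = env.disk_manager.create_tmp_file("example-spill")?;

    // Write the batches to disk as Arrow IPC; returns the spilled row count.
    let rows =
        spill_record_batches(vec![batch], spill_file.path().into(), Arc::clone(&schema))?;
    assert_eq!(rows, 3);

    // Stream the batches back. The trailing `2` is the receiver channel
    // capacity, the value both the sort and aggregate operators pass.
    let stream = read_spill_as_stream(spill_file, schema, 2)?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;
    assert_eq!(batches[0].num_rows(), 3);
    Ok(())
}

Note that `read_spill_as_stream` takes the `RefCountedTempFile` by value, so the temp file stays alive until the background read finishes.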
datafusion/physical-plan/src/aggregates/row_hash.rs (6 changes: 3 additions & 3 deletions)

@@ -29,10 +29,10 @@ use crate::aggregates::{
 };
 use crate::common::IPCWriter;
 use crate::metrics::{BaselineMetrics, RecordOutput};
-use crate::sorts::sort::{read_spill_as_stream, sort_batch};
+use crate::sorts::sort::sort_batch;
 use crate::sorts::streaming_merge;
 use crate::stream::RecordBatchStreamAdapter;
-use crate::{aggregates, ExecutionPlan, PhysicalExpr};
+use crate::{aggregates, read_spill_as_stream, ExecutionPlan, PhysicalExpr};
 use crate::{RecordBatchStream, SendableRecordBatchStream};
 
 use arrow::array::*;
@@ -752,7 +752,7 @@ impl GroupedHashAggregateStream {
             })),
         )));
         for spill in self.spill_state.spills.drain(..) {
-            let stream = read_spill_as_stream(spill, schema.clone())?;
+            let stream = read_spill_as_stream(spill, schema.clone(), 2)?;
             streams.push(stream);
         }
         self.spill_state.is_stream_merging = true;
datafusion/physical-plan/src/lib.rs (64 changes: 63 additions & 1 deletion)

@@ -19,6 +19,9 @@
 use std::any::Any;
 use std::fmt::Debug;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
 use crate::coalesce_partitions::CoalescePartitionsExec;
@@ -28,15 +31,18 @@ use crate::repartition::RepartitionExec;
 use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
 
 use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::Result;
+use datafusion_common::{exec_datafusion_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{
     EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
 use futures::stream::TryStreamExt;
+use log::debug;
+use tokio::sync::mpsc::Sender;
 use tokio::task::JoinSet;
 
 mod ordering;
@@ -87,8 +93,13 @@ pub use datafusion_physical_expr::{
 };
 
 // Backwards compatibility
+use crate::common::IPCWriter;
 pub use crate::stream::EmptyRecordBatchStream;
+use crate::stream::RecordBatchReceiverStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::human_readable_size;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+
 pub mod udaf {
     pub use datafusion_physical_expr_common::aggregate::{
         create_aggregate_expr, AggregateFunctionExpr,
@@ -799,6 +810,57 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
     actual.iter().map(|elem| elem.to_string()).collect()
 }
 
+/// Read spilled batches from the disk
+///
+/// `path` - temp file
+/// `schema` - batches schema, should be the same across batches
+/// `buffer` - internal buffer of capacity batches
+pub fn read_spill_as_stream(
+    path: RefCountedTempFile,
+    schema: SchemaRef,
+    buffer: usize,
+) -> Result<SendableRecordBatchStream> {
+    let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
+    let sender = builder.tx();
+
+    builder.spawn_blocking(move || read_spill(sender, path.path()));
+
+    Ok(builder.build())
+}
+
+/// Spills in-memory `batches` to disk.
+///
+/// Returns total number of the rows spilled to disk.
+pub fn spill_record_batches(
+    batches: Vec<RecordBatch>,
+    path: PathBuf,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+    for batch in batches {
+        writer.write(&batch)?;
+    }
+    writer.finish()?;
+    debug!(
+        "Spilled {} batches of total {} rows to disk, memory released {}",
+        writer.num_batches,
+        writer.num_rows,
+        human_readable_size(writer.num_bytes),
+    );
+    Ok(writer.num_rows)
+}
+
+fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
+    let file = BufReader::new(File::open(path)?);
+    let reader = FileReader::try_new(file, None)?;
+    for batch in reader {
+        sender
+            .blocking_send(batch.map_err(Into::into))
+            .map_err(|e| exec_datafusion_err!("{e}"))?;
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;
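Two details of the refactor are worth calling out. The shared `read_spill_as_stream` exposes the receiver channel capacity as the new `buffer` parameter rather than hard-coding 2; both updated call sites pass 2, so the effective buffering is unchanged. Compared with the sort-local versions removed below, the shared helpers also drop two wrappers: `spill_record_batches` runs synchronously on the caller instead of going through `SpawnedTask::spawn_blocking`, and a failure in `read_spill` is now surfaced only through the returned stream, without the old `error!` log.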
datafusion/physical-plan/src/sorts/sort.rs (95 changes: 12 additions & 83 deletions)

@@ -22,45 +22,38 @@
 use std::any::Any;
 use std::fmt;
 use std::fmt::{Debug, Formatter};
-use std::fs::File;
-use std::io::BufReader;
-use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
 use crate::expressions::PhysicalSortExpr;
 use crate::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use crate::sorts::streaming_merge::streaming_merge;
-use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
+use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
 use crate::{
-    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode,
-    ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties,
-    SendableRecordBatchStream, Statistics,
+    read_spill_as_stream, spill_record_batches, DisplayAs, DisplayFormatType,
+    Distribution, EmptyRecordBatchStream, ExecutionMode, ExecutionPlan,
+    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
+    Statistics,
 };
 
 use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
 use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, SortField};
 use arrow_array::{Array, RecordBatchOptions, UInt32Array};
 use arrow_schema::DataType;
-use datafusion_common::{exec_err, DataFusionError, Result};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
-use datafusion_execution::memory_pool::{
-    human_readable_size, MemoryConsumer, MemoryReservation,
-};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::LexOrdering;
 
 use futures::{StreamExt, TryStreamExt};
-use log::{debug, error, trace};
-use tokio::sync::mpsc::Sender;
+use log::{debug, trace};
 
 struct ExternalSorterMetrics {
     /// metrics
@@ -345,7 +338,7 @@ impl ExternalSorter {
                     spill.path()
                 )));
             }
-            let stream = read_spill_as_stream(spill, self.schema.clone())?;
+            let stream = read_spill_as_stream(spill, self.schema.clone(), 2)?;
             streams.push(stream);
         }
 
@@ -402,7 +395,7 @@ impl ExternalSorter {
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
        let batches = std::mem::take(&mut self.in_mem_batches);
         let spilled_rows =
-            spill_sorted_batches(batches, spill_file.path(), self.schema.clone()).await?;
+            spill_record_batches(batches, spill_file.path().into(), self.schema.clone())?;
         let used = self.reservation.free();
         self.metrics.spill_count.add(1);
         self.metrics.spilled_bytes.add(used);
@@ -667,70 +660,6 @@ pub(crate) fn lexsort_to_indices_multi_columns(
     Ok(indices)
 }
 
-/// Spills sorted `in_memory_batches` to disk.
-///
-/// Returns number of the rows spilled to disk.
-async fn spill_sorted_batches(
-    batches: Vec<RecordBatch>,
-    path: &Path,
-    schema: SchemaRef,
-) -> Result<usize> {
-    let path: PathBuf = path.into();
-    let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path, schema));
-    match task.join().await {
-        Ok(r) => r,
-        Err(e) => exec_err!("Error occurred while spilling {e}"),
-    }
-}
-
-pub(crate) fn read_spill_as_stream(
-    path: RefCountedTempFile,
-    schema: SchemaRef,
-) -> Result<SendableRecordBatchStream> {
-    let mut builder = RecordBatchReceiverStream::builder(schema, 2);
-    let sender = builder.tx();
-
-    builder.spawn_blocking(move || {
-        let result = read_spill(sender, path.path());
-        if let Err(e) = &result {
-            error!("Failure while reading spill file: {:?}. Error: {}", path, e);
-        }
-        result
-    });
-
-    Ok(builder.build())
-}
-
-fn write_sorted(
-    batches: Vec<RecordBatch>,
-    path: PathBuf,
-    schema: SchemaRef,
-) -> Result<usize> {
-    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
-    for batch in batches {
-        writer.write(&batch)?;
-    }
-    writer.finish()?;
-    debug!(
-        "Spilled {} batches of total {} rows to disk, memory released {}",
-        writer.num_batches,
-        writer.num_rows,
-        human_readable_size(writer.num_bytes),
-    );
-    Ok(writer.num_rows)
-}
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
-    let file = BufReader::new(File::open(path)?);
-    let reader = FileReader::try_new(file, None)?;
-    for batch in reader {
-        sender
-            .blocking_send(batch.map_err(Into::into))
-            .map_err(|e| DataFusionError::Execution(format!("{e}")))?;
-    }
-    Ok(())
-}
-
 /// Sort execution plan.
 ///
 /// Support sorting datasets that are larger than the memory allotted
@@ -776,7 +705,7 @@ impl SortExec {
     /// Specify the partitioning behavior of this sort exec
     ///
     /// If `preserve_partitioning` is true, sorts each partition
-    /// individually, producing one sorted strema for each input partition.
+    /// individually, producing one sorted stream for each input partition.
     ///
     /// If `preserve_partitioning` is false, sorts and merges all
     /// input partitions producing a single, sorted partition.
