diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs index 73bd8cdd1ce9..455238c2cdd2 100644 --- a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs +++ b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs @@ -9,6 +9,7 @@ use polars_core::frame::DataFrame; use polars_core::utils::accumulate_dataframes_vertical_unchecked; use polars_core::POOL; use polars_utils::arena::Node; +use polars_utils::sync::SyncPtr; use rayon::prelude::*; use crate::executors::sources::DataFrameSource; @@ -183,7 +184,8 @@ impl PipeLine { ec: &PExecutionContext, operator_start: usize, operator_end: usize, - ) -> PolarsResult> { + src: &mut Box, + ) -> PolarsResult<(Option, SourceResult)> { debug_assert!(chunks.len() <= sink.len()); fn run_operator_pipe( @@ -205,6 +207,9 @@ impl PipeLine { } } let sink_results = Arc::new(Mutex::new(None)); + let mut next_batches: Option> = None; + let next_batches_ptr = &mut next_batches as *mut Option>; + let next_batches_ptr = unsafe { SyncPtr::new(next_batches_ptr) }; // 1. We will iterate the chunks/sinks/operators // where every iteration belongs to a single thread @@ -223,7 +228,6 @@ impl PipeLine { let pipeline = &*self; POOL.scope(|s| { for ((chunk, sink), operator_pipe) in chunks - .clone() .into_iter() .zip(sink.iter_mut()) .zip(operators.iter_mut()) @@ -248,11 +252,23 @@ impl PipeLine { } }) } + // already get batches on the thread pool + // if one job is finished earlier we can already start that work + s.spawn(|_| { + let out = src.get_batches(ec); + unsafe { + let ptr = next_batches_ptr.get(); + *ptr = Some(out); + } + }) }); self.operators = operators; + let next_batches = next_batches.unwrap()?; let mut lock = sink_results.lock().unwrap(); - lock.take().transpose() + lock.take() + .transpose() + .map(|sink_result| (sink_result, next_batches)) } /// This thread local logic that pushed a data chunk into the operators + sink @@ -333,16 +349,20 @@ impl PipeLine { std::mem::take(&mut self.sinks).into_iter().enumerate() { for src in &mut std::mem::take(&mut self.sources) { - while let SourceResult::GotMoreData(chunks) = src.get_batches(ec)? { - let results = self.par_process_chunks( + let mut next_batches = src.get_batches(ec)?; + + while let SourceResult::GotMoreData(chunks) = next_batches { + let (sink_result, next_batches2) = self.par_process_chunks( chunks, &mut sink, ec, operator_start, operator_end, + src, )?; + next_batches = next_batches2; - if let Some(SinkResult::Finished) = results { + if let Some(SinkResult::Finished) = sink_result { sink_finished = true; break; } diff --git a/polars/polars-row/src/encode.rs b/polars/polars-row/src/encode.rs index 0b79821e9929..653123bd016f 100644 --- a/polars/polars-row/src/encode.rs +++ b/polars/polars-row/src/encode.rs @@ -4,6 +4,7 @@ use arrow::array::{ use arrow::compute::cast::cast; use arrow::datatypes::{DataType as ArrowDataType, DataType}; use arrow::types::NativeType; +use polars_utils::vec::ResizeFaster; use crate::fixed::FixedLengthEncoding; use crate::row::{RowsEncoded, SortField}; @@ -173,9 +174,8 @@ pub fn allocate_rows_buf(columns: &[ArrayRef], values: &mut Vec, offsets: &m }) .sum(); - offsets.clear(); - offsets.reserve(num_rows + 1); - offsets.resize(num_rows, row_size_fixed); + offsets.fill_or_alloc(num_rows + 1, row_size_fixed); + unsafe { offsets.set_len(num_rows) }; // first write lengths to this buffer let lengths = offsets; @@ -223,9 +223,8 @@ pub fn allocate_rows_buf(columns: &[ArrayRef], values: &mut Vec, offsets: &m // ensure we have len + 1 offsets offsets.push(lagged_offset); - values.clear(); // todo! allocate uninit - values.resize(current_offset, 0u8); + values.fill_or_alloc(current_offset, 0u8); } else { let row_size: usize = columns .iter() @@ -233,13 +232,7 @@ pub fn allocate_rows_buf(columns: &[ArrayRef], values: &mut Vec, offsets: &m .sum(); let n_bytes = num_rows * row_size; // todo! allocate uninit - if values.capacity() == 0 { - // it is faster to allocate zeroed - // so if the capacity is 0, we alloc - *values = vec![0u8; n_bytes] - } else { - values.resize(n_bytes, 0u8); - } + values.fill_or_alloc(n_bytes, 0u8); // note that offsets are shifted to the left // assume 2 fields with a len of 1 diff --git a/polars/polars-utils/Cargo.toml b/polars/polars-utils/Cargo.toml index e5ec7ed44fae..e73ba57fcab7 100644 --- a/polars/polars-utils/Cargo.toml +++ b/polars/polars-utils/Cargo.toml @@ -11,6 +11,7 @@ description = "private utils for the polars dataframe library" [dependencies] ahash.workspace = true hashbrown.workspace = true +num-traits.workspace = true once_cell.workspace = true rayon.workspace = true smartstring.workspace = true diff --git a/polars/polars-utils/src/vec.rs b/polars/polars-utils/src/vec.rs index b054f1bfa357..7446e3218247 100644 --- a/polars/polars-utils/src/vec.rs +++ b/polars/polars-utils/src/vec.rs @@ -1,3 +1,7 @@ +use std::mem::MaybeUninit; + +use num_traits::Zero; + pub trait IntoRawParts { fn into_raw_parts(self) -> (*mut T, usize, usize); @@ -15,3 +19,30 @@ impl IntoRawParts for Vec { (self.as_ptr() as *mut T, self.len(), self.capacity()) } } + +/// Fill current allocation if if > 0 +/// otherwise realloc +pub trait ResizeFaster { + fn fill_or_alloc(&mut self, new_len: usize, value: T); +} + +impl ResizeFaster for Vec { + fn fill_or_alloc(&mut self, new_len: usize, value: T) { + if self.capacity() == 0 { + // it is faster to allocate zeroed + // so if the capacity is 0, we alloc (value might be 0) + *self = vec![value; new_len] + } else { + // first clear then reserve so that the reserve doesn't have + // to memcpy in case it needs to realloc. + self.clear(); + self.reserve(new_len); + + // // init the uninit values + let spare = &mut self.spare_capacity_mut()[..new_len]; + let init_value = MaybeUninit::new(value); + spare.fill(init_value); + unsafe { self.set_len(new_len) } + } + } +} diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index 0db7ea5e4da5..46634db5f25a 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -1659,6 +1659,7 @@ version = "0.30.0" dependencies = [ "ahash", "hashbrown 0.13.2", + "num-traits", "once_cell", "rayon", "smartstring",