Skip to content

Commit

Permalink
perf(rust, python): reduce page faults in q1 ~-30% (pola-rs#9423)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and c-peters committed Jul 14, 2023
1 parent 8a8611d commit 5de9711
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 18 deletions.
32 changes: 26 additions & 6 deletions polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use polars_core::frame::DataFrame;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_core::POOL;
use polars_utils::arena::Node;
use polars_utils::sync::SyncPtr;
use rayon::prelude::*;

use crate::executors::sources::DataFrameSource;
Expand Down Expand Up @@ -183,7 +184,8 @@ impl PipeLine {
ec: &PExecutionContext,
operator_start: usize,
operator_end: usize,
) -> PolarsResult<Option<SinkResult>> {
src: &mut Box<dyn Source>,
) -> PolarsResult<(Option<SinkResult>, SourceResult)> {
debug_assert!(chunks.len() <= sink.len());

fn run_operator_pipe(
Expand All @@ -205,6 +207,9 @@ impl PipeLine {
}
}
let sink_results = Arc::new(Mutex::new(None));
let mut next_batches: Option<PolarsResult<SourceResult>> = None;
let next_batches_ptr = &mut next_batches as *mut Option<PolarsResult<SourceResult>>;
let next_batches_ptr = unsafe { SyncPtr::new(next_batches_ptr) };

// 1. We will iterate the chunks/sinks/operators
// where every iteration belongs to a single thread
Expand All @@ -223,7 +228,6 @@ impl PipeLine {
let pipeline = &*self;
POOL.scope(|s| {
for ((chunk, sink), operator_pipe) in chunks
.clone()
.into_iter()
.zip(sink.iter_mut())
.zip(operators.iter_mut())
Expand All @@ -248,11 +252,23 @@ impl PipeLine {
}
})
}
// already get batches on the thread pool
// if one job is finished earlier we can already start that work
s.spawn(|_| {
let out = src.get_batches(ec);
unsafe {
let ptr = next_batches_ptr.get();
*ptr = Some(out);
}
})
});
self.operators = operators;

let next_batches = next_batches.unwrap()?;
let mut lock = sink_results.lock().unwrap();
lock.take().transpose()
lock.take()
.transpose()
.map(|sink_result| (sink_result, next_batches))
}

/// This thread local logic that pushed a data chunk into the operators + sink
Expand Down Expand Up @@ -333,16 +349,20 @@ impl PipeLine {
std::mem::take(&mut self.sinks).into_iter().enumerate()
{
for src in &mut std::mem::take(&mut self.sources) {
while let SourceResult::GotMoreData(chunks) = src.get_batches(ec)? {
let results = self.par_process_chunks(
let mut next_batches = src.get_batches(ec)?;

while let SourceResult::GotMoreData(chunks) = next_batches {
let (sink_result, next_batches2) = self.par_process_chunks(
chunks,
&mut sink,
ec,
operator_start,
operator_end,
src,
)?;
next_batches = next_batches2;

if let Some(SinkResult::Finished) = results {
if let Some(SinkResult::Finished) = sink_result {
sink_finished = true;
break;
}
Expand Down
17 changes: 5 additions & 12 deletions polars/polars-row/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use arrow::array::{
use arrow::compute::cast::cast;
use arrow::datatypes::{DataType as ArrowDataType, DataType};
use arrow::types::NativeType;
use polars_utils::vec::ResizeFaster;

use crate::fixed::FixedLengthEncoding;
use crate::row::{RowsEncoded, SortField};
Expand Down Expand Up @@ -173,9 +174,8 @@ pub fn allocate_rows_buf(columns: &[ArrayRef], values: &mut Vec<u8>, offsets: &m
})
.sum();

offsets.clear();
offsets.reserve(num_rows + 1);
offsets.resize(num_rows, row_size_fixed);
offsets.fill_or_alloc(num_rows + 1, row_size_fixed);
unsafe { offsets.set_len(num_rows) };

// first write lengths to this buffer
let lengths = offsets;
Expand Down Expand Up @@ -223,23 +223,16 @@ pub fn allocate_rows_buf(columns: &[ArrayRef], values: &mut Vec<u8>, offsets: &m
// ensure we have len + 1 offsets
offsets.push(lagged_offset);

values.clear();
// todo! allocate uninit
values.resize(current_offset, 0u8);
values.fill_or_alloc(current_offset, 0u8);
} else {
let row_size: usize = columns
.iter()
.map(|arr| encoded_size(arr.data_type()))
.sum();
let n_bytes = num_rows * row_size;
// todo! allocate uninit
if values.capacity() == 0 {
// it is faster to allocate zeroed
// so if the capacity is 0, we alloc
*values = vec![0u8; n_bytes]
} else {
values.resize(n_bytes, 0u8);
}
values.fill_or_alloc(n_bytes, 0u8);

// note that offsets are shifted to the left
// assume 2 fields with a len of 1
Expand Down
1 change: 1 addition & 0 deletions polars/polars-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ description = "private utils for the polars dataframe library"
[dependencies]
ahash.workspace = true
hashbrown.workspace = true
num-traits.workspace = true
once_cell.workspace = true
rayon.workspace = true
smartstring.workspace = true
Expand Down
31 changes: 31 additions & 0 deletions polars/polars-utils/src/vec.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::mem::MaybeUninit;

use num_traits::Zero;

pub trait IntoRawParts<T> {
fn into_raw_parts(self) -> (*mut T, usize, usize);

Expand All @@ -15,3 +19,30 @@ impl<T> IntoRawParts<T> for Vec<T> {
(self.as_ptr() as *mut T, self.len(), self.capacity())
}
}

/// Overwrite a buffer with `new_len` copies of a value, reusing the current
/// allocation when there is one (capacity > 0) and allocating fresh otherwise.
///
/// The previous contents of the buffer are discarded.
pub trait ResizeFaster<T: Copy> {
/// Make `self` hold exactly `new_len` elements, all equal to `value`.
fn fill_or_alloc(&mut self, new_len: usize, value: T);
}

impl<T: Copy + Zero + PartialEq> ResizeFaster<T> for Vec<T> {
    /// Replace the contents of the vector with `new_len` copies of `value`.
    ///
    /// A vector without an allocation gets a freshly allocated buffer; a vector
    /// that already owns one is cleared and refilled, growing only if needed.
    fn fill_or_alloc(&mut self, new_len: usize, value: T) {
        if self.capacity() == 0 {
            // Nothing is allocated yet: let `vec!` do the allocation, since
            // allocating zeroed memory is faster (value might be 0).
            *self = vec![value; new_len];
            return;
        }

        // Discard the old elements *before* reserving, so that a possible
        // reallocation has nothing to memcpy into the new buffer.
        self.clear();
        self.reserve(new_len);

        // Initialize the first `new_len` slots of the (still uninit) spare capacity.
        let uninit_slots = &mut self.spare_capacity_mut()[..new_len];
        uninit_slots.fill(MaybeUninit::new(value));

        // SAFETY: the vector is empty, `reserve` guaranteed capacity for at least
        // `new_len` elements, and exactly those slots were initialized above.
        unsafe { self.set_len(new_len) }
    }
}
1 change: 1 addition & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5de9711

Please sign in to comment.