Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve shuffle performance #559

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions native-engine/blaze-jni-bridge/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ pub trait DoubleConf {

pub trait StringConf {
fn key(&self) -> &'static str;
fn value(&self) -> Result<&'static str> {
fn value(&self) -> Result<String> {
let key = jni_new_string!(self.key())?;
let value = jni_get_string!(
jni_call_static!(BlazeConf.stringConf(key.as_obj()) -> JObject)?
.as_obj()
.into()
)?;
Ok(Box::leak(value.into_boxed_str()))
Ok(value)
}
}
46 changes: 26 additions & 20 deletions native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,26 +101,8 @@ impl CoalesceStream {
fn coalesce(&mut self) -> Result<RecordBatch> {
// better concat_batches() implementation that releases old batch columns asap.
let schema = self.input.schema();

// collect all columns
let mut all_cols = schema.fields().iter().map(|_| vec![]).collect::<Vec<_>>();
for batch in std::mem::take(&mut self.staging_batches) {
for i in 0..all_cols.len() {
all_cols[i].push(batch.column(i).clone());
}
}

// coalesce each column
let mut coalesced_cols = vec![];
for (cols, field) in all_cols.into_iter().zip(schema.fields()) {
let dt = field.data_type();
coalesced_cols.push(coalesce_arrays_unchecked(dt, &cols));
}
let coalesced_batch = RecordBatch::try_new_with_options(
schema,
coalesced_cols,
&RecordBatchOptions::new().with_row_count(Some(self.staging_rows)),
)?;
let coalesced_batch = coalesce_batches_unchecked(schema, &self.staging_batches);
self.staging_batches.clear();
self.staging_rows = 0;
self.staging_batches_mem_size = 0;
Ok(coalesced_batch)
Expand Down Expand Up @@ -177,6 +159,30 @@ impl Stream for CoalesceStream {
}
}

/// Coalesces `batches` into a single `RecordBatch` without checking their
/// schemas; invokers must make sure every batch conforms to `schema`.
///
/// Each output column is built by handing the corresponding column of every
/// input batch to `coalesce_arrays_unchecked`, one field at a time.
///
/// # Panics
///
/// Panics with "error coalescing record batch" if
/// `RecordBatch::try_new_with_options` rejects the coalesced columns.
pub fn coalesce_batches_unchecked(schema: SchemaRef, batches: &[RecordBatch]) -> RecordBatch {
    let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

    // Walk the schema field by field, gathering that field's column from every
    // batch before coalescing it into a single array.
    let merged_columns = (0..schema.fields().len())
        .map(|field_idx| {
            let pieces = batches
                .iter()
                .map(|batch| batch.column(field_idx).clone())
                .collect::<Vec<_>>();
            coalesce_arrays_unchecked(schema.field(field_idx).data_type(), &pieces)
        })
        .collect::<Vec<_>>();

    // The row count is passed explicitly instead of being derived from the
    // columns.
    let options = RecordBatchOptions::new().with_row_count(Some(total_rows));
    RecordBatch::try_new_with_options(schema, merged_columns, &options)
        .expect("error coalescing record batch")
}

/// coalesce arrays without checking their data types, invokers must make
/// sure all arrays have the same data type
pub fn coalesce_arrays_unchecked(data_type: &DataType, arrays: &[ArrayRef]) -> ArrayRef {
Expand Down
Loading
Loading