Skip to content

Commit

Permalink
impl vectorized_append.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 23, 2024
1 parent 1a7c2eb commit 3415659
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 14 deletions.
47 changes: 43 additions & 4 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,23 @@ impl GroupValuesColumn {
next_group_index = self.group_index_lists[current_group_index];
}
}

self.vectorized_equal_to_results
.resize(self.vectorized_equal_to_group_indices.len(), true);
}

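/// Perform `vectorized_equal_to`
///
/// Compares the input rows in `vectorized_equal_to_row_indices` with the
/// existing groups in `vectorized_equal_to_group_indices`, one group column
/// at a time, then derives `vectorized_append_row_indices` from the
/// per-pair comparison results.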
fn vectorized_equal_to(&mut self, cols: &[ArrayRef]) {
debug_assert_eq!(
self.vectorized_equal_to_group_indices.len(),
self.vectorized_equal_to_row_indices.len()
);

if self.vectorized_equal_to_group_indices.is_empty() {
return;
}

// Vectorized equal to `cols` and `group columns`
let mut equal_to_results = mem::take(&mut self.vectorized_equal_to_results);
equal_to_results.resize(self.vectorized_equal_to_group_indices.len(), true);
for (col_idx, group_col) in self.group_values.iter().enumerate() {
group_col.vectorized_equal_to(
&self.vectorized_equal_to_group_indices,
Expand All @@ -337,8 +344,40 @@ impl GroupValuesColumn {
&mut equal_to_results,
);
}

// Decide which input rows must be appended as new groups.
//
// A row can appear several times in `vectorized_equal_to_row_indices` (once
// per candidate group it was compared with). This scan assumes the entries
// of one row are adjacent, and treats the row as "found" if any of its
// comparisons in `equal_to_results` is `true`.
let mut current_row_equal_to_result = false;
// Panics if the row indices are empty; the caller-side check above returns
// early when there is nothing to compare.
let mut current_row = *self.vectorized_equal_to_row_indices.first().unwrap();
for (idx, &row) in self.vectorized_equal_to_row_indices.iter().enumerate() {
    if current_row != row {
        // The run of `current_row` has ended. If none of its comparisons
        // matched, it is `current_row` itself that needs appending, not
        // `row`, which merely starts the next run and is still undecided.
        if !current_row_equal_to_result {
            self.vectorized_append_row_indices.push(current_row);
        }

        // Start the next run, seeded with its first comparison result.
        current_row = row;
        current_row_equal_to_result = equal_to_results[idx];
        continue;
    }

    // Still inside the same run: any single match is enough.
    current_row_equal_to_result |= equal_to_results[idx];
}

// Flush the final run, which has no following row to trigger the check.
if !current_row_equal_to_result {
    self.vectorized_append_row_indices.push(current_row);
}

self.vectorized_equal_to_results = equal_to_results;
}

/// Perform `vectorized_append`
///
/// Planned steps:
///
/// 1. Vectorized append new values into `group_values`
/// 2. Update `map` and `group_index_lists`
///
/// NOTE(review): in this revision the body only contains the early return
/// for an empty `vectorized_append_row_indices`; neither step above is
/// implemented yet, and `cols` / `batch_hashes` are not read.
fn vectorized_append(&mut self, cols: &[ArrayRef], batch_hashes: &[u64]) {
// No rows were queued for appending, so there is nothing to do.
if self.vectorized_append_row_indices.is_empty() {
return;
}

// 1. Vectorized append new values into `group_values`
// TODO: not implemented yet; step 2 (updating `map` and
// `group_index_lists`) is missing as well.
}
}

/// instantiates a [`PrimitiveGroupValueBuilder`] and pushes it into $v
Expand Down
102 changes: 92 additions & 10 deletions datafusion/physical-plan/src/aggregates/group_values/group_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
all_non_null: bool,
) {
let arr = array.as_primitive::<T>();

match (NULLABLE, all_non_null) {
(true, true) => {
self.nulls.append_n(rows.len(), false);
Expand Down Expand Up @@ -280,7 +281,7 @@ where
}
}

fn append_batch_inner<B>(
fn vectorized_append_inner<B>(
&mut self,
array: &ArrayRef,
rows: &[usize],
Expand All @@ -293,7 +294,7 @@ where
if all_non_null {
self.nulls.append_n(rows.len(), false);
for &row in rows {
self.append_value(arr, row);
self.do_append_val_inner(arr, row);
}
} else {
for &row in rows {
Expand All @@ -304,7 +305,7 @@ where
self.offsets.push(O::usize_as(offset));
} else {
self.nulls.append(false);
self.append_value(arr, row);
self.do_append_val_inner(arr, row);
}
}
}
Expand All @@ -322,11 +323,11 @@ where
self.offsets.push(O::usize_as(offset));
} else {
self.nulls.append(false);
self.append_value(arr, row);
self.do_append_val_inner(arr, row);
}
}

fn append_value<B>(&mut self, array: &GenericByteArray<B>, row: usize)
fn do_append_val_inner<B>(&mut self, array: &GenericByteArray<B>, row: usize)
where
B: ByteArrayType,
{
Expand All @@ -340,6 +341,40 @@ where
B: ByteArrayType,
{
let array = array.as_bytes::<B>();
self.do_equal_to_inner(lhs_row, array, rhs_row)
}

/// Compare many `(group row, input row)` pairs in one call.
///
/// Pair `i` is made of `group_indices[i]` (a row stored in this builder)
/// and `rows[i]` (a row of `array`, viewed as a byte array of type `B`).
/// The outcome is written to `equal_to_results[i]`.
///
/// Entries of `equal_to_results` that are already `false` are left as they
/// are and their pair is not compared again.
///
/// Panics if `rows` or `equal_to_results` is too short for an index that
/// gets accessed.
fn vectorized_equal_to_inner<B>(
    &self,
    group_indices: &[usize],
    array: &ArrayRef,
    rows: &[usize],
    equal_to_results: &mut [bool],
) where
    B: ByteArrayType,
{
    let input = array.as_bytes::<B>();

    for (pos, &group_row) in group_indices.iter().enumerate() {
        // Only pairs that are still considered equal need a comparison;
        // a pair already marked as not equal stays that way.
        if equal_to_results[pos] {
            let input_row = rows[pos];
            equal_to_results[pos] = self.do_equal_to_inner(group_row, input, input_row);
        }
    }
}

fn do_equal_to_inner<B>(
&self,
lhs_row: usize,
array: &GenericByteArray<B>,
rhs_row: usize,
) -> bool
where
B: ByteArrayType,
{
let exist_null = self.nulls.is_null(lhs_row);
let input_null = array.is_null(rhs_row);
if let Some(result) = nulls_equal_to(exist_null, input_null) {
Expand Down Expand Up @@ -411,7 +446,34 @@ where
rows: &[usize],
equal_to_results: &mut [bool],
) {
todo!()
// Sanity check the array type
match self.output_type {
OutputType::Binary => {
debug_assert!(matches!(
array.data_type(),
DataType::Binary | DataType::LargeBinary
));
self.vectorized_equal_to_inner::<GenericBinaryType<O>>(
group_indices,
array,
rows,
equal_to_results,
);
}
OutputType::Utf8 => {
debug_assert!(matches!(
array.data_type(),
DataType::Utf8 | DataType::LargeUtf8
));
self.vectorized_equal_to_inner::<GenericStringType<O>>(
group_indices,
array,
rows,
equal_to_results,
);
}
_ => unreachable!("View types should use `ArrowBytesViewMap`"),
}
}

fn vectorized_append(
Expand All @@ -426,7 +488,7 @@ where
column.data_type(),
DataType::Binary | DataType::LargeBinary
));
self.append_batch_inner::<GenericBinaryType<O>>(
self.vectorized_append_inner::<GenericBinaryType<O>>(
column,
rows,
all_non_null,
Expand All @@ -437,7 +499,7 @@ where
column.data_type(),
DataType::Utf8 | DataType::LargeUtf8
));
self.append_batch_inner::<GenericStringType<O>>(
self.vectorized_append_inner::<GenericStringType<O>>(
column,
rows,
all_non_null,
Expand Down Expand Up @@ -606,7 +668,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
self
}

fn append_batch_inner(
fn vectorized_append_inner(
&mut self,
array: &ArrayRef,
rows: &[usize],
Expand Down Expand Up @@ -693,6 +755,26 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
self.do_equal_to_inner(lhs_row, array, rhs_row)
}

/// Compare many `(group row, input row)` pairs in one call.
///
/// Pair `i` is made of `group_indices[i]` (a row stored in this builder)
/// and `rows[i]` (a row of `array`, viewed as a byte-view array of type
/// `B`). The outcome is written to `equal_to_results[i]`.
///
/// Entries of `equal_to_results` that are already `false` are left as they
/// are and their pair is not compared again.
///
/// Panics if `rows` or `equal_to_results` is too short for an index that
/// gets accessed.
fn vectorized_equal_to_inner(
    &self,
    group_indices: &[usize],
    array: &ArrayRef,
    rows: &[usize],
    equal_to_results: &mut [bool],
) {
    let input = array.as_byte_view::<B>();

    for (pos, &group_row) in group_indices.iter().enumerate() {
        // Only pairs that are still considered equal need a comparison;
        // a pair already marked as not equal stays that way.
        if equal_to_results[pos] {
            let input_row = rows[pos];
            equal_to_results[pos] = self.do_equal_to_inner(group_row, input, input_row);
        }
    }
}

fn do_equal_to_inner(
&self,
lhs_row: usize,
Expand Down Expand Up @@ -992,7 +1074,7 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
rows: &[usize],
all_non_null: bool,
) {
self.append_batch_inner(array, rows, all_non_null);
self.vectorized_append_inner(array, rows, all_non_null);
}

fn len(&self) -> usize {
Expand Down

0 comments on commit 3415659

Please sign in to comment.