Skip to content

Commit

Permalink
remove dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz committed Jan 9, 2025
1 parent 09e59e4 commit 28a01c1
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 7 deletions.
26 changes: 26 additions & 0 deletions src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,28 @@ impl ArrayBuilder for ListArrayBuilder {
}
}

fn append_iter(&mut self, values: impl IntoIterator<Item = Option<ListValue>>) {
for value in values {
match value {
None => {
self.bitmap.append(false);
let last = *self.offsets.last().unwrap();
self.offsets.push(last);
}
Some(v) => {
self.bitmap.append(true);
let last = *self.offsets.last().unwrap();
let elems = v.iter_value();
self.offsets.push(
last.checked_add(elems.len() as u32)
.expect("offset overflow"),
);
self.value.append_iter(elems);
}
}
}
}

fn append_n(&mut self, n: usize, value: Option<ListRef<'_>>) {
match value {
None => {
Expand Down Expand Up @@ -398,6 +420,10 @@ impl ListValue {
self.values.iter()
}

pub fn iter_value(&self) -> Vec<Datum> {
self.values.iter_datum()
}

/// Get the element at the given index. Returns `None` if the index is out of bounds.
pub fn get(&self, index: usize) -> Option<DatumRef<'_>> {
if index < self.len() {
Expand Down
47 changes: 47 additions & 0 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub use data_chunk_iter::RowRef;
pub use decimal_array::{DecimalArray, DecimalArrayBuilder};
pub use interval_array::{IntervalArray, IntervalArrayBuilder};
pub use iterator::ArrayIterator;
use itertools::Itertools;
pub use jsonb_array::{JsonbArray, JsonbArrayBuilder};
pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue};
pub use map_array::{MapArray, MapArrayBuilder, MapRef, MapValue};
Expand Down Expand Up @@ -123,6 +124,14 @@ pub trait ArrayBuilder: Send + Sync + Sized + 'static {
self.append_n(1, value);
}

/// Append a iterator value to the builder
fn append_iter(
&mut self,
_values: impl IntoIterator<Item = Option<<Self::ArrayType as Array>::OwnedItem>>,
) {
unimplemented!()
}

/// Append an owned value to builder.
fn append_owned(&mut self, value: Option<<Self::ArrayType as Array>::OwnedItem>) {
let value = value.as_ref().map(|s| s.as_scalar_ref());
Expand Down Expand Up @@ -488,6 +497,40 @@ impl ArrayBuilderImpl {
dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) })
}

pub fn append_iter(&mut self, data: Vec<Datum>) {
dispatch_array_builder_variants!(self, inner, [I = VARIANT_NAME], {
let mapped_data = data.into_iter().map(|datum| match datum {
None => None,
Some(scalar) => {
let scalar_type = scalar.get_ident();
Some(scalar.try_into().unwrap_or_else(|_| {
panic!(
"type mismatch, array builder type: {}, scalar type: {}",
I, scalar_type
)
}))
}
});
inner.append_iter(mapped_data);
})
// match self {
// ArrayBuilderImpl::List(list_builder) => {
// let mapped_data = data.into_iter().map(|datum| match datum {
// None => None,
// Some(scalar) => match scalar {
// ScalarImpl::List(list_value) => {
// let mapped_datum = Some(list_value);
// mapped_datum
// }
// _ => unreachable!(),
// },
// });
// list_builder.append_iter(mapped_data);
// }
// _ => unimplemented!(),
// }
}

/// Append a [`Datum`] or [`DatumRef`] multiple times,
/// panicking if the datum's type does not match the array builder's type.
pub fn append_n(&mut self, n: usize, datum: impl ToDatumRef) {
Expand Down Expand Up @@ -635,6 +678,10 @@ impl ArrayImpl {
pub fn iter(&self) -> impl DoubleEndedIterator<Item = DatumRef<'_>> + ExactSizeIterator {
(0..self.len()).map(|i| self.value_at(i))
}

pub fn iter_datum(&self) -> Vec<Datum> {
(0..self.len()).map(|i| self.datum_at(i)).collect_vec()
}
}

pub type ArrayRef = Arc<ArrayImpl>;
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ impl<T: PrimitiveArrayItemType> ArrayBuilder for PrimitiveArrayBuilder<T> {
Self::new(capacity)
}

#[inline]
fn append_n(&mut self, n: usize, value: Option<T>) {
match value {
Some(x) => {
Expand All @@ -301,6 +302,12 @@ impl<T: PrimitiveArrayItemType> ArrayBuilder for PrimitiveArrayBuilder<T> {
}
}

fn append_iter(&mut self, values: impl IntoIterator<Item = Option<T>>) {
for value in values {
self.append(value);
}
}

fn append_array(&mut self, other: &PrimitiveArray<T>) {
for bit in other.bitmap.iter() {
self.bitmap.append(bit);
Expand Down
29 changes: 22 additions & 7 deletions src/common/src/array/stream_chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::array::stream_record::Record;
use crate::array::{ArrayBuilderImpl, Op, StreamChunk};
use crate::bitmap::BitmapBuilder;
use crate::row::Row;
use crate::types::{DataType, DatumRef};
use crate::types::{DataType, Datum, DatumRef, ToOwnedDatum};
use crate::util::iter_util::ZipEqFast;

/// Build stream chunks with fixed chunk size from rows or records.
Expand All @@ -43,6 +43,8 @@ pub struct StreamChunkBuilder {

/// Number of currently pending rows.
size: usize,

buffer: Vec<Vec<Datum>>,
}

impl Drop for StreamChunkBuilder {
Expand Down Expand Up @@ -70,11 +72,13 @@ impl StreamChunkBuilder {
let initial_capacity = max_chunk_size.min(MAX_INITIAL_CAPACITY);

let ops = Vec::with_capacity(initial_capacity);
let column_builders = data_types
let column_builders: Vec<_> = data_types
.iter()
.map(|datatype| datatype.create_array_builder(initial_capacity))
.collect();
let vis_builder = BitmapBuilder::with_capacity(initial_capacity);
let buffer = vec![Vec::<Datum>::new(); column_builders.len()];

Self {
ops,
column_builders,
Expand All @@ -83,24 +87,28 @@ impl StreamChunkBuilder {
max_chunk_size: Some(max_chunk_size),
initial_capacity,
size: 0,
buffer,
}
}

/// Create a new `StreamChunkBuilder` with unlimited chunk size.
/// The builder will only yield chunks when `take` is called.
pub fn unlimited(data_types: Vec<DataType>, initial_capacity: Option<usize>) -> Self {
let initial_capacity = initial_capacity.unwrap_or(DEFAULT_INITIAL_CAPACITY);
let column_builders: Vec<_> = data_types
.iter()
.map(|datatype| datatype.create_array_builder(initial_capacity))
.collect();
let buffer = vec![Vec::<Datum>::new(); column_builders.len()];
Self {
ops: Vec::with_capacity(initial_capacity),
column_builders: data_types
.iter()
.map(|datatype| datatype.create_array_builder(initial_capacity))
.collect(),
column_builders,
data_types,
vis_builder: BitmapBuilder::default(),
max_chunk_size: None,
initial_capacity,
size: 0,
buffer,
}
}

Expand Down Expand Up @@ -164,6 +172,13 @@ impl StreamChunkBuilder {
self.size = 0;

let ops = std::mem::replace(&mut self.ops, Vec::with_capacity(self.initial_capacity));
for (i, datum) in self.buffer.iter_mut().enumerate() {
self.column_builders[i].append_iter(std::mem::replace(
datum,
Vec::with_capacity(self.initial_capacity),
));
}

let columns = self
.column_builders
.iter_mut()
Expand All @@ -190,7 +205,7 @@ impl StreamChunkBuilder {
) -> Option<StreamChunk> {
self.ops.push(op);
for (i, datum) in iter {
self.column_builders[i].append(datum);
self.buffer[i].push(datum.to_owned_datum());
}
self.vis_builder.append(VIS);
self.size += 1;
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/array/utf8_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ impl ArrayBuilder for Utf8ArrayBuilder {
Self::new(item_capacity)
}

fn append_iter(&mut self, values: impl IntoIterator<Item = Option<Box<str>>>) {
for value in values {
self.bytes.append(value.as_deref().map(|s| s.as_bytes()))
}
}

#[inline]
fn append_n<'a>(&'a mut self, n: usize, value: Option<&'a str>) {
self.bytes.append_n(n, value.map(|v| v.as_bytes()));
Expand Down

0 comments on commit 28a01c1

Please sign in to comment.