Skip to content

Commit

Permalink
rewrite array_append/array_prepend to remove deplicate codes
Browse files Browse the repository at this point in the history
Signed-off-by: veeupup <[email protected]>
  • Loading branch information
Veeupup committed Nov 9, 2023
1 parent 4512805 commit 980934d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 126 deletions.
20 changes: 8 additions & 12 deletions datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1536,12 +1536,10 @@ mod test {
.unwrap()
.resolve(&schema)
.unwrap();
let r4 = apache_avro::to_value(serde_json::json!({
"col1": null
}))
.unwrap()
.resolve(&schema)
.unwrap();
let r4 = apache_avro::to_value(serde_json::json!({ "col1": null }))
.unwrap()
.resolve(&schema)
.unwrap();

let mut w = apache_avro::Writer::new(&schema, vec![]);
w.append(r1).unwrap();
Expand Down Expand Up @@ -1600,12 +1598,10 @@ mod test {
}"#,
)
.unwrap();
let r1 = apache_avro::to_value(serde_json::json!({
"col1": null
}))
.unwrap()
.resolve(&schema)
.unwrap();
let r1 = apache_avro::to_value(serde_json::json!({ "col1": null }))
.unwrap()
.resolve(&schema)
.unwrap();
let r2 = apache_avro::to_value(serde_json::json!({
"col1": {
"col2": "hello"
Expand Down
185 changes: 71 additions & 114 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use arrow::array::*;
use arrow::buffer::OffsetBuffer;
use arrow::compute;
use arrow::datatypes::{DataType, Field, UInt64Type};
use arrow::row::{RowConverter, SortField};
use arrow_buffer::NullBuffer;

use datafusion_common::cast::{
Expand Down Expand Up @@ -577,58 +578,6 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
)
}

macro_rules! append {
($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{
let mut offsets: Vec<i32> = vec![0];
let mut values =
downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone();

let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
for (arr, el) in $ARRAY.iter().zip(element.iter()) {
let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
DataFusionError::Internal(format!("offsets should not be empty"))
})?;
match arr {
Some(arr) => {
let child_array = downcast_arg!(arr, $ARRAY_TYPE);
values = downcast_arg!(
compute::concat(&[
&values,
child_array,
&$ARRAY_TYPE::from(vec![el])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + child_array.len() as i32 + 1i32);
}
None => {
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el.clone()])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + 1i32);
}
}
}

let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true));

Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(values),
None,
)?)
}};
}

/// Array_append SQL function
pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
let arr = as_list_array(&args[0])?;
Expand All @@ -638,71 +587,50 @@ pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
let res = match arr.value_type() {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
append!(arr, element, $ARRAY_TYPE)
dt => {
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
let r_rows = converter.convert_columns(&[element.clone()])?;
let mut offsets = vec![0];
let mut new_arrays = vec![];
for (i, arr) in arr.iter().enumerate() {
let r_row = r_rows.row(i);
let rows = if let Some(arr) = arr {
let mut l_rows = converter.convert_columns(&[arr])?;
l_rows.push(r_row);
l_rows
} else {
let mut rows = converter.empty_rows(1, 1);
rows.push(r_row);
rows
};
let last_offset: i32 = match offsets.last().copied() {
Some(offset) => offset,
None => return internal_err!("offsets should not be empty"),
};
offsets.push(last_offset + rows.num_rows() as i32);
let arrays = converter.convert_rows(rows.iter())?;
let array = match arrays.get(0) {
Some(array) => array.clone(),
None => {
return internal_err!(
"array_append: failed to get value from rows"
)
}
};
new_arrays.push(array);
}
call_array_function!(data_type, false)
let field = Arc::new(Field::new("item", dt, true));
let offsets = OffsetBuffer::new(offsets.into());
let new_arrays_ref =
new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
let values = compute::concat(&new_arrays_ref)?;
Arc::new(ListArray::try_new(field, offsets, values, None)?)
}
};

Ok(res)
}

macro_rules! prepend {
($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{
let mut offsets: Vec<i32> = vec![0];
let mut values =
downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone();

let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
for (arr, el) in $ARRAY.iter().zip(element.iter()) {
let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
DataFusionError::Internal(format!("offsets should not be empty"))
})?;
match arr {
Some(arr) => {
let child_array = downcast_arg!(arr, $ARRAY_TYPE);
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el]),
child_array
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + child_array.len() as i32 + 1i32);
}
None => {
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el.clone()])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + 1i32);
}
}
}

let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true));

Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(values),
None,
)?)
}};
}

/// Array_prepend SQL function
pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
let element = &args[0];
Expand All @@ -712,13 +640,42 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
let res = match arr.value_type() {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
prepend!(arr, element, $ARRAY_TYPE)
dt => {
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
let r_rows = converter.convert_columns(&[element.clone()])?;
let mut offsets = vec![0];
let mut new_arrays = vec![];
for (i, arr) in arr.iter().enumerate() {
let mut rows = converter.empty_rows(1, 1);
rows.push(r_rows.row(i));
if let Some(arr) = arr {
let l_rows = converter.convert_columns(&[arr])?;
for row in l_rows.iter() {
rows.push(row);
}
}
let last_offset: i32 = match offsets.last().copied() {
Some(offset) => offset,
None => return internal_err!("offsets should not be empty"),
};
offsets.push(last_offset + rows.num_rows() as i32);
let arrays = converter.convert_rows(rows.iter())?;
let array = match arrays.get(0) {
Some(array) => array.clone(),
None => {
return internal_err!(
"array_append: failed to get value from rows"
)
}
};
new_arrays.push(array);
}
call_array_function!(data_type, false)
let field = Arc::new(Field::new("item", dt, true));
let offsets = OffsetBuffer::new(offsets.into());
let new_arrays_ref =
new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
let values = compute::concat(&new_arrays_ref)?;
Arc::new(ListArray::try_new(field, offsets, values, None)?)
}
};

Expand Down

0 comments on commit 980934d

Please sign in to comment.