From 980934d0ee7a6b8d43ebb251c6ea5df291bc2fec Mon Sep 17 00:00:00 2001 From: veeupup Date: Thu, 9 Nov 2023 22:04:56 +0800 Subject: [PATCH] rewrite `array_append/array_prepend` to remove deplicate codes Signed-off-by: veeupup --- .../avro_to_arrow/arrow_array_reader.rs | 20 +- .../physical-expr/src/array_expressions.rs | 185 +++++++----------- 2 files changed, 79 insertions(+), 126 deletions(-) diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs index fd91ea1cc538..855a8d0dbf40 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs @@ -1536,12 +1536,10 @@ mod test { .unwrap() .resolve(&schema) .unwrap(); - let r4 = apache_avro::to_value(serde_json::json!({ - "col1": null - })) - .unwrap() - .resolve(&schema) - .unwrap(); + let r4 = apache_avro::to_value(serde_json::json!({ "col1": null })) + .unwrap() + .resolve(&schema) + .unwrap(); let mut w = apache_avro::Writer::new(&schema, vec![]); w.append(r1).unwrap(); @@ -1600,12 +1598,10 @@ mod test { }"#, ) .unwrap(); - let r1 = apache_avro::to_value(serde_json::json!({ - "col1": null - })) - .unwrap() - .resolve(&schema) - .unwrap(); + let r1 = apache_avro::to_value(serde_json::json!({ "col1": null })) + .unwrap() + .resolve(&schema) + .unwrap(); let r2 = apache_avro::to_value(serde_json::json!({ "col1": { "col2": "hello" diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 64550aabf424..614ad62d36d9 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -24,6 +24,7 @@ use arrow::array::*; use arrow::buffer::OffsetBuffer; use arrow::compute; use arrow::datatypes::{DataType, Field, UInt64Type}; +use arrow::row::{RowConverter, SortField}; use arrow_buffer::NullBuffer; use datafusion_common::cast::{ @@ -577,58 +578,6 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result { ) } -macro_rules! append { - ($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{ - let mut offsets: Vec = vec![0]; - let mut values = - downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone(); - - let element = downcast_arg!($ELEMENT, $ARRAY_TYPE); - for (arr, el) in $ARRAY.iter().zip(element.iter()) { - let last_offset: i32 = offsets.last().copied().ok_or_else(|| { - DataFusionError::Internal(format!("offsets should not be empty")) - })?; - match arr { - Some(arr) => { - let child_array = downcast_arg!(arr, $ARRAY_TYPE); - values = downcast_arg!( - compute::concat(&[ - &values, - child_array, - &$ARRAY_TYPE::from(vec![el]) - ])? - .clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + child_array.len() as i32 + 1i32); - } - None => { - values = downcast_arg!( - compute::concat(&[ - &values, - &$ARRAY_TYPE::from(vec![el.clone()]) - ])? - .clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + 1i32); - } - } - } - - let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true)); - - Arc::new(ListArray::try_new( - field, - OffsetBuffer::new(offsets.into()), - Arc::new(values), - None, - )?) - }}; -} - /// Array_append SQL function pub fn array_append(args: &[ArrayRef]) -> Result { let arr = as_list_array(&args[0])?; @@ -638,71 +587,50 @@ pub fn array_append(args: &[ArrayRef]) -> Result { let res = match arr.value_type() { DataType::List(_) => concat_internal(args)?, DataType::Null => return make_array(&[element.to_owned()]), - data_type => { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - append!(arr, element, $ARRAY_TYPE) + dt => { + let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; + let r_rows = converter.convert_columns(&[element.clone()])?; + let mut offsets = vec![0]; + let mut new_arrays = vec![]; + for (i, arr) in arr.iter().enumerate() { + let r_row = r_rows.row(i); + let rows = if let Some(arr) = arr { + let mut l_rows = converter.convert_columns(&[arr])?; + l_rows.push(r_row); + l_rows + } else { + let mut rows = converter.empty_rows(1, 1); + rows.push(r_row); + rows }; + let last_offset: i32 = match offsets.last().copied() { + Some(offset) => offset, + None => return internal_err!("offsets should not be empty"), + }; + offsets.push(last_offset + rows.num_rows() as i32); + let arrays = converter.convert_rows(rows.iter())?; + let array = match arrays.get(0) { + Some(array) => array.clone(), + None => { + return internal_err!( + "array_append: failed to get value from rows" + ) + } + }; + new_arrays.push(array); } - call_array_function!(data_type, false) + let field = Arc::new(Field::new("item", dt, true)); + let offsets = OffsetBuffer::new(offsets.into()); + let new_arrays_ref = + new_arrays.iter().map(|v| v.as_ref()).collect::>(); + let values = compute::concat(&new_arrays_ref)?; + Arc::new(ListArray::try_new(field, offsets, values, None)?) } }; Ok(res) } -macro_rules! prepend { - ($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{ - let mut offsets: Vec = vec![0]; - let mut values = - downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone(); - - let element = downcast_arg!($ELEMENT, $ARRAY_TYPE); - for (arr, el) in $ARRAY.iter().zip(element.iter()) { - let last_offset: i32 = offsets.last().copied().ok_or_else(|| { - DataFusionError::Internal(format!("offsets should not be empty")) - })?; - match arr { - Some(arr) => { - let child_array = downcast_arg!(arr, $ARRAY_TYPE); - values = downcast_arg!( - compute::concat(&[ - &values, - &$ARRAY_TYPE::from(vec![el]), - child_array - ])? - .clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + child_array.len() as i32 + 1i32); - } - None => { - values = downcast_arg!( - compute::concat(&[ - &values, - &$ARRAY_TYPE::from(vec![el.clone()]) - ])? - .clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + 1i32); - } - } - } - - let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true)); - - Arc::new(ListArray::try_new( - field, - OffsetBuffer::new(offsets.into()), - Arc::new(values), - None, - )?) - }}; -} - /// Array_prepend SQL function pub fn array_prepend(args: &[ArrayRef]) -> Result { let element = &args[0]; @@ -712,13 +640,42 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result { let res = match arr.value_type() { DataType::List(_) => concat_internal(args)?, DataType::Null => return make_array(&[element.to_owned()]), - data_type => { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - prepend!(arr, element, $ARRAY_TYPE) + dt => { + let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; + let r_rows = converter.convert_columns(&[element.clone()])?; + let mut offsets = vec![0]; + let mut new_arrays = vec![]; + for (i, arr) in arr.iter().enumerate() { + let mut rows = converter.empty_rows(1, 1); + rows.push(r_rows.row(i)); + if let Some(arr) = arr { + let l_rows = converter.convert_columns(&[arr])?; + for row in l_rows.iter() { + rows.push(row); + } + } + let last_offset: i32 = match offsets.last().copied() { + Some(offset) => offset, + None => return internal_err!("offsets should not be empty"), }; + offsets.push(last_offset + rows.num_rows() as i32); + let arrays = converter.convert_rows(rows.iter())?; + let array = match arrays.get(0) { + Some(array) => array.clone(), + None => { + return internal_err!( + "array_append: failed to get value from rows" + ) + } + }; + new_arrays.push(array); } - call_array_function!(data_type, false) + let field = Arc::new(Field::new("item", dt, true)); + let offsets = OffsetBuffer::new(offsets.into()); + let new_arrays_ref = + new_arrays.iter().map(|v| v.as_ref()).collect::>(); + let values = compute::concat(&new_arrays_ref)?; + Arc::new(ListArray::try_new(field, offsets, values, None)?) } };