Skip to content

Commit

Permalink
feat: make FSL scalar also an arrayref (apache#8221)
Browse files (browse the repository at this point in the history)
  • Loading branch information
wjones127 authored Nov 16, 2023
1 parent b126bca commit b013087
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 61 deletions.
77 changes: 26 additions & 51 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::hash_utils::create_hashes;
use crate::utils::array_into_list_array;
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute::kernels::numeric::*;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
use arrow::datatypes::{i256, Fields, SchemaBuilder};
use arrow::{
array::*,
compute::kernels::cast::{cast_with_options, CastOptions},
Expand Down Expand Up @@ -95,9 +95,13 @@ pub enum ScalarValue {
FixedSizeBinary(i32, Option<Vec<u8>>),
/// large binary
LargeBinary(Option<Vec<u8>>),
/// Fixed size list of nested ScalarValue
Fixedsizelist(Option<Vec<ScalarValue>>, FieldRef, i32),
/// Represents a single element of a [`FixedSizeListArray`] as an [`ArrayRef`]
///
/// The array must be a FixedSizeListArray with length 1.
FixedSizeList(ArrayRef),
/// Represents a single element of a [`ListArray`] as an [`ArrayRef`]
///
/// The array must be a ListArray with length 1.
List(ArrayRef),
/// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
Date32(Option<i32>),
Expand Down Expand Up @@ -196,10 +200,8 @@ impl PartialEq for ScalarValue {
(FixedSizeBinary(_, _), _) => false,
(LargeBinary(v1), LargeBinary(v2)) => v1.eq(v2),
(LargeBinary(_), _) => false,
(Fixedsizelist(v1, t1, l1), Fixedsizelist(v2, t2, l2)) => {
v1.eq(v2) && t1.eq(t2) && l1.eq(l2)
}
(Fixedsizelist(_, _, _), _) => false,
(FixedSizeList(v1), FixedSizeList(v2)) => v1.eq(v2),
(FixedSizeList(_), _) => false,
(List(v1), List(v2)) => v1.eq(v2),
(List(_), _) => false,
(Date32(v1), Date32(v2)) => v1.eq(v2),
Expand Down Expand Up @@ -310,15 +312,7 @@ impl PartialOrd for ScalarValue {
(FixedSizeBinary(_, _), _) => None,
(LargeBinary(v1), LargeBinary(v2)) => v1.partial_cmp(v2),
(LargeBinary(_), _) => None,
(Fixedsizelist(v1, t1, l1), Fixedsizelist(v2, t2, l2)) => {
if t1.eq(t2) && l1.eq(l2) {
v1.partial_cmp(v2)
} else {
None
}
}
(Fixedsizelist(_, _, _), _) => None,
(List(arr1), List(arr2)) => {
(List(arr1), List(arr2)) | (FixedSizeList(arr1), FixedSizeList(arr2)) => {
if arr1.data_type() == arr2.data_type() {
let list_arr1 = as_list_array(arr1);
let list_arr2 = as_list_array(arr2);
Expand Down Expand Up @@ -349,6 +343,7 @@ impl PartialOrd for ScalarValue {
}
}
(List(_), _) => None,
(FixedSizeList(_), _) => None,
(Date32(v1), Date32(v2)) => v1.partial_cmp(v2),
(Date32(_), _) => None,
(Date64(v1), Date64(v2)) => v1.partial_cmp(v2),
Expand Down Expand Up @@ -465,12 +460,7 @@ impl std::hash::Hash for ScalarValue {
Binary(v) => v.hash(state),
FixedSizeBinary(_, v) => v.hash(state),
LargeBinary(v) => v.hash(state),
Fixedsizelist(v, t, l) => {
v.hash(state);
t.hash(state);
l.hash(state);
}
List(arr) => {
List(arr) | FixedSizeList(arr) => {
let arrays = vec![arr.to_owned()];
let hashes_buffer = &mut vec![0; arr.len()];
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
Expand Down Expand Up @@ -881,11 +871,9 @@ impl ScalarValue {
ScalarValue::Binary(_) => DataType::Binary,
ScalarValue::FixedSizeBinary(sz, _) => DataType::FixedSizeBinary(*sz),
ScalarValue::LargeBinary(_) => DataType::LargeBinary,
ScalarValue::Fixedsizelist(_, field, length) => DataType::FixedSizeList(
Arc::new(Field::new("item", field.data_type().clone(), true)),
*length,
),
ScalarValue::List(arr) => arr.data_type().to_owned(),
ScalarValue::List(arr) | ScalarValue::FixedSizeList(arr) => {
arr.data_type().to_owned()
}
ScalarValue::Date32(_) => DataType::Date32,
ScalarValue::Date64(_) => DataType::Date64,
ScalarValue::Time32Second(_) => DataType::Time32(TimeUnit::Second),
Expand Down Expand Up @@ -1032,8 +1020,11 @@ impl ScalarValue {
ScalarValue::Binary(v) => v.is_none(),
ScalarValue::FixedSizeBinary(_, v) => v.is_none(),
ScalarValue::LargeBinary(v) => v.is_none(),
ScalarValue::Fixedsizelist(v, ..) => v.is_none(),
ScalarValue::List(arr) => arr.len() == arr.null_count(),
// arr.len() should be 1 for a list scalar, but that invariant is not
// enforced anywhere, so compare the null count against the array length.
ScalarValue::List(arr) | ScalarValue::FixedSizeList(arr) => {
arr.len() == arr.null_count()
}
ScalarValue::Date32(v) => v.is_none(),
ScalarValue::Date64(v) => v.is_none(),
ScalarValue::Time32Second(v) => v.is_none(),
Expand Down Expand Up @@ -1855,7 +1846,7 @@ impl ScalarValue {
.collect::<LargeBinaryArray>(),
),
},
ScalarValue::Fixedsizelist(..) => {
ScalarValue::FixedSizeList(..) => {
return _not_impl_err!("FixedSizeList is not supported yet")
}
ScalarValue::List(arr) => {
Expand Down Expand Up @@ -2407,7 +2398,7 @@ impl ScalarValue {
ScalarValue::LargeBinary(val) => {
eq_array_primitive!(array, index, LargeBinaryArray, val)?
}
ScalarValue::Fixedsizelist(..) => {
ScalarValue::FixedSizeList(..) => {
return _not_impl_err!("FixedSizeList is not supported yet")
}
ScalarValue::List(_) => return _not_impl_err!("List is not supported yet"),
Expand Down Expand Up @@ -2533,14 +2524,9 @@ impl ScalarValue {
| ScalarValue::LargeBinary(b) => {
b.as_ref().map(|b| b.capacity()).unwrap_or_default()
}
ScalarValue::Fixedsizelist(vals, field, _) => {
vals.as_ref()
.map(|vals| Self::size_of_vec(vals) - std::mem::size_of_val(vals))
.unwrap_or_default()
// `field` is boxed, so it is NOT already included in `self`
+ field.size()
ScalarValue::List(arr) | ScalarValue::FixedSizeList(arr) => {
arr.get_array_memory_size()
}
ScalarValue::List(arr) => arr.get_array_memory_size(),
ScalarValue::Struct(vals, fields) => {
vals.as_ref()
.map(|vals| {
Expand Down Expand Up @@ -2908,18 +2894,7 @@ impl fmt::Display for ScalarValue {
)?,
None => write!(f, "NULL")?,
},
ScalarValue::Fixedsizelist(e, ..) => match e {
Some(l) => write!(
f,
"{}",
l.iter()
.map(|v| format!("{v}"))
.collect::<Vec<_>>()
.join(",")
)?,
None => write!(f, "NULL")?,
},
ScalarValue::List(arr) => write!(
ScalarValue::List(arr) | ScalarValue::FixedSizeList(arr) => write!(
f,
"{}",
arrow::util::pretty::pretty_format_columns("col", &[arr.to_owned()])
Expand Down Expand Up @@ -2999,7 +2974,7 @@ impl fmt::Debug for ScalarValue {
}
ScalarValue::LargeBinary(None) => write!(f, "LargeBinary({self})"),
ScalarValue::LargeBinary(Some(_)) => write!(f, "LargeBinary(\"{self}\")"),
ScalarValue::Fixedsizelist(..) => write!(f, "FixedSizeList([{self}])"),
ScalarValue::FixedSizeList(arr) => write!(f, "FixedSizeList([{arr:?}])"),
ScalarValue::List(arr) => write!(f, "List([{arr:?}])"),
ScalarValue::Date32(_) => write!(f, "Date32(\"{self}\")"),
ScalarValue::Date64(_) => write!(f, "Date64(\"{self}\")"),
Expand Down
18 changes: 9 additions & 9 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ fn coerce_case_expression(case: Case, schema: &DFSchemaRef) -> Result<Case> {
mod test {
use std::sync::Arc;

use arrow::array::{FixedSizeListArray, Int32Array};
use arrow::datatypes::{DataType, TimeUnit};

use arrow::datatypes::Field;
Expand Down Expand Up @@ -1237,15 +1238,14 @@ mod test {

#[test]
fn test_casting_for_fixed_size_list() -> Result<()> {
let val = lit(ScalarValue::Fixedsizelist(
Some(vec![
ScalarValue::from(1i32),
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
Arc::new(Field::new("item", DataType::Int32, true)),
3,
));
let val = lit(ScalarValue::FixedSizeList(Arc::new(
FixedSizeListArray::new(
Arc::new(Field::new("item", DataType::Int32, true)),
3,
Arc::new(Int32Array::from(vec![1, 2, 3])),
None,
),
)));
let expr = Expr::ScalarFunction(ScalarFunction {
fun: BuiltinScalarFunction::MakeArray,
args: vec![val.clone()],
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
Value::LargeUtf8Value(s.to_owned())
})
}
ScalarValue::Fixedsizelist(..) => Err(Error::General(
ScalarValue::FixedSizeList(..) => Err(Error::General(
"Proto serialization error: ScalarValue::Fixedsizelist not supported"
.to_string(),
)),
Expand Down

0 comments on commit b013087

Please sign in to comment.