Validate ScalarUDF output rows and fix nulls for array_has and get_field for Map #10148
@@ -288,36 +288,40 @@ fn general_array_has_dispatch<O: OffsetSizeTrait>(
    } else {
        array
    };

    for (row_idx, (arr, sub_arr)) in array.iter().zip(sub_array.iter()).enumerate() {
        if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) {
            let arr_values = converter.convert_columns(&[arr])?;
            let sub_arr_values = if comparison_type != ComparisonType::Single {
                converter.convert_columns(&[sub_arr])?
            } else {
                converter.convert_columns(&[element.clone()])?
            };

            let mut res = match comparison_type {
                ComparisonType::All => sub_arr_values
                    .iter()
                    .dedup()
                    .all(|elem| arr_values.iter().dedup().any(|x| x == elem)),
                ComparisonType::Any => sub_arr_values
                    .iter()
                    .dedup()
                    .any(|elem| arr_values.iter().dedup().any(|x| x == elem)),
                ComparisonType::Single => arr_values
                    .iter()
                    .dedup()
                    .any(|x| x == sub_arr_values.row(row_idx)),
            };

            if comparison_type == ComparisonType::Any {
                res |= res;
        match (arr, sub_arr) {
            (Some(arr), Some(sub_arr)) => {
                let arr_values = converter.convert_columns(&[arr])?;
                let sub_arr_values = if comparison_type != ComparisonType::Single {
                    converter.convert_columns(&[sub_arr])?
                } else {
                    converter.convert_columns(&[element.clone()])?
                };

                let mut res = match comparison_type {
                    ComparisonType::All => sub_arr_values
                        .iter()
                        .dedup()
                        .all(|elem| arr_values.iter().dedup().any(|x| x == elem)),
                    ComparisonType::Any => sub_arr_values
                        .iter()
                        .dedup()
                        .any(|elem| arr_values.iter().dedup().any(|x| x == elem)),
                    ComparisonType::Single => arr_values
                        .iter()
                        .dedup()
                        .any(|x| x == sub_arr_values.row(row_idx)),
                };

                if comparison_type == ComparisonType::Any {
                    res |= res;
                }
                boolean_builder.append_value(res);
            }
            // respect null input
            (_, _) => {
                boolean_builder.append_null();

👍

            }

            boolean_builder.append_value(res);
        }
    }
    Ok(Arc::new(boolean_builder.finish()))
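For anyone skimming: the shape of the fix is simply "append exactly once per input row, and append null when an input row is null." Below is a minimal, self-contained sketch of that pattern; the helper name and the plain boolean inputs are made up for illustration and are not part of the PR:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanBuilder};

/// Build a boolean result that keeps one output row per input row,
/// emitting null whenever either input row is null.
fn null_respecting_zip(lhs: &[Option<bool>], rhs: &[Option<bool>]) -> ArrayRef {
    let mut builder = BooleanBuilder::with_capacity(lhs.len());
    for (l, r) in lhs.iter().zip(rhs.iter()) {
        match (l, r) {
            (Some(l), Some(r)) => builder.append_value(*l && *r),
            // respect null input, but still emit a row for it
            _ => builder.append_null(),
        }
    }
    Arc::new(builder.finish())
}
```

The property that matters is that the builder is appended to exactly once per iteration, so the output length always equals the input length.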
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Scalar, StringArray};
use arrow::array::{
    make_array, Array, Capacities, MutableArrayData, Scalar, StringArray,
};
use arrow::datatypes::DataType;
use datafusion_common::cast::{as_map_array, as_struct_array};
use datafusion_common::{exec_err, ExprSchema, Result, ScalarValue};
@@ -107,29 +109,55 @@ impl ScalarUDFImpl for GetFieldFunc {
            );
        }
    };

    match (array.data_type(), name) {
        (DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => {
            let map_array = as_map_array(array.as_ref())?;
            let key_scalar = Scalar::new(StringArray::from(vec![k.clone()]));
            let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?;
            let entries = arrow::compute::filter(map_array.entries(), &keys)?;
Using filter will reduce the number of input rows to the number of rows that have keys matching the input key. But we want to respect the number of input rows and give null for any row that does not have the matching key.

I don't understand this. If the input is like this (two rows, each with three elements), what does an expression like this evaluate to?

On the previous implementation: map_array.entries() is a struct array over the flattened entries. With the example above, the layout of the "fields" field will be a vector of two arrays, where the first array is the list of keys and the second array is the list of values. With this computation, the result is a boolean array marking where "key" = "c", and thus this operation reduces the number of rows.

The problem: now let's add, in between, a row whose map does not have key "c". map_array.entries() underneath is still the flattened representation, so the returned result silently drops that row instead of producing a null for it.

I would expect the result of evaluating the expression to have exactly one output value per input row. For example, in duckdb:

D create table foo as values (MAP {'a':1, 'b':2, 'c':100}), (MAP{ 'a':1, 'b':2}), (MAP {'a':1, 'b':2, 'c':200});
D select * from foo;
┌───────────────────────┐
│         col0          │
│ map(varchar, integer) │
├───────────────────────┤
│ {a=1, b=2, c=100}     │
│ {a=1, b=2}            │
│ {a=1, b=2, c=200}     │
└───────────────────────┘
D select col0['c'] from foo;
┌───────────┐
│ col0['c'] │
│  int32[]  │
├───────────┤
│ [100]     │
│ []        │
│ [200]     │
└───────────┘

Basically a scalar function has the invariant that each input row produces exactly 1 output row.

Yes, the previous implementation will not return null (according to the slt: https://github.com/apache/datafusion/pull/10148/files#diff-4db02ec06ada20062cbb518eeb713c2643f5a51e89774bdadd9966410baa7d1dR47)

I also explained this in this discussion: #10148 (comment)
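To make the "flattened entries" layout concrete, here is a small self-contained sketch (not from the PR; it assumes arrow's MapArray::new_from_strings constructor and uses a made-up function name) showing that a three-row MapArray keeps all of its entries in one flat child array, with value_offsets marking the row boundaries:

```rust
use arrow::array::{Array, Int32Array, MapArray};
use arrow::error::ArrowError;

/// Three map rows: {a:1, b:2, c:100}, {a:1, b:2}, {a:1, b:2, c:200}.
fn flattened_map_layout() -> Result<(), ArrowError> {
    let keys = ["a", "b", "c", "a", "b", "a", "b", "c"];
    let values = Int32Array::from(vec![1, 2, 100, 1, 2, 1, 2, 200]);
    let map = MapArray::new_from_strings(keys.into_iter(), &values, &[0, 3, 5, 8])?;

    assert_eq!(map.len(), 3); // 3 logical map rows...
    assert_eq!(map.entries().len(), 8); // ...backed by 8 flattened key/value entries

    // Boundaries of row 1 (the {a:1, b:2} row) inside the flattened entries.
    let start = map.value_offsets()[1] as usize;
    let end = map.value_offsets()[2] as usize;
    assert_eq!((start, end), (3, 5));
    Ok(())
}
```

A boolean mask computed over map.keys() therefore has 8 slots here, not 3, which is why filtering the entries by that mask changes the row count.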
            let entries_struct_array = as_struct_array(entries.as_ref())?;
            Ok(ColumnarValue::Array(entries_struct_array.column(1).clone()))
        }
        (DataType::Struct(_), ScalarValue::Utf8(Some(k))) => {
            let as_struct_array = as_struct_array(&array)?;
            match as_struct_array.column_by_name(k) {
                None => exec_err!(
                    "get indexed field {k} not found in struct"),
                Some(col) => Ok(ColumnarValue::Array(col.clone()))
        (DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => {
            let map_array = as_map_array(array.as_ref())?;
            let key_scalar: Scalar<arrow::array::GenericByteArray<arrow::datatypes::GenericStringType<i32>>> = Scalar::new(StringArray::from(vec![k.clone()]));
            let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?;

            // note that this array has more entries than the expected output/input size
            // because the MapArray is flattened
            let original_data = map_array.entries().column(1).to_data();
            let capacity = Capacities::Array(original_data.len());
            let mut mutable =
                MutableArrayData::with_capacities(vec![&original_data], true, capacity);

            for entry in 0..map_array.len() {
                let start = map_array.value_offsets()[entry] as usize;
                let end = map_array.value_offsets()[entry + 1] as usize;

                let maybe_matched = keys
                    .slice(start, end - start)
                    .iter()
                    .enumerate()
                    .find(|(_, t)| t.unwrap());
                if maybe_matched.is_none() {
                    mutable.extend_nulls(1);
                    continue;
                }
                let (match_offset, _) = maybe_matched.unwrap();
                mutable.extend(0, start + match_offset, start + match_offset + 1);
            }
            let data = mutable.freeze();
            let data = make_array(data);
            Ok(ColumnarValue::Array(data))
        }
        (DataType::Struct(_), ScalarValue::Utf8(Some(k))) => {
            let as_struct_array = as_struct_array(&array)?;
            match as_struct_array.column_by_name(k) {
                None => exec_err!("get indexed field {k} not found in struct"),
                Some(col) => Ok(ColumnarValue::Array(col.clone())),
            }
        (DataType::Struct(_), name) => exec_err!(
            "get indexed field is only possible on struct with utf8 indexes. \
             Tried with {name:?} index"),
        (dt, name) => exec_err!(
            "get indexed field is only possible on lists with int64 indexes or struct \
             with utf8 indexes. Tried {dt:?} with {name:?} index"),
    }
        (DataType::Struct(_), name) => exec_err!(
            "get indexed field is only possible on struct with utf8 indexes. \
             Tried with {name:?} index"
        ),
        (dt, name) => exec_err!(
            "get indexed field is only possible on lists with int64 indexes or struct \
             with utf8 indexes. Tried {dt:?} with {name:?} index"
        ),
    }
}
}
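For reference, the row-preserving lookup discussed above can be phrased as a free function over any MapArray. This is a sketch rather than the PR's exact code: the function name is made up, and it assumes (as the PR does) that the values live in entries().column(1):

```rust
use arrow::array::{
    make_array, Array, ArrayRef, MapArray, MutableArrayData, Scalar, StringArray,
};
use arrow::error::ArrowError;

/// For each map row, copy the value whose key equals `key`,
/// or append a null, so the output keeps one row per input row.
fn lookup_key_per_row(map_array: &MapArray, key: &str) -> Result<ArrayRef, ArrowError> {
    // Boolean mask over the *flattened* entries: true where the key matches.
    let key_scalar = Scalar::new(StringArray::from(vec![key.to_string()]));
    let matches = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?;

    let values = map_array.entries().column(1).to_data();
    let mut mutable = MutableArrayData::new(vec![&values], true, map_array.len());

    for row in 0..map_array.len() {
        let start = map_array.value_offsets()[row] as usize;
        let end = map_array.value_offsets()[row + 1] as usize;
        // Look for a matching key inside this row's slice of the entries.
        match (start..end).find(|&i| matches.value(i)) {
            Some(idx) => mutable.extend(0, idx, idx + 1),
            None => mutable.extend_nulls(1), // this row has no such key
        }
    }
    Ok(make_array(mutable.freeze()))
}
```

Unlike the filter-based version, the result always has map_array.len() rows, with a null wherever a row has no matching key.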
@@ -160,7 +160,22 @@ impl PhysicalExpr for ScalarFunctionExpr {

        // evaluate the function
        match self.fun {
            ScalarFunctionDefinition::UDF(ref fun) => fun.invoke(&inputs),
            ScalarFunctionDefinition::UDF(ref fun) => {
                let output = fun.invoke(&inputs)?;
                // Only arrow_typeof can bypass this rule
                if fun.name() != "arrow_typeof" {
I think we should introduce this on the trait rather than hard-coding the function name here.

I added it to the trait and the wrapper.
                    let output_size = match &output {
                        ColumnarValue::Array(array) => array.len(),
                        ColumnarValue::Scalar(_) => 1,
I don't think this logic is correct -- specifically, a ColumnarValue::Scalar is logically broadcast to every row of the batch, so its "size" is not the number of output rows. Thus I think if the function returns a Scalar we should not check the length at all. Therefore I think the check could be something like:

    if let ColumnarValue::Array(array) = &output {
        if output_size != array.len() {
            return internal_err...
        }
    }

I'll try and make a PR to improve the documentation on ColumnarValue to make this clearer: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.ColumnarValue.html

Ah yes, so we don't need to check the length if the output is a scalar value; we actually had a discussion here: #10148 (comment)

Here is a PR to improve the docs #10265

Oh, then perhaps the current implementation of arrow_typeof could be wrong: if we have a map whose schema differs per row, arrow_typeof(map.id) would return a different type for different rows. Is it possible to have this kind of input, because a map's schema can be dynamic per row?

I see. I agree that we can check with Array only.

Why is a map key allowed to have different types? Which DB has a similar model?

Aha, I've read duckdb's map definition again. I am new to this kind of project, but I think we don't have and should not have that use case 🤔. Else it would affect a lot of execution logic.

In DuckDB, map keys can be different values for each row, but the types should not be different.

👍
                    };
                    if output_size != batch.num_rows() {
                        return internal_err!("UDF returned a different number of rows than expected. Expected: {}, Got: {}",
                            batch.num_rows(), output_size);
                    }
                }

                Ok(output)
            }
            ScalarFunctionDefinition::Name(_) => {
                internal_err!(
                    "Name function must be resolved to one of the other variants prior to physical planning"
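Spelling out the rule the thread lands on as a tiny standalone helper (the function name is invented here; imports are the usual datafusion crates): only an Array result carries a row count to validate, while a Scalar result is broadcast to every row and passes trivially.

```rust
use arrow::array::Array;
use datafusion_common::{internal_err, Result};
use datafusion_expr::ColumnarValue;

/// Verify that an array-valued function result has one row per input row.
/// A Scalar result is broadcast to every row, so it is not length-checked.
fn check_output_rows(output: &ColumnarValue, num_rows: usize) -> Result<()> {
    if let ColumnarValue::Array(array) = output {
        if array.len() != num_rows {
            return internal_err!(
                "UDF returned a different number of rows than expected. Expected: {}, Got: {}",
                num_rows,
                array.len()
            );
        }
    }
    Ok(())
}
```

This is why the `output_size` match above only really needs the Array arm.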
@@ -5169,8 +5169,9 @@ false false false true
true false true false
true false false true
false true false false
false false false false
false false false false

The test result does not look correct, because it ignores some null rows in between.

NULL NULL false false
false false NULL false
false false false NULL

query BBBB
select array_has(arrow_cast(column1, 'LargeList(List(Int64))'), make_array(5, 6)),

@@ -5183,8 +5184,9 @@ false false false true
true false true false
true false false true
false true false false
false false false false
false false false false
NULL NULL false false

I double checked against the input data (datafusion/datafusion/sqllogictest/test_files/array.slt, lines 58 to 68 in cc53bd3).

false false NULL false
false false false NULL

query BBBB
select array_has(column1, make_array(5, 6)),

@@ -5197,8 +5199,9 @@ false false false true
true false true false
true false false true
false true false false
false false false false
false false false false
NULL NULL false false

Likewise, I agree this should have 7 output rows (datafusion/datafusion/sqllogictest/test_files/array.slt, lines 80 to 90 in cc53bd3).

false false NULL false
false false false NULL

query BBBBBBBBBBBBB
select array_has_all(make_array(1,2,3), make_array(1,3)),
@@ -44,6 +44,7 @@ DELETE 24
query T
I had to remind myself what this data looked like. Here it is for anyone else who may be interested:
DataFusion CLI v37.1.0
> select * from 'datafusion/core/tests/data/parquet_map.parquet';
+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
| ints | strings | timestamp |
+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
| {bytes: 38906} | {host: 198.194.132.41, method: GET, protocol: HTTP/1.0, referer: https://some.com/this/endpoint/prints/money, request: /observability/metrics/production, status: 400, user-identifier: shaneIxD} | 06/Oct/2023:17:53:45 |
| {bytes: 44606} | {host: 140.115.224.194, method: PATCH, protocol: HTTP/1.0, referer: https://we.org/user/booperbot124, request: /booper/bopper/mooper/mopper, status: 304, user-identifier: jesseddy} | 06/Oct/2023:17:53:45 |
| {bytes: 23517} | {host: 63.69.43.67, method: GET, protocol: HTTP/2.0, referer: https://random.net/booper/bopper/mooper/mopper, request: /booper/bopper/mooper/mopper, status: 550, user-identifier: jesseddy} | 06/Oct/2023:17:53:45 |
| {bytes: 44876} | {host: 69.4.253.156, method: PATCH, protocol: HTTP/1.1, referer: https://some.net/booper/bopper/mooper/mopper, request: /user/booperbot124, status: 403, user-identifier: Karimmove} | 06/Oct/2023:17:53:45 |
| {bytes: 34122} | {host: 239.152.196.123, method: DELETE, protocol: HTTP/2.0, referer: https://for.com/observability/metrics/production, request: /apps/deploy, status: 403, user-identifier: meln1ks} | 06/Oct/2023:17:53:45 |
| {bytes: 37438} | {host: 95.243.186.123, method: DELETE, protocol: HTTP/1.1, referer: https://make.de/wp-admin, request: /wp-admin, status: 550, user-identifier: Karimmove} | 06/Oct/2023:17:53:45 |
| {bytes: 45784} | {host: 66.142.251.66, method: PUT, protocol: HTTP/2.0, referer: https://some.org/apps/deploy, request: /secret-info/open-sesame, status: 403, user-identifier: benefritz} | 06/Oct/2023:17:53:45 |
| {bytes: 27788} | {host: 157.85.140.215, method: GET, protocol: HTTP/1.1, referer: https://random.de/booper/bopper/mooper/mopper, request: /booper/bopper/mooper/mopper, status: 401, user-identifier: devankoshal} | 06/Oct/2023:17:53:45 |
| {bytes: 5344} | {host: 62.191.179.3, method: POST, protocol: HTTP/1.0, referer: https://random.org/booper/bopper/mooper/mopper, request: /observability/metrics/production, status: 400, user-identifier: jesseddy} | 06/Oct/2023:17:53:45 |
| {bytes: 9136} | {host: 237.213.221.20, method: PUT, protocol: HTTP/2.0, referer: https://some.us/this/endpoint/prints/money, request: /observability/metrics/production, status: 304, user-identifier: ahmadajmi} | 06/Oct/2023:17:53:46 |
| {bytes: 5640} | {host: 38.148.115.2, method: GET, protocol: HTTP/1.0, referer: https://for.net/apps/deploy, request: /do-not-access/needs-work, status: 301, user-identifier: benefritz} | 06/Oct/2023:17:53:46 |
... |
SELECT strings['not_found'] FROM data LIMIT 1;
----
NULL
I'm not familiar with Map. Why should we return null here?

It will throw the validation error I added in this PR. I think the correct behavior is to return null for every input row not having the associated key. I took a look at duckdb and spark and they also have this behavior.

Also, a similar implementation in DataFusion, array_element, likewise returns null if the index goes out of range.

Here is how it works on main:
> select strings['not_found'] from 'datafusion/core/tests/data/parquet_map.parquet';
0 row(s) fetched.
Elapsed 0.006 seconds.
Here is how it works on this PR (i.e. it produces a single output row for each input row):
DataFusion CLI v37.1.0
> select strings['not_found'] from '../datafusion/core/tests/data/parquet_map.parquet';
+----------------------------------------------------------------------+
| ../datafusion/core/tests/data/parquet_map.parquet.strings[not_found] |
+----------------------------------------------------------------------+
| |
| |
| |
| . |
| . |
| . |
+----------------------------------------------------------------------+
209 row(s) fetched. (First 40 displayed. Use --maxrows to adjust)
Elapsed 0.033 seconds.
statement ok
drop table data;
👌 -- very nice