Convert ArrayAgg to UDAF #10999

Closed
jayzhan211 opened this issue Jun 19, 2024 · 15 comments

Labels
enhancement New feature or request

Comments

@jayzhan211
Contributor

Is your feature request related to a problem or challenge?

Similar to other issues in #8708

Remember to include a test in roundtrip_expr_api.

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@eejbyfeldt
Contributor

Will work on this one. I think it might also involve moving nth_value, since they share some code.

@eejbyfeldt
Contributor

@jayzhan211 I pushed a work in progress here: #11029. It still fails some test cases in sqllogictests:

External error: query failed: DataFusion error: This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: NTH_VALUE(aggregate_test_100.c4,Int64(3)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING
[SQL] SELECT
   NTH_VALUE(c4, 3) OVER(ORDER BY c9 ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as nth_value1,
   NTH_VALUE(c4, 2) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as nth_value2
   FROM aggregate_test_100
   ORDER BY c9
   LIMIT 5
at test_files/window.slt:1205

External error: query failed: DataFusion error: External error: Arrow error: Invalid argument error: column types must match schema types, expected List(Field { name: "item", data_type: Struct([Field { name: "sn@1", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) but found List(Field { name: "item", data_type: Struct([Field { name: "sn@0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) at column index 2
[SQL] SELECT ARRAY_AGG(e.rate ORDER BY e.sn)
FROM sales_global AS s
JOIN exchange_rates AS e
ON s.currency = e.currency_from AND
   e.currency_to = 'USD' AND
   s.ts >= e.ts
GROUP BY s.sn
ORDER BY s.sn;
at test_files/group_by.slt:3181

The first one seems to be because the new NTH_VALUE UDAF is picked over the builtin window function with the same name. Is this expected? What is the correct course of action to resolve it?
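
For context on the first error: an aggregate evaluated over a sliding frame has to remove rows that leave the frame, which is what `retract_batch` is for. The sketch below is a minimal illustration using a simplified, hypothetical `Accumulator` trait (not DataFusion's actual trait or signatures). It shows how an accumulator that keeps the default `retract` fails as soon as the frame starts to slide, while one that implements it succeeds.

```rust
/// Simplified stand-in for an accumulator interface (hypothetical, for illustration only).
trait Accumulator {
    fn update(&mut self, values: &[i64]);
    fn evaluate(&self) -> Option<i64>;

    /// Remove rows that slid out of the window frame.
    /// The default mirrors the "not implemented" error from the log above.
    fn retract(&mut self, _values: &[i64]) -> Result<(), String> {
        Err("retract_batch is not implemented".to_string())
    }
}

/// A sum can undo an update, so it works with sliding frames.
struct Sum(i64);

impl Accumulator for Sum {
    fn update(&mut self, values: &[i64]) {
        self.0 += values.iter().sum::<i64>();
    }
    fn evaluate(&self) -> Option<i64> {
        Some(self.0)
    }
    fn retract(&mut self, values: &[i64]) -> Result<(), String> {
        self.0 -= values.iter().sum::<i64>();
        Ok(())
    }
}

/// An n-th value accumulator (1-based `n`) that keeps the default `retract`.
struct NthValue {
    n: usize,
    seen: Vec<i64>,
}

impl Accumulator for NthValue {
    fn update(&mut self, values: &[i64]) {
        self.seen.extend_from_slice(values);
    }
    fn evaluate(&self) -> Option<i64> {
        self.n.checked_sub(1).and_then(|i| self.seen.get(i).copied())
    }
}

/// Evaluate `acc` over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW.
fn sliding<A: Accumulator>(
    acc: &mut A,
    rows: &[i64],
    preceding: usize,
) -> Result<Vec<Option<i64>>, String> {
    let mut out = Vec::with_capacity(rows.len());
    for i in 0..rows.len() {
        acc.update(&rows[i..=i]);
        if i > preceding {
            // The row at index `i - preceding - 1` has just left the frame.
            let gone = i - preceding - 1;
            acc.retract(&rows[gone..=gone])?;
        }
        out.push(acc.evaluate());
    }
    Ok(out)
}
```

Under this reading, if name resolution picks an aggregate without retraction support for a `ROWS BETWEEN ...` frame, the query fails in the same way as the `NthValue` stand-in does here.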

The second one looks a bit weird to me; I'm not sure whether I messed something up or am hitting some other issue.
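
Reading the second message literally, the expected and found types differ only in the name of the nested struct field (`sn@1` versus `sn@0`). The sketch below uses simplified, hypothetical stand-ins for the Arrow types (not the real `arrow` crate) to show why a type comparison that includes nested field names treats the two as different. It does not identify where the differing name comes from.

```rust
/// Simplified stand-ins for Arrow's DataType and Field (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Struct(Vec<Field>),
    List(Box<Field>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

/// Build List(item: Struct([<sort column>: Int32])), the shape shown in the error message.
fn ordering_state_type(sort_column_name: &str) -> DataType {
    let sort_col = Field {
        name: sort_column_name.to_string(),
        data_type: DataType::Int32,
        nullable: true,
    };
    let item = Field {
        name: "item".to_string(),
        data_type: DataType::Struct(vec![sort_col]),
        nullable: true,
    };
    DataType::List(Box::new(item))
}
```

With these definitions, `ordering_state_type("sn@1")` and `ordering_state_type("sn@0")` compare as unequal even though every data type and nullability flag matches.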

@jayzhan211 If you have time to provide some pointers that would be highly appreciated :)

@jayzhan211
Contributor Author

jayzhan211 commented Jun 20, 2024

I suggest we convert 1. ArrayAgg, 2. DistinctArrayAgg, 3. OrderSensitiveArrayAgg and 4. NthValue separately.

OrderSensitiveArrayAgg and NthValue are quite complex.

@jayzhan211
Contributor Author

ArrayAgg and NthValue expect a nullable parameter. We need to add it to AggregateFunctionExpr so that it can be passed through StateFieldsArgs to state_fields and used to set the nullability of the field.

We can get nullable with:

let nullable = expr.nullable(input_schema)?;

https://github.com/apache/datafusion/blob/58d23c5c050f43aa7b867d4f0be7298d8d6cad83/datafusion/physical-expr-common/src/aggregate/mod.rs#L275C1-L289C6
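
A minimal sketch of the idea, assuming simplified stand-in types: `StateFieldsArgs` here is a reduced, hypothetical version of the real struct, `input_nullable` is the proposed addition, and the state field name and outer nullability are placeholders. The point is only that the item field of the list state takes its nullability from the input expression instead of a hard-coded value.

```rust
/// Simplified stand-ins for Arrow types (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    List(Box<Field>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

/// Reduced, hypothetical version of the arguments handed to `state_fields`.
struct StateFieldsArgs<'a> {
    name: &'a str,
    input_type: DataType,
    /// Proposed addition: nullability of the input expression,
    /// e.g. obtained from `expr.nullable(input_schema)?`.
    input_nullable: bool,
}

/// State of an array_agg-like aggregate: a single list whose items are the input values.
fn array_agg_state_fields(args: StateFieldsArgs) -> Vec<Field> {
    let item = Field {
        name: "item".to_string(),
        data_type: args.input_type,
        // Propagate the input nullability to the list items.
        nullable: args.input_nullable,
    };
    vec![Field {
        // Placeholder state field name; the real naming scheme is not shown here.
        name: format!("{}_state", args.name),
        data_type: DataType::List(Box::new(item)),
        // Outer nullability is not the point of this sketch.
        nullable: true,
    }]
}
```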

@eejbyfeldt
Contributor

> I suggest we convert ArrayAgg, DistinctArrayAgg, OrderSensitiveArrayAgg and NthValue separately.
>
> At least leave OrderSensitiveArrayAgg and NthValue in another PR, since they have ordering and window function so it is quite complex

Sounds good. If only converting ArrayAgg, how does one handle multiple expressions using the same name? Should the new one not be registered? Or should I just leave the existing ArrayAgg code as is? The accumulator is private, so it cannot easily be reused. Or is it acceptable to make it public in this intermediate state?

@eejbyfeldt
Contributor

> ArrayAgg and Nth expect to have parameter nullable, we need to add it to AggregateFunctionExpr so we can add it to StateFieldsArgs for state_fields and change the nullable for field.

Makes sense. How come we only provide a single value for input_type (https://github.com/apache/datafusion/blob/39.0.0/datafusion/physical-expr-common/src/aggregate/mod.rs#L245)? Can aggregates not have multiple inputs? Should the nullable field be just input_nullable: bool, or should it be inputs_nullable: Vec<bool>?

@jayzhan211
Contributor Author

> > ArrayAgg and Nth expect to have parameter nullable, we need to add it to AggregateFunctionExpr so we can add it to StateFieldsArgs for state_fields and change the nullable for field.
>
> Make sense. How come we only provide a single value for input_type (https://github.com/apache/datafusion/blob/39.0.0/datafusion/physical-expr-common/src/aggregate/mod.rs#L245) can aggregates not have multiple inputs? Should the nullable field be just input_nullable: bool or should it be inputs_nullable: Vec<bool>?

We have a single input because we have not met any function that needs multiple inputs yet. If any function expects multiple inputs, we can extend it to a Vec.
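
As a rough illustration of the Vec variant discussed here, the sketch below keeps one nullability flag per input expression. `InputNullability` and its methods are hypothetical names chosen for this example and are not part of DataFusion. Single-input aggregates can still read their one flag, while multi-input aggregates see all of them.

```rust
/// Hypothetical container: one nullability flag per input expression.
#[derive(Debug, Clone, PartialEq)]
struct InputNullability {
    inputs_nullable: Vec<bool>,
}

impl InputNullability {
    fn new(flags: &[bool]) -> Self {
        Self {
            inputs_nullable: flags.to_vec(),
        }
    }

    /// Convenience accessor for single-input aggregates.
    /// Returns None when the aggregate has zero or several inputs.
    fn single(&self) -> Option<bool> {
        match self.inputs_nullable.as_slice() {
            [flag] => Some(*flag),
            _ => None,
        }
    }

    /// True if at least one input expression may produce NULL.
    fn any(&self) -> bool {
        self.inputs_nullable.iter().any(|flag| *flag)
    }
}
```

The trade-off is the usual one: a plain `bool` is simpler for the functions converted so far, while a `Vec<bool>` avoids another signature change once a multi-argument aggregate needs the information.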

@jayzhan211
Contributor Author

> > I suggest we convert ArrayAgg, DistinctArrayAgg, OrderSensitiveArrayAgg and NthValue separately.
> > At least leave OrderSensitiveArrayAgg and NthValue in another PR, since they have ordering and window function so it is quite complex
>
> Sounds good. If only converting ArrayAgg how do one handle that there are multiple expressions using the same name? Should it not be registered? Or should I just leave the existing ArrayAgg code as is? The accumulator is private so it can not easily be reused, or is it acceptable to make it public in this intermediate state?

We could check distinct and the ordering to decide whether to use the builtin or the UDAF here:

pub fn create_aggregate_expr_with_name_and_maybe_filter(
    e: &Expr,
    name: impl Into<String>,
    logical_input_schema: &DFSchema,
    physical_input_schema: &Schema,
    execution_props: &ExecutionProps,
) -> Result<AggregateExprWithOptionalArgs> {
    match e {
        Expr::AggregateFunction(AggregateFunction {
            func_def,
            distinct,
            args,
            filter,
            order_by,
            null_treatment,
        }) => {
            let physical_args =
                create_physical_exprs(args, logical_input_schema, execution_props)?;
            let filter = match filter {
                Some(e) => Some(create_physical_expr(
                    e,
                    logical_input_schema,
                    execution_props,
                )?),
                None => None,
            };
            let ignore_nulls = null_treatment
                .unwrap_or(sqlparser::ast::NullTreatment::RespectNulls)
                == NullTreatment::IgnoreNulls;
            let (agg_expr, filter, order_by) = match func_def {
                AggregateFunctionDefinition::BuiltIn(fun) => {
                    let physical_sort_exprs = match order_by {
                        Some(exprs) => Some(create_physical_sort_exprs(
                            exprs,
                            logical_input_schema,
                            execution_props,
                        )?),
                        None => None,
                    };
                    let ordering_reqs: Vec<PhysicalSortExpr> =
                        physical_sort_exprs.clone().unwrap_or(vec![]);
                    let agg_expr = aggregates::create_aggregate_expr(
                        fun,
                        *distinct,
                        &physical_args,
                        &ordering_reqs,
                        physical_input_schema,
                        name,
                        ignore_nulls,
                    )?;
                    (agg_expr, filter, physical_sort_exprs)
                }
                AggregateFunctionDefinition::UDF(fun) => {
                    let sort_exprs = order_by.clone().unwrap_or(vec![]);
                    let physical_sort_exprs = match order_by {
                        Some(exprs) => Some(create_physical_sort_exprs(
                            exprs,
                            logical_input_schema,
                            execution_props,
                        )?),
                        None => None,
                    };
                    let ordering_reqs: Vec<PhysicalSortExpr> =
                        physical_sort_exprs.clone().unwrap_or(vec![]);
                    let agg_expr = udaf::create_aggregate_expr(
                        fun,
                        &physical_args,
                        args,
                        &sort_exprs,
                        &ordering_reqs,
                        physical_input_schema,
                        name,
                        ignore_nulls,
                        *distinct,
                    )?;
                    (agg_expr, filter, physical_sort_exprs)
                }
            };
            Ok((agg_expr, filter, order_by))
        }
        other => internal_err!("Invalid aggregate expression '{other:?}'"),
    }
}
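
The suggested check could look roughly like the sketch below. `ArrayAggImpl` and `choose_array_agg_impl` are hypothetical names for illustration, and the rule assumes the intermediate state described in this thread, where only the plain ArrayAgg has been converted and the distinct and order-sensitive variants still use the builtin implementations.

```rust
/// Which implementation should serve a given array_agg call (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ArrayAggImpl {
    /// The newly converted user-defined aggregate.
    Udaf,
    /// The existing builtin (DistinctArrayAgg / OrderSensitiveArrayAgg).
    BuiltIn,
}

/// Decide based on the DISTINCT flag and the presence of an ORDER BY clause.
/// Assumption: only the plain variant has been converted so far.
fn choose_array_agg_impl(distinct: bool, has_order_by: bool) -> ArrayAggImpl {
    if distinct || has_order_by {
        ArrayAggImpl::BuiltIn
    } else {
        ArrayAggImpl::Udaf
    }
}
```

Once the remaining variants are converted, a check like this becomes unnecessary and every call can go to the UDAF.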

> The accumulator is private so it can not easily be reused, or is it acceptable to make it public in this intermediate state?

We can move it to physical_expr_common first and then, after all the related functions are done, move it back.

@eejbyfeldt
Contributor

> We have single input because we have not meet any function that need multiple input yet. If there is any function that expect multiple input, we can extend it to Vec

What about covariance (https://github.com/apache/datafusion/blob/main/datafusion/functions-aggregate/src/covariance.rs#L43), which takes 2 arguments?

@eejbyfeldt
Contributor

eejbyfeldt commented Jun 21, 2024

@jayzhan211 Created a PR for only doing ArrayAgg here: #11045. Will look into adding nullable next.

@findepi
Member

findepi commented Jul 5, 2024

array_agg is known to produce a non-null result.
If we make it a UDAF, then to avoid a regression (if we choose so), we need a way for a UDAF to mark its output as non-null: #11274
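
One possible shape for such a marker, sketched with a simplified, hypothetical `AggregateUdf` trait (not DataFusion's actual trait): a method with a conservative default of "nullable" that individual functions can override. The planner stand-in then uses it when building the output field.

```rust
/// Simplified stand-in for a user-defined aggregate interface (hypothetical).
trait AggregateUdf {
    fn name(&self) -> &str;

    /// Whether the aggregate's output may be NULL.
    /// Defaults to true, which is the conservative choice.
    fn is_nullable(&self) -> bool {
        true
    }
}

struct ArrayAgg;
struct Min;

impl AggregateUdf for ArrayAgg {
    fn name(&self) -> &str {
        "array_agg"
    }
    /// Follows the statement above that array_agg produces a non-null result.
    fn is_nullable(&self) -> bool {
        false
    }
}

impl AggregateUdf for Min {
    fn name(&self) -> &str {
        "min"
    }
    // Keeps the default: the output may be NULL.
}

/// Minimal output field description used by this sketch.
#[derive(Debug, PartialEq)]
struct OutputField {
    name: String,
    nullable: bool,
}

/// Build the output field for `udaf` applied to `column`.
fn output_field(udaf: &dyn AggregateUdf, column: &str) -> OutputField {
    OutputField {
        name: format!("{}({})", udaf.name(), column),
        nullable: udaf.is_nullable(),
    }
}
```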

@jayzhan211
Contributor Author

@eejbyfeldt Do you plan to work on array_agg?

@eejbyfeldt
Contributor

eejbyfeldt commented Jul 13, 2024

@jayzhan211 I will not be able to work on it for the next two weeks because I will be on vacation. So someone else should feel free to pick it up or take it over before then.

@eejbyfeldt
Contributor

@jayzhan211 Should this be closed? It seems it was resolved by #11448.

@jayzhan211
Contributor Author

Yes, all the functions are converted

3 participants