GH-35515: [C++][Python] Add non decomposable aggregation UDF #35514
@@ -65,6 +70,26 @@ struct PythonUdfKernelInit {
  std::shared_ptr<OwnedRefNoGIL> function;
};

struct ScalarUdfAggregator : public compute::KernelState {
"Scalar" as opposed to the "grouped" aggregator, which has a different interface:
https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/kernels/hash_aggregate.cc#L66
@westonpace I believe this PR is good to go. The failed CI seems unrelated. I have checked the Py refcount and it seems OK (I will add details in the comment thread above).
@icexelloss I'll take another look through today.
Thank you!
A few more very minor suggestions but, overall, I think this is fine.
"x": pa.int64(),
"y": pa.float64()
OK, so the test case is verifying that the Python function can take in *args
if needed (even though the args are still listed when registering)?
std::vector<std::shared_ptr<DataType>> input_types,
std::shared_ptr<DataType> output_type)
    : agg_cb(agg_cb), agg_function(agg_function), output_type(output_type) {
  Py_INCREF(agg_function->obj());
This increment seems redundant given you already have one here.
Admittedly, there could be some redundancy here. I created a follow-up to take a closer look:
Co-authored-by: Weston Pace <[email protected]>
I checked the failed CI jobs and those seem unrelated.
Benchmark runs are scheduled for baseline = e920bed and contender = 8b5919d. 8b5919d is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Rationale for this change
A non-decomposable aggregation is one that cannot be split into consume/merge/finalize steps. This is often the case when the logic is written with external Python libraries (numpy, pandas, statsmodels, etc.) and either cannot be decomposed or is not worth the effort to decompose (these are often one-off functions rather than reusable ones). This PR implements support for non-decomposable aggregation UDFs.
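To make the distinction concrete, here is a minimal standard-library sketch (not code from this PR; all function names are hypothetical). It shows a sum decomposing cleanly into per-batch partials, while a median computed the same way gives a different answer than a median over all values:

```python
from statistics import median


def decomposable_sum(batches):
    # consume: one partial sum per batch; merge/finalize: add the partials.
    partials = [sum(batch) for batch in batches]
    return sum(partials)


def naive_median_of_medians(batches):
    # Applying the same consume/merge split to a median is NOT equivalent
    # to the median of the full data.
    partials = [median(batch) for batch in batches]
    return median(partials)


def non_decomposable_median(batches):
    # A non-decomposable aggregation must see every value at once.
    values = [value for batch in batches for value in batch]
    return median(values)


batches = [[1, 100, 101], [2, 3, 4], [5]]
print(decomposable_sum(batches))         # same as summing all values
print(naive_median_of_medians(batches))  # differs from the true median
print(non_decomposable_median(batches))  # the true median
```

The median stands in here for any function from an external library that only makes sense over the complete input.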
The major issue with a non-decomposable UDF is that it needs to see all the data at once, unlike a scalar UDF, which only needs to see one batch at a time. On its own, this makes a non-decomposable UDF not very useful, since it is equivalent to collecting all the data into a pd.DataFrame and applying the UDF to it. However, one very useful application of non-decomposable UDFs is with segmented aggregation. To refresh, segmented aggregation works on ordered data and is passed one logical chunk at a time (e.g., all data with the same date). With segmented aggregation and a non-decomposable aggregation UDF, the user can apply any custom aggregation logic over a large stream of ordered data with the memory overhead of a single segment.
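The memory argument can be illustrated with a small standard-library sketch. This is not how Acero implements segmented aggregation; `segmented_aggregate` and the sample rows are hypothetical, and the input is assumed to be already ordered by the segment key:

```python
from itertools import groupby
from operator import itemgetter
from statistics import median


def segmented_aggregate(rows, udf):
    """Yield (segment_key, udf(values)) for each run of consecutive rows
    that share a key. `rows` is an iterable of (key, value) pairs that is
    assumed to be ordered by key."""
    for segment_key, segment in groupby(rows, key=itemgetter(0)):
        # Only the values of the current segment are materialized.
        values = [value for _, value in segment]
        yield segment_key, udf(values)


# A stream of (date, value) rows, ordered by date.
rows = iter([
    ("2023-05-01", 1.0),
    ("2023-05-01", 3.0),
    ("2023-05-01", 8.0),
    ("2023-05-02", 2.0),
    ("2023-05-02", 4.0),
])

result = list(segmented_aggregate(rows, median))
print(result)
```

Because the rows arrive in key order, each segment can be aggregated and discarded before the next one is read, so the UDF still sees a complete logical chunk without the whole stream being held in memory.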
What changes are included in this PR?
This PR is currently WIP and not ready for review.
So far I have implemented the minimal amount of code to get a basic test working, but it still needs cleanup, error handling, etc.
Are these changes tested?
Added new tests that call the UDF through compute and Acero.
The compute test calls the aggregation on the full array. The Acero test calls the aggregation with segmented aggregation.
Are there any user-facing changes?