GH-32884: [C++] Add ordered aggregation #34311
@icexelloss, please review the design in this commit. @westonpace, in conflict resolution, I see |
I found the commit you made which removed these files and I generally figured it out. I pushed a resolution of the conflicts.
See segmented aggregation as a generalization of ordered aggregation (in the PR replaced by this one).
@rtpsw Can you add some code comments explaining the concept, data structure, and algorithm? That way the code is better documented and the reader doesn't need to jump through review/PR links to understand it.
Let's remove the mutex stuff for now since we expect this to be single threaded. We can put it back in later.
Let's remove the paths handling chunked arrays as they should no longer be needed.
I think we can even make this simpler by removing all use of ExecSpan in favor of ExecBatch, but we can do that in a follow-up (especially since you didn't introduce this).
I'll take another pass once that's done but I went through it pretty thoroughly today. I think I finally understand everything and it looks like a pretty good approach.
Once we move to a multithreaded approach, I think the only thing we will need to serialize on is figuring out the segment boundaries. The way this is set up today, that responsibility lies with the segmenter, so I think this should be pretty straightforward. We can worry / talk more about that later.
Also, can we add at least one or two basic end-to-end tests in |
My recent commit should have the requested fixes except the above one for adding tests (TBD).
// values [A, B, A] at row-indices [0, 1, 2]. A regular group-by aggregation with keys [X]
// yields a row-index partitioning [[0, 2], [1]] whereas a segmented-group-by aggregation
// with segment-keys [X] yields [[0], [1], [2]].
Minor nit: This example could be slightly improved, I think, if you used [A, A, B, A] so that readers could see that the segmented group-by still does segment.
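To make the difference concrete, here is a minimal standalone sketch of the two partitionings applied to the suggested [A, A, B, A] input. It does not use Arrow; `Partition`, `GroupByPartition`, and `SegmentPartition` are hypothetical names introduced only for this illustration.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper type: a list of row-index lists.
using Partition = std::vector<std::vector<int>>;

// Regular group-by: rows with equal keys share a group regardless of position.
// Groups are listed in order of first appearance.
Partition GroupByPartition(const std::vector<std::string>& keys) {
  Partition groups;
  std::map<std::string, std::size_t> group_index;
  for (int row = 0; row < static_cast<int>(keys.size()); ++row) {
    auto it = group_index.find(keys[row]);
    if (it == group_index.end()) {
      it = group_index.emplace(keys[row], groups.size()).first;
      groups.emplace_back();
    }
    groups[it->second].push_back(row);
  }
  return groups;
}

// Segmented group-by: only consecutive rows with equal keys share a segment.
Partition SegmentPartition(const std::vector<std::string>& keys) {
  Partition segments;
  for (int row = 0; row < static_cast<int>(keys.size()); ++row) {
    if (row == 0 || keys[row] != keys[row - 1]) segments.emplace_back();
    segments.back().push_back(row);
  }
  return segments;
}
```

With keys [A, A, B, A], `GroupByPartition` gives [[0, 1, 3], [2]] while `SegmentPartition` gives [[0, 1], [2], [3]]: the first two rows stay together, but the trailing A starts a new segment.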
// Handle the input batch
// If a segment is closed by this batch, then we output the aggregation for the segment
// If a segment is not closed by this batch, then we add the batch to the segment
// Handle the input batch
// If a segment is closed by this batch, then we output the aggregation for the segment
// If a segment is not closed by this batch, then we add the batch to the segment
// Extract segments from a batch and run the given handler on them. Note that the
// handler may be called on open segments which are not yet finished. Typically a
// handler should accumulate those open segments until a closed segment is reached.
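As a rough illustration of the "open segment" idea in the suggested comment, the sketch below splits one batch of segment-key values into runs and passes each run to a handler, flagging a run as open when it reaches the end of the batch. This is a toy under stated assumptions, not the PR's implementation: `ToySegment` and `HandleToySegments` are hypothetical names, and the batch is modeled as a plain `std::vector<int>` rather than an `ExecBatch`.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a segment inside one batch (not Arrow's type).
struct ToySegment {
  int64_t offset;  // first row of the run within the batch
  int64_t length;  // number of rows in the run
  bool is_open;    // true if the run touches the end of the batch, so the
                   // next batch may still continue it
};

// Split `keys` into runs of equal values and call `handler` on each run.
template <typename Handler>
void HandleToySegments(const std::vector<int>& keys, Handler&& handler) {
  const int64_t n = static_cast<int64_t>(keys.size());
  int64_t start = 0;
  while (start < n) {
    int64_t end = start + 1;
    while (end < n && keys[end] == keys[start]) ++end;
    handler(ToySegment{start, end - start, /*is_open=*/end == n});
    start = end;
  }
}
```

For the batch [0, 0, 1], the handler sees a closed segment covering rows 0 and 1 and then an open segment covering row 2. A handler following the comment's advice would keep accumulating the open segment until a later call shows that it has closed.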
// If a segment is closed by this batch, then we output the aggregation for the segment
// If a segment is not closed by this batch, then we add the batch to the segment
template <typename BatchHandler>
Status HandleSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecBatch& batch,
Status HandleSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecBatch& batch,
Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch,
Prefer pointer over mutable reference.
Looks like some of the comments in here are not addressed. I will create a follow up to track.
nvm - looks like github diff issue
cpp/src/arrow/compute/exec/options.h
///
/// See also doc in `aggregate_node.cc`
///
/// See also doc in `aggregate_node.cc`
This documentation is for users. I'm not sure we should be directing users to aggregate_node.cc. Also, it's not clear what doc this is referring to. I think this is fine as it is without the "see also".
@rtpsw Looks like you missed out on this comment (minor issue)
@westonpace, is this good to go?
From my perspective, yes. We can merge once @icexelloss approves.
@rtpsw There are many follow-up items from this PR. Can you include the list of follow-ups in the PR title so we have at least some way to track them? If you have a GH issue, please list the follow-ups here as well.
@rtpsw I think this is getting close, but there are still a number of unresolved threads. Please check that those are resolved and ping me when it is ready for me to take another look.
@icexelloss, I went over the unresolved discussions and commented; I think they can now be resolved. The deferred issues I found are in #34475.
Thanks @rtpsw, I will take a look later today. Can you gather all the follow-up issues and put them as a list in the PR description and the original GH issue as well?
Linked in both PR and original GH issue to #34475 which has the list.
Can you put the follow up GH issue link in the list?
LGTM approved
I also edited the PR description to make the purpose of this PR a bit more clear.
Benchmark runs are scheduled for baseline = 4c05a3b and contender = 9baefea. 9baefea is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
This PR adds "Segmented Aggregation" to the existing aggregation node to improve aggregation on ordered data.
A segment group is defined as "a continuous chunk of data that has the same segment key value". E.g., if the segment key values of the input data are [0, 0, 0, 1, 2, 2],
then there are three segment groups:
[0, 0, 0]
[1]
[2, 2]
(Note: the "group" in "segment group" is added to differentiate it from "segment", which is defined as "a continuous chunk of data within an ExecBatch".)
Segmented aggregation can replace the existing hash aggregation when the data are ordered. The benefits are:
(1) We can output aggregation result earlier (as soon as a segment group is fully consumed).
(2) We only need to hold partial aggregation for one segment group to reduce memory usage.
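The two benefits can be sketched with a toy streaming sum. This is only an illustration under stated assumptions, not the aggregate node's code: `SegmentedSum` is a hypothetical class, batches are modeled as plain vectors, and the input is assumed to be ordered so that equal segment keys are contiguous across batches.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical toy aggregator: sums values per segment group while holding
// partial state for only one segment group at a time.
class SegmentedSum {
 public:
  // Consume one batch; `keys` and `values` are assumed to have equal length.
  void Consume(const std::vector<int>& keys, const std::vector<int64_t>& values) {
    for (std::size_t i = 0; i < keys.size(); ++i) {
      // A key change means the previous segment group is fully consumed,
      // so its result can be emitted immediately.
      if (has_state_ && keys[i] != current_key_) Emit();
      current_key_ = keys[i];
      has_state_ = true;
      partial_sum_ += values[i];
    }
  }

  // Call once after the last batch to emit the final segment group.
  void Finish() {
    if (has_state_) Emit();
  }

  const std::vector<std::pair<int, int64_t>>& results() const { return results_; }

 private:
  void Emit() {
    results_.emplace_back(current_key_, partial_sum_);
    partial_sum_ = 0;
    has_state_ = false;
  }

  int current_key_ = 0;
  int64_t partial_sum_ = 0;
  bool has_state_ = false;
  std::vector<std::pair<int, int64_t>> results_;
};
```

Feeding keys [0, 0, 0, 1] with values [1, 2, 3, 10] emits the result for key 0 as soon as key 1 appears, before any later batch arrives, and only a single partial sum is held at any time. A hash aggregation, by contrast, would have to keep state for every key until the input ends.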
See https://issues.apache.org/jira/browse/ARROW-17642
Replaces #14352
Follow ups