ARROW-17642: [C++] Add ordered aggregation #14352
Conversation
This PR implements a generalization of ordered aggregation called segmented aggregation, in which the segment-keys split the input rows into segments of consecutive rows with common values for these segment-keys. In other words, the extra keys are not required to be ordered but only to change from one segment to the next. I opted to generalize this way because the extra implementation effort is quite small and the result is useful.
Note that the current implementation does not support streaming. In particular, the segment-aggregation operation produces the entire output after processing the entire input. However, the current implementation gets close enough to allow a future PR to add support for streaming without too much effort. cc @icexelloss
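To make the segment semantics above concrete, here is a minimal standalone C++ sketch (not part of the PR; the function name is illustrative) of how segment boundaries fall out of the definition:

```cpp
#include <cstdint>
#include <vector>

// Rows form one segment for as long as the segment-key value does not change;
// the values need not be globally sorted, only constant within each segment.
std::vector<int64_t> SegmentStarts(const std::vector<int64_t>& segment_key_values) {
  std::vector<int64_t> starts;
  for (int64_t i = 0; i < static_cast<int64_t>(segment_key_values.size()); ++i) {
    if (i == 0 || segment_key_values[i] != segment_key_values[i - 1]) {
      starts.push_back(i);  // a new segment begins where the key changes
    }
  }
  return starts;
}

// e.g. {0, 0, 0, 1, 2, 2} -> {0, 3, 4}; {2, 2, 0} is equally valid input,
// since the keys only need to change between segments, not be ordered.
```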
Thanks @rtpsw! Will try to take a look.
@lidavidm, are you a good person to review this? Or can you suggest someone? Also, note this PR includes a new SWMR (single-writer, multiple-reader) gadget which may be of wider interest and could be upgraded to a visible internal API. WDYT?
Weston would probably be best, but he's going to be busy with release things. Do you need a review soon? The link to the gadget is dead, so I'm not sure what you're referencing.
Personally, I can hold. @icexelloss should answer if he needs this sooner.
Sorry about that. Please see
Turns out the link is OK, but GitHub avoids scrolling it into view when the containing file has a large diff. When I click
Notes about the recent commit:
It would be good to get feedback on the approach for this PR even before I add further tests.
@lidavidm We do not need this soon. This can wait a little bit (2 weeks or so).
Pushed a change I had in flight. Holding for now.
To be fair, there are plenty of possible mutex policies (example discussion). I think the first question is whether the usefulness to Arrow of the proposed policy and its implementation are worth a (separate) discussion.
@lidavidm, it may be a good time to get back to this. cc @icexelloss
Yes, sorry. I'll see if I can get to this soon. Or CC @westonpace. |
```cpp
return -1;
}

bool HasConsistentChunks(const ExecBatch& batch, int index_of_chunk) {
```
Not necessarily an issue here, but maybe something like `MultipleChunkIterator` would let us avoid having to do this check?

arrow/cpp/src/arrow/chunked_array.h, lines 198 to 237 at 5889c78:
```cpp
/// \brief EXPERIMENTAL: Utility for incremental iteration over contiguous
/// pieces of potentially differently-chunked ChunkedArray objects
class ARROW_EXPORT MultipleChunkIterator {
 public:
  MultipleChunkIterator(const ChunkedArray& left, const ChunkedArray& right)
      : left_(left),
        right_(right),
        pos_(0),
        length_(left.length()),
        chunk_idx_left_(0),
        chunk_idx_right_(0),
        chunk_pos_left_(0),
        chunk_pos_right_(0) {}

  bool Next(std::shared_ptr<Array>* next_left, std::shared_ptr<Array>* next_right);

  int64_t position() const { return pos_; }

 private:
  const ChunkedArray& left_;
  const ChunkedArray& right_;
  // The amount of the entire ChunkedArray consumed
  int64_t pos_;
  // Length of the chunked array(s)
  int64_t length_;
  // Current left chunk
  int chunk_idx_left_;
  // Current right chunk
  int chunk_idx_right_;
  // Offset into the current left chunk
  int64_t chunk_pos_left_;
  // Offset into the current right chunk
  int64_t chunk_pos_right_;
};
```
The reason for requiring consistent chunks, which is defined as having the same sequence of chunk lengths for each (chunked-array) value in the batch, is that it ensures we can create an `ExecSpan` without fragmenting the chunks. For example, if we had one value with chunk lengths [3, 5] and a second value with chunk lengths [2, 6], then we'd need to fragment into 3 `ExecSpan`s of lengths [2, 1, 5], since each span must cover rows from a single chunk of every value. Of course, the fragmentation could easily get much worse with many values having inconsistent chunks, and the performance cost of fragmentation would be annoying. So, while the requirement could be lifted using something like `MultipleChunkIterator`, I'm not sure it is worth it. Let me know your thoughts.
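A small standalone sketch (not from the PR; the function name is illustrative) of the fragmentation arithmetic described above:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Given each value's chunk lengths, compute the span lengths needed so that
// every span lies within a single chunk of every value.
std::vector<int64_t> FragmentLengths(
    const std::vector<std::vector<int64_t>>& chunk_lengths) {
  std::vector<int64_t> ends;  // cumulative chunk end offsets across all values
  for (const auto& value : chunk_lengths) {
    int64_t end = 0;
    for (int64_t len : value) {
      end += len;
      ends.push_back(end);
    }
  }
  std::sort(ends.begin(), ends.end());
  ends.erase(std::unique(ends.begin(), ends.end()), ends.end());
  std::vector<int64_t> fragments;
  int64_t prev = 0;
  for (int64_t end : ends) {
    fragments.push_back(end - prev);  // each fragment runs between boundaries
    prev = end;
  }
  return fragments;
}

// FragmentLengths({{3, 5}, {2, 6}}) == {2, 1, 5}, matching the example above.
```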
I've only managed a cursory glance so far; I would like to take a closer look and will try to get a chance next week. At a high level I have a few worries:
- It appears you have made some changes to allow exec batches to contain chunked arrays. However, I do not think an exec batch should ever need to contain a chunked array. If a grouping node wants to deal with some chunked arrays internally I think that is OK, but it shouldn't put that complexity on the rest of the nodes.
- I don't think we really want to maintain the `GroupBy` methods in `aggregate.h`. They do things slightly differently than an ExecPlan, and having two subtly different implementations is going to be a headache. I'd rather convert the Python bindings to use an ExecPlan.
```cpp
}

const char* kind_name() const override { return "ScalarAggregateNode"; }

Status DoConsume(const ExecSpan& batch, size_t thread_index) {
  GatedSharedLock lock(gated_shared_mutex_);
```
Why is this lock needed? Each batch of incoming data should be more or less separately processable, right? Or is this to handle the case where a segment spreads out across multiple batches? Ideally we should not be holding any locks when we invoke kernel functions.
Without these locks, I observed a race condition between the finishing of one segment, which reads accumulated states, and the consuming of the next segment, which accumulates states. The `GatedSharedLock` in the code ensures that the (multi-threaded) consuming of the next segment does not run until the finishing of the current segment completes. I can think of ways to reduce the locking, but they would be more complex and are better left for a future task.
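`GatedSharedLock` itself is internal to this PR and not shown in this excerpt; for readers, a minimal sketch of the underlying reader/writer pattern using `std::shared_mutex` (the gadget's actual semantics may differ):

```cpp
#include <mutex>
#include <shared_mutex>

// Consuming batches of the current segment may run on many threads at once
// (shared access); finishing a segment must exclude all consumers so that it
// reads a stable accumulated state (exclusive access).
class SegmentedStates {
 public:
  void Consume(/* const ExecSpan& batch */) {
    std::shared_lock<std::shared_mutex> lock(mutex_);  // many concurrent consumers
    // ... accumulate kernel states for the current segment ...
  }

  void FinishSegment() {
    std::unique_lock<std::shared_mutex> lock(mutex_);  // single exclusive finisher
    // ... read accumulated states, emit output, reset for the next segment ...
  }

 private:
  std::shared_mutex mutex_;
};
```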
There's some discussion here; I should have pinged you at the time: https://issues.apache.org/jira/browse/ARROW-17965
Thanks for all the comments. I'll try to post some quick responses, but I'll need to get back to this later, due to other priorities.
No worries! I've been pretty busy these last few weeks, so I don't know that I would have caught it anyway.
This CI job failure may be related to the recent commit, though I can't think of which change could have caused it. How do I set up an environment to try to reproduce it? Or is there perhaps a way to grab the core dump or raw log file?
> I'm still pretty reluctant to add code to handle chunked arrays. I feel it adds complexity that we will end up maintaining when chunked arrays don't really have a place in a streaming execution engine (since we process things one batch at a time usually).

This is understandable. I'll try to drop support for chunked arrays in this PR and report back on what seems to break; we may be able to find an alternative approach.

My investigation suggests that the reason for introducing handling of chunked arrays in the first place is that the testers use tables, and their implementing class SimpleTable has ChunkedArray columns (even after CombineChunks) that the aggregation code needs to handle. Therefore, if we remove support for chunked arrays in the aggregation code, then it won't work nicely with table inputs. AFAIU, aggregating tables is a valid use case that should be supported. @westonpace, let me know your thoughts.
Does the non-ordered / hash aggregation handle chunked arrays?
AFAICS, yes. There are plenty of pre-PR
I see the same failure for the latest commit, so same questions.
I followed up on the job's workflow and found the command
Aggregation of tables should be done by using a table source node and an exec plan, not by using the `GroupBy` methods.
IIUC, support of chunked arrays is needed only for the internal `GroupBy` facility.
I have created #14867, which removes the internal GroupBy facility entirely (well, remaps it onto exec plans).
@westonpace, I'm confused about the next step here. Should this PR hold until yours is merged, and then merge it in?
I would like to take a deeper look at this before merging (sorry, I haven't been able to get to it yet).
Unfortunately, I think that would be the easiest path. Otherwise we risk introducing a bunch of complexity that we don't need and just have to pull out later.
I don't think you're testing a group-by with the exec plan using segment keys, but I might be missing something. So I don't think the changes in aggregate_node.cc are tested.
```diff
@@ -180,7 +180,7 @@ struct ARROW_EXPORT ExecBatch {

   explicit ExecBatch(const RecordBatch& batch);

-  static Result<ExecBatch> Make(std::vector<Datum> values);
+  static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1);
```
Naively, someone looking at this might be confused about why you now need both `ExecBatch::Make` and `ExecBatch::ExecBatch`, since both take a vector of values and a length. Looking closer, it seems the `Make` function does the extra work of verifying that the lengths of the datums match the given length. Could you add some comments explaining this for future readers?
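A sketch of the kind of comment being requested (the wording is illustrative, not from the PR):

```cpp
/// \brief Construct an ExecBatch from values, validating or inferring length.
///
/// Unlike the ExecBatch(std::vector<Datum>, int64_t) constructor, which
/// trusts its arguments, Make verifies that every array value matches the
/// given length (inferring the length from the values when length is -1)
/// and returns Status::Invalid on a mismatch.
static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1);
```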
cpp/src/arrow/compute/exec.h:
```cpp
std::vector<Datum> selected_values(ids.size());
for (size_t i = 0; i < ids.size(); i++) {
  if (ids[i] < 0 || static_cast<size_t>(ids[i]) >= values.size()) {
    return Status::Invalid("ExecBatch invalid value selection: ", ids[i]);
  }
  selected_values[i] = values[ids[i]];
}
```
Suggested change:

```cpp
std::vector<Datum> selected_values;
selected_values.reserve(ids.size());
for (int idx : ids) {
  if (idx < 0 || idx >= static_cast<int>(values.size())) {
    return Status::Invalid("ExecBatch invalid value selection: ", idx);
  }
  selected_values.push_back(values[idx]);
}
```
Minor nit: this seems slightly more readable with a range-based for loop.
cpp/src/arrow/compute/exec.h:
```diff
@@ -233,6 +233,17 @@ struct ARROW_EXPORT ExecBatch {

   ExecBatch Slice(int64_t offset, int64_t length) const;

+  Result<ExecBatch> SelectValues(const std::vector<int>& ids) const {
```
I know there aren't many comment blocks in this file, but `ExecBatch` is less experimental than it used to be. Can you comment this method? Also, is there any particular reason to have the implementation in the header file?
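For illustration, one way such a comment block might read (hypothetical wording, with the definition assumed moved out of the header):

```cpp
/// \brief Select a subset of this batch's values by position.
///
/// Returns a new ExecBatch of the same length holding values[ids[i]] for
/// each i, or Status::Invalid if any id is out of range. The definition
/// could then live in exec.cc rather than in this header.
Result<ExecBatch> SelectValues(const std::vector<int>& ids) const;
```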
```cpp
/// Currently only uint32 indices will be produced, eventually the bit width will only
/// be as wide as necessary.
```
Suggested change: remove both lines.
We can save this for a future PR or put it into a TODO, but speculation about future issues should be left out of `///` comment blocks.
This is copied from an existing comment on a method by the same name. I don't mind removing it if you still think it should be removed.
```diff
@@ -44,7 +45,405 @@ namespace compute {

 namespace {

 struct GrouperImpl : Grouper {
 inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
   int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
```
Maybe a `DCHECK` here to ensure that `data.type->byte_width() > 0` (e.g. to ensure that this is a fixed-width data type).
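A sketch of how the suggested check might look; the function body beyond the two lines shown above (the return statement in particular) is an assumption, not the PR's code:

```cpp
inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
  // Guard: only fixed-width types have a positive byte width.
  DCHECK_GT(data.type->byte_width(), 0);
  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
  return data.GetValues<uint8_t>(1, absolute_byte_offset);
}
```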
```cpp
base += keys.size();
for (size_t i = 0; i < segment_keys.size(); ++i) {
  int segment_key_field_id = segment_key_field_ids[i];
  output_fields[base + i] = input_schema->field(segment_key_field_id);
```
I don't think `output_fields` is large enough to do `output_fields[base + i]`. I get a segmentation fault here.
Are the segment keys part of the output? I would expect them to be. However, I also don't see any place in GroupByNode::Finalize that is putting the segment keys into the output data.
This looks like an oversight. Sorry about that. I'll work on this.
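A hedged sketch of the kind of sizing fix implied; the names follow the snippet above, and the `aggregates` count is an assumption, not the PR's actual code:

```cpp
// Output layout: [aggregate results..., group keys..., segment keys...].
// output_fields must be sized for all three groups before writing at base + i.
std::vector<std::shared_ptr<arrow::Field>> output_fields(
    aggregates.size() + keys.size() + segment_keys.size());
```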
Replaced by #34311
This PR implements "Segmented Aggregation" to the existing aggregation node to improve aggregation on ordered data. A segment group is defined as "a continuous chunk of data that have the same segment key value. e.g, if the input data looks like ``` [0, 0, 0, 1, 2, 2] ``` Then there are three segments `[0, 0, 0]` `[1]` `[2, 2]` (Note the "group" in "segment group" here is added to differentiate from "segment", which is defined as "a continuous chunk of data with in a ExecBatch") Segment aggregation can be used to replace existing hash aggregation in the case that data are ordered. The benefit of this is (1) We can output aggregation result earlier (as soon as a segment group is fully consumed). (2) We only need to hold partial aggregation for one segment group to reduce memory usage. See https://issues.apache.org/jira/browse/ARROW-17642 Replaces #14352 * Closes: #32884 Follow ups ======= * #34475 * #34529 --------- Co-authored-by: Li Jin <[email protected]>