Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor doc/rename changes #2

Merged
merged 3 commits into from
Mar 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,13 @@ Status HandleSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecBatch&
return Status::OK();
}

Status SelectConstantFields(std::vector<Datum>* values_ptr, const ExecBatch& input_batch,
const std::vector<int>& field_ids) {
/// @brief Extract values of segment keys from a segment batch
/// @param[out] values_ptr Vector to store the extracted segment key values
/// @param[in] input_batch Segment batch. Must have the a constant value for segment key
/// @param[in] field_ids Segment key field ids
Status ExtractSegmenterValues(std::vector<Datum>* values_ptr,
const ExecBatch& input_batch,
const std::vector<int>& field_ids) {
DCHECK_GT(input_batch.length, 0);
std::vector<Datum>& values = *values_ptr;
int64_t row = input_batch.length - 1;
Expand Down Expand Up @@ -399,7 +404,7 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
auto exec_batch = full_batch.Slice(segment.offset, segment.length);
RETURN_NOT_OK(DoConsume(ExecSpan(exec_batch), thread_index));
RETURN_NOT_OK(
SelectConstantFields(&segmenter_values_, exec_batch, segment_field_ids_));
ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_field_ids_));

// If the segment closes the current segment group, we can output segment group
// aggregation.
Expand Down Expand Up @@ -490,11 +495,12 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
return Status::OK();
}

/// \brief A segmenter for the segment-keys
// A segmenter for the segment-keys
std::unique_ptr<RowSegmenter> segmenter_;
/// \brief Field indices corresponding to the segment-keys
// Field indices corresponding to the segment-keys
const std::vector<int> segment_field_ids_;
/// \brief Holds values of the current batch that were selected for the segment-keys
// Holds the value of segment keys of the most recent input batch
// The values are updated everytime an input batch is processed
std::vector<Datum> segmenter_values_;

const std::vector<std::vector<int>> target_fieldsets_;
Expand Down Expand Up @@ -811,11 +817,13 @@ class GroupByNode : public ExecNode, public TracedNode {

auto handler = [this](const ExecBatch& full_batch, const Segment& segment) {
if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
// This is not zero copy - we should refactor the code to pass
// offset and length to Consume to avoid copying here
auto exec_batch = full_batch.Slice(segment.offset, segment.length);
auto batch = ExecSpan(exec_batch);
Comment on lines +820 to 823
Copy link
Owner

@rtpsw rtpsw Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right this is not zero-copy, but I believe this can be fixed within this block of code. Let's write something like "TODO: fix this to be zero-copy".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed my post above because the fix I have in mind involves adding ExecSpan::Slice, which would require a bit of testing that I'd prefer to defer on.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

RETURN_NOT_OK(Consume(batch));
RETURN_NOT_OK(
SelectConstantFields(&segmenter_values_, exec_batch, segment_key_field_ids_));
ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_key_field_ids_));
if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
return Status::OK();
};
Expand Down