Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-32884: [C++] Add ordered aggregation #34311

Merged
merged 30 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b2b10cf
GH-32884: [C++] Add ordered aggregation
rtpsw Feb 23, 2023
636701e
Merge branch 'main' into GH-32884
rtpsw Feb 23, 2023
70acbe6
requested fixes
rtpsw Feb 24, 2023
78af85e
fix integer conversion
rtpsw Feb 24, 2023
9627ddf
simplify
rtpsw Feb 25, 2023
6240596
remove unused
rtpsw Feb 25, 2023
01a946b
requested fixes
rtpsw Mar 1, 2023
54a5809
Simply some segment code; add documentation; some refactor/renames
icexelloss Mar 2, 2023
e96ac77
Rename segment piece back to segment
icexelloss Mar 3, 2023
1c4579b
Format change
icexelloss Mar 3, 2023
192dc87
Fix comment w.r.t segment/segment group
icexelloss Mar 3, 2023
c92f8d0
Fix docstring for segment
icexelloss Mar 3, 2023
5ac8c65
Merge pull request #1 from icexelloss/GH-32884-ordered-agg
rtpsw Mar 3, 2023
111a7b9
add plan tests
rtpsw Mar 4, 2023
d85dac3
requested changes
rtpsw Mar 4, 2023
28fce61
reinstate state resize
rtpsw Mar 4, 2023
218e04c
add tests and fix
rtpsw Mar 5, 2023
4f18abd
fix vector access
rtpsw Mar 5, 2023
a633038
requested changes
rtpsw Mar 6, 2023
e52439c
Minor doc/rename changes
icexelloss Mar 6, 2023
7edcb18
fix typo
icexelloss Mar 6, 2023
48d39eb
requested changes
rtpsw Mar 7, 2023
967a07c
add export
rtpsw Mar 7, 2023
d76323c
Fix docstring
icexelloss Mar 7, 2023
7b6e953
Merge pull request #2 from icexelloss/GH-32884-ordered-agg
rtpsw Mar 7, 2023
5e8cbc9
requested changes
rtpsw Mar 7, 2023
d68efd4
requested changes, additional tests
rtpsw Mar 7, 2023
b11a6ca
requested changes
rtpsw Mar 8, 2023
9e094fc
doc and simplify segmenter grouping
rtpsw Mar 8, 2023
6295691
requested changes
rtpsw Mar 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const {
return out;
}

Result<ExecBatch> ExecBatch::SelectValues(const std::vector<int>& ids) const {
  // Build a new batch holding only the values at the requested positions,
  // preserving this batch's length.
  std::vector<Datum> chosen;
  chosen.reserve(ids.size());
  for (const int index : ids) {
    const bool in_range = index >= 0 && static_cast<size_t>(index) < values.size();
    if (!in_range) {
      return Status::Invalid("ExecBatch invalid value selection: ", index);
    }
    chosen.push_back(values[index]);
  }
  return ExecBatch(std::move(chosen), length);
}

namespace {

enum LengthInferenceError {
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ struct ARROW_EXPORT ExecBatch {
/// \brief Infer the ExecBatch length from values.
static Result<int64_t> InferLength(const std::vector<Datum>& values);

/// Creates an ExecBatch with length-validation.
///
/// If any value is given, then all values must have a common length. If the given
/// length is negative, then the length of the ExecBatch is set to this common length,
/// or to 1 if no values are given. Otherwise, the given length must equal the common
/// length, if any value is given.
static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1);

Result<std::shared_ptr<RecordBatch>> ToRecordBatch(
Expand Down Expand Up @@ -240,6 +246,8 @@ struct ARROW_EXPORT ExecBatch {

ExecBatch Slice(int64_t offset, int64_t length) const;

Result<ExecBatch> SelectValues(const std::vector<int>& ids) const;
icexelloss marked this conversation as resolved.
Show resolved Hide resolved

/// \brief A convenience for returning the types from the batch.
std::vector<TypeHolder> GetTypes() const {
std::vector<TypeHolder> result;
Expand Down
309 changes: 276 additions & 33 deletions cpp/src/arrow/compute/exec/aggregate_node.cc

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ class ARROW_EXPORT ExecNode {
/// concurrently, potentially even before the call to StartProducing
/// has finished.
/// - PauseProducing(), ResumeProducing(), StopProducing() may be called
/// by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished()
/// methods
/// by the downstream nodes' InputReceived(), InputFinished() methods
///
/// StopProducing may be called due to an error, by the user (e.g. cancel), or
/// because a node has all the data it needs (e.g. limit, top-k on sorted data).
Expand Down
24 changes: 20 additions & 4 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,37 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
std::vector<std::string> names;
};

/// \brief Make a node which aggregates input batches, optionally grouped by keys.
/// \brief Make a node which aggregates input batches, optionally grouped by keys and
/// optionally segmented by segment-keys. Both keys and segment-keys determine the group.
/// However segment-keys are also used for determining grouping segments, which should be
/// large, and allow streaming a partial aggregation result after processing each segment.
///
/// One common use-case for segment-keys is ordered aggregation, in which the segment-key
/// attribute specifies a column with non-decreasing values or a lexicographically-ordered
/// set of such columns.
///
/// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
/// expected to be a HashAggregate function. If the keys attribute is an empty vector,
/// then each aggregate is assumed to be a ScalarAggregate function.
///
/// If the segment_keys attribute is a non-empty vector, then segmented aggregation, as
/// described above, applies.
///
/// The keys and segment_keys vectors must be disjoint.
class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
 public:
  explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
                                std::vector<FieldRef> keys = {},
                                std::vector<FieldRef> segment_keys = {})
      : aggregates(std::move(aggregates)),
        keys(std::move(keys)),
        segment_keys(std::move(segment_keys)) {}

  // aggregations which will be applied to the targeted fields
  std::vector<Aggregate> aggregates;
  // keys by which aggregations will be grouped (optional)
  std::vector<FieldRef> keys;
  // keys by which aggregations will be segmented (optional)
  std::vector<FieldRef> segment_keys;
};

constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30; // 1GiB
Expand Down
103 changes: 103 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1476,5 +1476,108 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) {
}
}

// Segmented aggregation is only supported single-threaded; a multi-threaded
// plan must be rejected with NotImplemented.
TEST(ExecPlanExecution, SegmentedAggregationWithMultiThreading) {
BatchesWithSchema data;
data.batches = {ExecBatchFromJSON({int32()}, "[[1]]")};
data.schema = schema({field("i32", int32())});
Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{
{"count", nullptr, "i32", "count(i32)"},
},
/*keys=*/{"i32"}, /*segment_keys=*/{"i32"}}}});
EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, HasSubstr("multi-threaded"),
DeclarationToExecBatches(std::move(plan)));
}

// All rows share segment-key value a=1, so the whole input forms a single
// segment; results are grouped by "b" within it.
TEST(ExecPlanExecution, SegmentedAggregationWithOneSegment) {
BatchesWithSchema data;
data.batches = {
ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 2, 1], [1, 1, 2]]"),
ExecBatchFromJSON({int32(), int32(), int32()},
"[[1, 2, 2], [1, 1, 3], [1, 2, 3]]")};
data.schema = schema({
field("a", int32()),
field("b", int32()),
field("c", int32()),
});

Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{
{"hash_sum", nullptr, "c", "sum(c)"},
{"hash_mean", nullptr, "c", "mean(c)"},
},
/*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));

// Expected columns: sum(c), mean(c), key "b", segment key "a".
auto expected = ExecBatchFromJSON({int64(), float64(), int32(), int32()},
R"([[6, 2, 1, 1], [6, 2, 2, 1]])");
AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
{expected});
}

// Segment-key "a" takes two distinct values (1 then 2), each confined to one
// input batch, so aggregation is emitted per segment per group.
TEST(ExecPlanExecution, SegmentedAggregationWithTwoSegments) {
BatchesWithSchema data;
data.batches = {
ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 2, 1], [1, 1, 2]]"),
ExecBatchFromJSON({int32(), int32(), int32()},
"[[2, 2, 2], [2, 1, 3], [2, 2, 3]]")};
data.schema = schema({
field("a", int32()),
field("b", int32()),
field("c", int32()),
});

Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{
{"hash_sum", nullptr, "c", "sum(c)"},
{"hash_mean", nullptr, "c", "mean(c)"},
},
/*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));

// Expected columns: sum(c), mean(c), key "b", segment key "a".
auto expected = ExecBatchFromJSON(
{int64(), float64(), int32(), int32()},
R"([[3, 1.5, 1, 1], [1, 1, 2, 1], [3, 3, 1, 2], [5, 2.5, 2, 2]])");
AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
{expected});
}

// Segment boundaries fall inside input batches (segment a=2 spans the end of
// batch 1 and the start of batch 2), exercising mid-batch segment splitting.
TEST(ExecPlanExecution, SegmentedAggregationWithBatchCrossingSegment) {
BatchesWithSchema data;
data.batches = {
ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 1, 1], [2, 2, 2]]"),
ExecBatchFromJSON({int32(), int32(), int32()},
"[[2, 2, 2], [3, 3, 3], [3, 3, 3]]")};
data.schema = schema({
field("a", int32()),
field("b", int32()),
field("c", int32()),
});

Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{
{"hash_sum", nullptr, "c", "sum(c)"},
{"hash_mean", nullptr, "c", "mean(c)"},
},
/*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));

// Expected columns: sum(c), mean(c), key "b", segment key "a".
auto expected = ExecBatchFromJSON({int64(), float64(), int32(), int32()},
R"([[2, 1, 1, 1], [4, 2, 2, 2], [6, 3, 3, 3]])");
AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
{expected});
}

} // namespace compute
} // namespace arrow
Loading