Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-44052: [C++][Compute] Reduce the complexity of row segmenter #44053

Merged
merged 17 commits into from
Sep 18, 2024
84 changes: 75 additions & 9 deletions cpp/src/arrow/acero/aggregate_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/concatenate.h"
#include "arrow/compute/api.h"
#include "arrow/table.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/benchmark_util.h"
Expand All @@ -34,6 +36,9 @@

namespace arrow {

using arrow::Concatenate;
using arrow::ConstantArrayGenerator;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These look unnecessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

using arrow::gen::Constant;
using compute::Count;
using compute::MinMax;
using compute::Mode;
Expand Down Expand Up @@ -325,7 +330,8 @@ BENCHMARK_TEMPLATE(ReferenceSum, SumBitmapVectorizeUnroll<int64_t>)

std::shared_ptr<RecordBatch> RecordBatchFromArrays(
const std::vector<std::shared_ptr<Array>>& arguments,
const std::vector<std::shared_ptr<Array>>& keys) {
const std::vector<std::shared_ptr<Array>>& keys,
const std::vector<std::shared_ptr<Array>>& segment_keys) {
std::vector<std::shared_ptr<Field>> fields;
std::vector<std::shared_ptr<Array>> all_arrays;
int64_t length = -1;
Expand All @@ -347,35 +353,53 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays(
fields.push_back(field("key" + ToChars(key_idx), key->type()));
all_arrays.push_back(key);
}
for (std::size_t segment_key_idx = 0; segment_key_idx < segment_keys.size();
segment_key_idx++) {
const auto& segment_key = segment_keys[segment_key_idx];
DCHECK_EQ(segment_key->length(), length);
fields.push_back(
field("segment_key" + ToChars(segment_key_idx), segment_key->type()));
all_arrays.push_back(segment_key);
}
return RecordBatch::Make(schema(std::move(fields)), length, std::move(all_arrays));
}

Result<std::shared_ptr<Table>> BatchGroupBy(
std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates,
std::vector<FieldRef> keys, bool use_threads = false,
MemoryPool* memory_pool = default_memory_pool()) {
std::vector<FieldRef> keys, std::vector<FieldRef> segment_keys,
bool use_threads = false, MemoryPool* memory_pool = default_memory_pool()) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
Table::FromRecordBatches({std::move(batch)}));
Declaration plan = Declaration::Sequence(
{{"table_source", TableSourceNodeOptions(std::move(table))},
{"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
{"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys),
std::move(segment_keys))}});
return DeclarationToTable(std::move(plan), use_threads, memory_pool);
}

static void BenchmarkGroupBy(benchmark::State& state, std::vector<Aggregate> aggregates,
const std::vector<std::shared_ptr<Array>>& arguments,
const std::vector<std::shared_ptr<Array>>& keys) {
std::shared_ptr<RecordBatch> batch = RecordBatchFromArrays(arguments, keys);
static void BenchmarkGroupBy(
benchmark::State& state, std::vector<Aggregate> aggregates,
const std::vector<std::shared_ptr<Array>>& arguments,
const std::vector<std::shared_ptr<Array>>& keys,
const std::vector<std::shared_ptr<Array>>& segment_keys = {}) {
std::shared_ptr<RecordBatch> batch =
RecordBatchFromArrays(arguments, keys, segment_keys);
std::vector<FieldRef> key_refs;
for (std::size_t key_idx = 0; key_idx < keys.size(); key_idx++) {
key_refs.emplace_back(static_cast<int>(key_idx + arguments.size()));
}
std::vector<FieldRef> segment_key_refs;
for (std::size_t segment_key_idx = 0; segment_key_idx < segment_keys.size();
segment_key_idx++) {
segment_key_refs.emplace_back(
static_cast<int>(segment_key_idx + arguments.size() + keys.size()));
}
for (std::size_t arg_idx = 0; arg_idx < arguments.size(); arg_idx++) {
aggregates[arg_idx].target = {FieldRef(static_cast<int>(arg_idx))};
}
int64_t total_bytes = TotalBufferSize(*batch);
for (auto _ : state) {
ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs));
ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs, segment_key_refs));
}
state.SetBytesProcessed(total_bytes * state.iterations());
}
Expand Down Expand Up @@ -866,5 +890,47 @@ BENCHMARK(TDigestKernelDoubleMedian)->Apply(QuantileKernelArgs);
BENCHMARK(TDigestKernelDoubleDeciles)->Apply(QuantileKernelArgs);
BENCHMARK(TDigestKernelDoubleCentiles)->Apply(QuantileKernelArgs);

//
// RowSegmenter
//

template <typename... Args>
static void BenchmarkRowSegmenter(benchmark::State& state, Args&&...) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we turn this into a more realistic GroupBy benchmark? For example:

  • M=0 to 1 non-segmented keys
  • N=1 to 2 segmented keys
  • K=key cardinality (2,8,64)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Will do, and the benchmark should definitely have GroupBy in its name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

int64_t num_rows = state.range(0);
int64_t num_segments = state.range(1);
ASSERT_NE(num_segments, 0);
ASSERT_GE(num_rows, num_segments);
int64_t num_segment_keys = state.range(2);
// Adjust num_rows to be a multiple of num_segments.
num_rows = num_rows / num_segments * num_segments;

// A trivial column to count from.
auto arg = ConstantArrayGenerator::Zeroes(num_rows, int64());
// num_segments segments, each having identical num_rows / num_segments rows of the
// associated segment id.
ArrayVector segments(num_segments);
for (int i = 0; i < num_segments; ++i) {
ASSERT_OK_AND_ASSIGN(
segments[i],
Constant(std::make_shared<Int64Scalar>(i))->Generate(num_rows / num_segments));
}
// Concat all segments to form the segment key.
ASSERT_OK_AND_ASSIGN(auto segment_key, Concatenate(segments));
// num_segment_keys copies of the segment key.
ArrayVector segment_keys(num_segment_keys, segment_key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Less trivially, you can, for each segmented key, 1) generate a random Int64 array of the right cardinality (using the min and max values) 2) sort it to make it segmented. Then the multi-column segments will not trivially map to the individual key segments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then the multi-column segments will not trivially map to the individual key segments.

Sorry, I don't follow this part :(

Could you elaborate a bit? Thank you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that you have the exact same segment lengths in each segment key column. By using random generation the patterns would be slightly different.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you mean introducing some randomness into the individual segment lengths. That sounds good given that this particular benchmark needs to be more realistic.

Will do. Thank you for elaborating.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


BenchmarkGroupBy(state, {{"count", ""}}, {arg}, /*keys=*/{}, segment_keys);

state.SetItemsProcessed(num_rows * state.iterations());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we call SetItemsProcessed in BenchmarkGroupBy instead? It should probably be relevant for the other GroupBy benchmarks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess they should.

Moved into BenchmarkGroupBy. The other benchmarks sharing that helper will now report items_per_second metrics as well.

}

std::vector<std::string> row_segmenter_argnames = {"Rows", "Segments", "SegmentKeys"};
std::vector<std::vector<int64_t>> row_segmenter_args = {
{32 * 1024}, benchmark::CreateRange(1, 256, 4), benchmark::CreateDenseRange(0, 3, 1)};

BENCHMARK(BenchmarkRowSegmenter)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a GroupBy benchmark, so should probably have "GroupBy" in its name like other benchmarks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

->ArgNames(row_segmenter_argnames)
->ArgsProduct(row_segmenter_args);

} // namespace acero
} // namespace arrow
9 changes: 3 additions & 6 deletions cpp/src/arrow/acero/aggregate_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,14 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
template <typename BatchHandler>
Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch,
const std::vector<int>& ids, const BatchHandler& handle_batch) {
int64_t offset = 0;
ARROW_ASSIGN_OR_RAISE(auto segment_exec_batch, batch.SelectValues(ids));
ExecSpan segment_batch(segment_exec_batch);

while (true) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only call site in non-testing code.

ARROW_ASSIGN_OR_RAISE(compute::Segment segment,
segmenter->GetNextSegment(segment_batch, offset));
if (segment.offset >= segment_batch.length) break; // condition of no-next-segment
ARROW_ASSIGN_OR_RAISE(auto segments, segmenter->GetSegments(segment_batch));
for (const auto& segment : segments) {
ARROW_RETURN_NOT_OK(handle_batch(batch, segment));
offset = segment.offset + segment.length;
}

return Status::OK();
}

Expand Down
66 changes: 20 additions & 46 deletions cpp/src/arrow/acero/hash_aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -585,19 +585,12 @@ void TestGroupClassSupportedKeys(

void TestSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecSpan& batch,
std::vector<Segment> expected_segments) {
int64_t offset = 0, segment_num = 0;
for (auto expected_segment : expected_segments) {
SCOPED_TRACE("segment #" + ToChars(segment_num++));
ASSERT_OK_AND_ASSIGN(auto segment, segmenter->GetNextSegment(batch, offset));
ASSERT_EQ(expected_segment, segment);
offset = segment.offset + segment.length;
ASSERT_OK_AND_ASSIGN(auto actual_segments, segmenter->GetSegments(batch));
ASSERT_EQ(actual_segments.size(), expected_segments.size());
for (size_t i = 0; i < actual_segments.size(); ++i) {
SCOPED_TRACE("segment #" + ToChars(i));
ASSERT_EQ(actual_segments[i], expected_segments[i]);
}
// Assert next is the last (empty) segment.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new API generates fewer segments than before - there is no meaningless (length == 0) trailing segment.

ASSERT_OK_AND_ASSIGN(auto segment, segmenter->GetNextSegment(batch, offset));
ASSERT_GE(segment.offset, batch.length);
ASSERT_EQ(segment.length, 0);
ASSERT_TRUE(segment.is_open);
ASSERT_TRUE(segment.extends);
}

Result<std::unique_ptr<Grouper>> MakeGrouper(const std::vector<TypeHolder>& key_types) {
Expand Down Expand Up @@ -629,61 +622,47 @@ TEST(RowSegmenter, Basics) {
auto batch2 = ExecBatchFromJSON(types2, "[[1, 1], [1, 2], [2, 2]]");
auto batch1 = ExecBatchFromJSON(types1, "[[1], [1], [2]]");
ExecBatch batch0({}, 3);
{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new API doesn't accept offset argument so no need for such testing.

SCOPED_TRACE("offset");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types0));
ExecSpan span0(batch0);
for (int64_t offset : {-1, 4}) {
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
HasSubstr("invalid grouping segmenter offset"),
segmenter->GetNextSegment(span0, offset));
}
}
{
SCOPED_TRACE("types0 segmenting of batch2");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types0));
ExecSpan span2(batch2);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 0 "),
segmenter->GetNextSegment(span2, 0));
segmenter->GetSegments(span2));
ExecSpan span0(batch0);
TestSegments(segmenter, span0, {{0, 3, true, true}, {3, 0, true, true}});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes here and below are simply because the new API no longer generates the zero-length trailing segment.

TestSegments(segmenter, span0, {{0, 3, true, true}});
}
{
SCOPED_TRACE("bad_types1 segmenting of batch1");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(bad_types1));
ExecSpan span1(batch1);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch value 0 of type "),
segmenter->GetNextSegment(span1, 0));
segmenter->GetSegments(span1));
}
{
SCOPED_TRACE("types1 segmenting of batch2");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types1));
ExecSpan span2(batch2);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 1 "),
segmenter->GetNextSegment(span2, 0));
segmenter->GetSegments(span2));
ExecSpan span1(batch1);
TestSegments(segmenter, span1,
{{0, 2, false, true}, {2, 1, true, false}, {3, 0, true, true}});
TestSegments(segmenter, span1, {{0, 2, false, true}, {2, 1, true, false}});
}
{
SCOPED_TRACE("bad_types2 segmenting of batch2");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(bad_types2));
ExecSpan span2(batch2);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch value 1 of type "),
segmenter->GetNextSegment(span2, 0));
segmenter->GetSegments(span2));
}
{
SCOPED_TRACE("types2 segmenting of batch1");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types2));
ExecSpan span1(batch1);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 2 "),
segmenter->GetNextSegment(span1, 0));
segmenter->GetSegments(span1));
ExecSpan span2(batch2);
TestSegments(segmenter, span2,
{{0, 1, false, true},
{1, 1, false, false},
{2, 1, true, false},
{3, 0, true, true}});
{{0, 1, false, true}, {1, 1, false, false}, {2, 1, true, false}});
}
}

Expand All @@ -696,8 +675,7 @@ TEST(RowSegmenter, NonOrdered) {
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 1, true, false},
{5, 0, true, true}});
{4, 1, true, false}});
}
{
std::vector<TypeHolder> types = {int32(), int32()};
Expand All @@ -707,8 +685,7 @@ TEST(RowSegmenter, NonOrdered) {
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 1, true, false},
{5, 0, true, true}});
{4, 1, true, false}});
}
}

Expand Down Expand Up @@ -767,8 +744,7 @@ TEST(RowSegmenter, MultipleSegments) {
{3, 1, false, false},
{4, 2, false, false},
{6, 2, false, false},
{8, 1, true, false},
{9, 0, true, true}});
{8, 1, true, false}});
}
{
std::vector<TypeHolder> types = {int32(), int32()};
Expand All @@ -782,8 +758,7 @@ TEST(RowSegmenter, MultipleSegments) {
{3, 1, false, false},
{4, 2, false, false},
{6, 2, false, false},
{8, 1, true, false},
{9, 0, true, true}});
{8, 1, true, false}});
}
}

Expand Down Expand Up @@ -845,7 +820,7 @@ void TestRowSegmenterConstantBatch(
std::vector<TypeHolder> key_types(types.begin(), types.begin() + size);
ARROW_ASSIGN_OR_RAISE(auto segmenter, make_segmenter(key_types));
for (size_t i = 0; i < repetitions; i++) {
TestSegments(segmenter, ExecSpan(batch), {{0, 3, true, true}, {3, 0, true, true}});
TestSegments(segmenter, ExecSpan(batch), {{0, 3, true, true}});
ARROW_RETURN_NOT_OK(segmenter->Reset());
}
return Status::OK();
Expand Down Expand Up @@ -893,10 +868,9 @@ TEST(RowSegmenter, RowConstantBatch) {
constexpr size_t n = 3;
std::vector<TypeHolder> types = {int32(), int32(), int32()};
auto full_batch = ExecBatchFromJSON(types, "[[1, 1, 1], [2, 2, 2], [3, 3, 3]]");
std::vector<Segment> expected_segments_for_size_0 = {{0, 3, true, true},
{3, 0, true, true}};
std::vector<Segment> expected_segments_for_size_0 = {{0, 3, true, true}};
std::vector<Segment> expected_segments = {
{0, 1, false, true}, {1, 1, false, false}, {2, 1, true, false}, {3, 0, true, true}};
{0, 1, false, true}, {1, 1, false, false}, {2, 1, true, false}};
auto test_by_size = [&](size_t size) -> Status {
SCOPED_TRACE("constant-batch with " + ToChars(size) + " key(s)");
std::vector<Datum> values(full_batch.values.begin(),
Expand Down
Loading
Loading