Skip to content

Commit

Permalink
Implement toIntermediate for merging HLL
Browse files Browse the repository at this point in the history
Summary:
For queries that read HLL digests from a table and call `merge` on
them, we used to create an empty accumulator, merge it with the serialized
digest, and then serialize the merged accumulator again, which wasted
CPU.  Fix this by passing the serialized digests through directly when
partial aggregation is abandoned.

Differential Revision: D55935663
  • Loading branch information
Yuhta authored and facebook-github-bot committed Apr 9, 2024
1 parent 845f13c commit 4f9656d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
12 changes: 12 additions & 0 deletions velox/functions/prestosql/aggregates/ApproxDistinctAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,18 @@ class ApproxDistinctAggregate : public exec::Aggregate {
return false;
}

// Reports whether toIntermediate() may be used for this aggregate. The answer
// is simply the value of hllAsRawInput_.
// NOTE(review): presumably hllAsRawInput_ is set when the raw input is already
// a serialized HLL digest (e.g. `merge`), so the input can be forwarded as the
// intermediate result unchanged — confirm against the constructor.
bool supportsToIntermediate() const final {
return hllAsRawInput_;
}

void toIntermediate(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
VectorPtr& result) const final {
VELOX_CHECK_EQ(args.size(), 1);
result->copy(args[0].get(), rows, nullptr);
}

void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
override {
if (hllAsFinalResult_) {
Expand Down
31 changes: 31 additions & 0 deletions velox/functions/prestosql/aggregates/tests/ApproxDistinctTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,5 +407,36 @@ TEST_F(ApproxDistinctTest, mergeWithEmpty) {
ASSERT_EQ(readSingleValue(op).value<TypeKind::BIGINT>(), 499);
}

// Verifies that merging pre-built HLL digests yields the expected
// cardinalities when the partial aggregation is configured to be abandoned
// right away (min rows = 1, min pct = 0, zero partial aggregation memory).
TEST_F(ApproxDistinctTest, toIntermediate) {
  constexpr int kSize = 1000;
  constexpr int kHalfSize = kSize / 2;

  // c0: unique grouping key per row; c1: the constant 1 for every row.
  auto input = makeRowVector({
      makeFlatVector<int32_t>(kSize, folly::identity),
      BaseVector::createConstant(BIGINT(), 1ll, kSize, pool()),
  });

  // Step 1: build one digest per group with approx_set.
  auto digestPlan = PlanBuilder()
                        .values({input})
                        .singleAggregation({"c0"}, {"approx_set(c1)"})
                        .planNode();
  auto digests = AssertQueryBuilder(digestPlan).copyResults(pool());

  // Step 2: feed the digests back in as two equally sized batches and merge
  // them through a partial + final aggregation.
  auto sliceDigests = [&](int offset) {
    return std::static_pointer_cast<RowVector>(
        digests->slice(offset, kHalfSize));
  };
  auto mergePlan =
      PlanBuilder()
          .values({sliceDigests(0), sliceDigests(kHalfSize)})
          .partialAggregation({"c0"}, {"merge(a0)"})
          .finalAggregation()
          .project({"c0", "cardinality(a0)"})
          .planNode();

  AssertQueryBuilder queryBuilder(mergePlan);
  queryBuilder.config(core::QueryConfig::kAbandonPartialAggregationMinRows, 1);
  queryBuilder.config(core::QueryConfig::kAbandonPartialAggregationMinPct, 0);
  queryBuilder.config(core::QueryConfig::kMaxPartialAggregationMemory, 0);
  queryBuilder.config(
      core::QueryConfig::kMaxExtendedPartialAggregationMemory, 0);
  queryBuilder.maxDrivers(1);

  // The expected output is the original input: (c0, 1) for every row.
  queryBuilder.assertResults(input);
}

} // namespace
} // namespace facebook::velox::aggregate::test

0 comments on commit 4f9656d

Please sign in to comment.