diff --git a/src/engine/GroupBy.cpp b/src/engine/GroupBy.cpp index 9dde7353a3..6fdeca1833 100644 --- a/src/engine/GroupBy.cpp +++ b/src/engine/GroupBy.cpp @@ -330,7 +330,7 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) { if (useHashMapOptimization) { const auto* child = _subtree->getRootOperation()->getChildren().at(0); // Skip sorting - subresult = child->getResult(); + subresult = child->getResult(true); // Update runtime information auto runTimeInfoChildren = child->getRootOperation()->getRuntimeInfoPointer(); @@ -366,13 +366,28 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) { } if (useHashMapOptimization) { - auto localVocab = subresult->getCopyOfLocalVocab(); - IdTable idTable = CALL_FIXED_SIZE( - groupByCols.size(), &GroupBy::computeGroupByForHashMapOptimization, - this, metadataForUnsequentialData->aggregateAliases_, - subresult->idTable(), groupByCols, &localVocab); + // Helper lambda that calls `computeGroupByForHashMapOptimization` for the + // given `subresults`. + auto computeWithHashMap = [this, &metadataForUnsequentialData, + &groupByCols](auto&& subresults) { + auto doCompute = [&] { + return computeGroupByForHashMapOptimization( + metadataForUnsequentialData->aggregateAliases_, AD_FWD(subresults), + groupByCols); + }; + return ad_utility::callFixedSize(groupByCols.size(), doCompute); + }; - return {std::move(idTable), resultSortedOn(), std::move(localVocab)}; + // Now call `computeWithHashMap` and return the result. It expects a range + // of results, so if the result is fully materialized, we create an array + // with a single element. 
+ if (subresult->isFullyMaterialized()) { + return computeWithHashMap( + std::array{std::pair{std::cref(subresult->idTable()), + std::cref(subresult->localVocab())}}); + } else { + return computeWithHashMap(std::move(subresult->idTables())); + } } size_t inWidth = _subtree->getResultWidth(); @@ -846,7 +861,7 @@ std::optional GroupBy::computeGroupByForJoinWithFullScan() const { const auto& index = getExecutionContext()->getIndex(); // TODO Simplify the following pattern by using - // `ql::views::chunkd_by` and implement a lazy version of this view for + // `ql::views::chunk_by` and implement a lazy version of this view for // input iterators. // Take care of duplicate values in the input. @@ -1487,78 +1502,95 @@ static constexpr auto makeProcessGroupsVisitor = // _____________________________________________________________________________ template -IdTable GroupBy::computeGroupByForHashMapOptimization( - std::vector& aggregateAliases, - const IdTable& subresult, const std::vector& columnIndices, - LocalVocab* localVocab) const { - AD_CONTRACT_CHECK(columnIndices.size() == NUM_GROUP_COLUMNS || - NUM_GROUP_COLUMNS == 0); - - // Initialize aggregation data +Result GroupBy::computeGroupByForHashMapOptimization( + std::vector& aggregateAliases, auto subresults, + const std::vector& columnIndices) const { + AD_CORRECTNESS_CHECK(columnIndices.size() == NUM_GROUP_COLUMNS || + NUM_GROUP_COLUMNS == 0); + LocalVocab localVocab; + + // Initialize the data for the aggregates of the GROUP BY operation. 
HashMapAggregationData aggregationData( getExecutionContext()->getAllocator(), aggregateAliases, columnIndices.size()); - // Initialize evaluation context - sparqlExpression::EvaluationContext evaluationContext( - *getExecutionContext(), _subtree->getVariableColumns(), subresult, - getExecutionContext()->getAllocator(), *localVocab, cancellationHandle_, - deadline_); - - evaluationContext._groupedVariables = ad_utility::HashSet{ - _groupByVariables.begin(), _groupByVariables.end()}; - evaluationContext._isPartOfGroupBy = true; - + // Process the input blocks (pairs of `IdTable` and `LocalVocab`) one after + // the other. ad_utility::Timer lookupTimer{ad_utility::Timer::Stopped}; ad_utility::Timer aggregationTimer{ad_utility::Timer::Stopped}; - for (size_t i = 0; i < subresult.size(); i += GROUP_BY_HASH_MAP_BLOCK_SIZE) { - checkCancellation(); - - evaluationContext._beginIndex = i; - evaluationContext._endIndex = - std::min(i + GROUP_BY_HASH_MAP_BLOCK_SIZE, subresult.size()); - - auto currentBlockSize = evaluationContext.size(); - - // Perform HashMap lookup once for all groups in current block - using U = HashMapAggregationData::template ArrayOrVector< - std::span>; - U groupValues; - resizeIfVector(groupValues, columnIndices.size()); - - // TODO use views::enumerate - size_t j = 0; - for (auto& idx : columnIndices) { - groupValues[j] = subresult.getColumn(idx).subspan( - evaluationContext._beginIndex, currentBlockSize); - ++j; - } - lookupTimer.cont(); - auto hashEntries = aggregationData.getHashEntries(groupValues); - lookupTimer.stop(); - - aggregationTimer.cont(); - for (auto& aggregateAlias : aggregateAliases) { - for (auto& aggregate : aggregateAlias.aggregateInfo_) { - sparqlExpression::ExpressionResult expressionResult = - GroupBy::evaluateChildExpressionOfAggregateFunction( - aggregate, evaluationContext); - - auto& aggregationDataVariant = - aggregationData.getAggregationDataVariant( - aggregate.aggregateDataIndex_); - - 
std::visit(makeProcessGroupsVisitor(currentBlockSize, - &evaluationContext, hashEntries), - std::move(expressionResult), aggregationDataVariant); + for (const auto& [inputTableRef, inputLocalVocabRef] : subresults) { + const IdTable& inputTable = inputTableRef; + const LocalVocab& inputLocalVocab = inputLocalVocabRef; + + // Merge the local vocab of each input block. + // + // NOTE: If the input blocks have very similar or even identical non-empty + // local vocabs, no deduplication is performed. + localVocab.mergeWith(std::span{&inputLocalVocab, 1}); + + // Set up the `EvaluationContext` for this input block. + sparqlExpression::EvaluationContext evaluationContext( + *getExecutionContext(), _subtree->getVariableColumns(), inputTable, + getExecutionContext()->getAllocator(), localVocab, cancellationHandle_, + deadline_); + evaluationContext._groupedVariables = ad_utility::HashSet{ + _groupByVariables.begin(), _groupByVariables.end()}; + evaluationContext._isPartOfGroupBy = true; + + // Iterate over the rows of this input block. Process (up to) + // `GROUP_BY_HASH_MAP_BLOCK_SIZE` rows at a time. 
+ for (size_t i = 0; i < inputTable.size(); + i += GROUP_BY_HASH_MAP_BLOCK_SIZE) { + checkCancellation(); + + evaluationContext._beginIndex = i; + evaluationContext._endIndex = + std::min(i + GROUP_BY_HASH_MAP_BLOCK_SIZE, inputTable.size()); + + auto currentBlockSize = evaluationContext.size(); + + // Perform HashMap lookup once for all groups in current block + using U = HashMapAggregationData< + NUM_GROUP_COLUMNS>::template ArrayOrVector>; + U groupValues; + resizeIfVector(groupValues, columnIndices.size()); + + // TODO use views::enumerate + size_t j = 0; + for (auto& idx : columnIndices) { + groupValues[j] = inputTable.getColumn(idx).subspan( + evaluationContext._beginIndex, currentBlockSize); + ++j; + } + lookupTimer.cont(); + auto hashEntries = aggregationData.getHashEntries(groupValues); + lookupTimer.stop(); + + aggregationTimer.cont(); + for (auto& aggregateAlias : aggregateAliases) { + for (auto& aggregate : aggregateAlias.aggregateInfo_) { + sparqlExpression::ExpressionResult expressionResult = + GroupBy::evaluateChildExpressionOfAggregateFunction( + aggregate, evaluationContext); + + auto& aggregationDataVariant = + aggregationData.getAggregationDataVariant( + aggregate.aggregateDataIndex_); + + std::visit(makeProcessGroupsVisitor(currentBlockSize, + &evaluationContext, hashEntries), + std::move(expressionResult), aggregationDataVariant); + } } + aggregationTimer.stop(); } - aggregationTimer.stop(); } + runtimeInfo().addDetail("timeMapLookup", lookupTimer.msecs()); runtimeInfo().addDetail("timeAggregation", aggregationTimer.msecs()); - - return createResultFromHashMap(aggregationData, aggregateAliases, localVocab); + IdTable resultTable = + createResultFromHashMap(aggregationData, aggregateAliases, &localVocab); + return {std::move(resultTable), resultSortedOn(), std::move(localVocab)}; } // _____________________________________________________________________________ diff --git a/src/engine/GroupBy.h b/src/engine/GroupBy.h index afe824d492..8232f381ab 
100644 --- a/src/engine/GroupBy.h +++ b/src/engine/GroupBy.h @@ -1,8 +1,7 @@ -// Copyright 2018, University of Freiburg, +// Copyright 2018 - 2024, University of Freiburg // Chair of Algorithms and Data Structures. -// Author: -// 2018 Florian Kramer (florian.kramer@mail.uni-freiburg.de) -// 2020- Johannes Kalmbach (kalmbach@informatik.uni-freiburg.de) +// Authors: Florian Kramer [2018] +// Johannes Kalmbach #pragma once @@ -316,10 +315,9 @@ class GroupBy : public Operation { // Create result IdTable by using a HashMap mapping groups to aggregation data // and subsequently calling `createResultFromHashMap`. template - IdTable computeGroupByForHashMapOptimization( - std::vector& aggregateAliases, - const IdTable& subresult, const std::vector& columnIndices, - LocalVocab* localVocab) const; + Result computeGroupByForHashMapOptimization( + std::vector& aggregateAliases, auto subresults, + const std::vector& columnIndices) const; using AggregationData = std::variant #include #include @@ -36,6 +37,7 @@ using ::testing::Optional; namespace { auto I = IntId; +auto D = DoubleId; // Return a matcher that checks, whether a given `std::optionalasDebugString()); } +// _____________________________________________________________________________ +TEST_F(GroupByOptimizations, hashMapOptimizationLazyAndMaterializedInputs) { + /* Setup query: + SELECT ?x (AVG(?y) as ?avg) WHERE { + # explicitly defined subresult. + } GROUP BY ?x + */ + // Setup three unsorted input blocks. The first column will be the grouped + // `?x`, and the second column the variable `?y` of which we compute the + // average. + auto runTest = [this](bool inputIsLazy) { + std::vector tables; + tables.push_back(makeIdTableFromVector({{3, 6}, {8, 27}, {5, 7}}, I)); + tables.push_back(makeIdTableFromVector({{8, 27}, {5, 9}}, I)); + tables.push_back(makeIdTableFromVector({{5, 2}, {3, 4}}, I)); + // The expected averages are as follows: (3 -> 5.0), (5 -> 6.0), (8 + // -> 27.0). 
+ auto subtree = ad_utility::makeExecutionTree( + qec, std::move(tables), + std::vector>{Variable{"?x"}, Variable{"?y"}}); + auto& values = + dynamic_cast(*subtree->getRootOperation()); + values.forceFullyMaterialized() = !inputIsLazy; + + SparqlExpressionPimpl avgYPimpl = makeAvgPimpl(varY); + std::vector aliasesAvgY{Alias{avgYPimpl, Variable{"?avg"}}}; + + // Calculate result with optimization + qec->getQueryTreeCache().clearAll(); + RuntimeParameters().set<"group-by-hash-map-enabled">(true); + GroupBy groupBy{qec, variablesOnlyX, aliasesAvgY, std::move(subtree)}; + auto result = groupBy.computeResultOnlyForTesting(); + ASSERT_TRUE(result.isFullyMaterialized()); + EXPECT_THAT( + result.idTable(), + matchesIdTableFromVector({{I(3), D(5)}, {I(5), D(6)}, {I(8), D(27)}})); + }; + runTest(true); + runTest(false); + + // Disable optimization for following tests + RuntimeParameters().set<"group-by-hash-map-enabled">(false); +} + // _____________________________________________________________________________ TEST_F(GroupByOptimizations, correctResultForHashMapOptimizationForCountStar) { /* Setup query: diff --git a/test/engine/ValuesForTesting.h b/test/engine/ValuesForTesting.h index c02a9826bc..097ccd9c78 100644 --- a/test/engine/ValuesForTesting.h +++ b/test/engine/ValuesForTesting.h @@ -120,6 +120,8 @@ class ValuesForTesting : public Operation { } bool supportsLimit() const override { return supportsLimit_; } + bool& forceFullyMaterialized() { return forceFullyMaterialized_; } + private: // ___________________________________________________________________________ string getCacheKeyImpl() const override {