diff --git a/src/engine/ExportQueryExecutionTrees.cpp b/src/engine/ExportQueryExecutionTrees.cpp index fcdba34b76..80af2793b3 100644 --- a/src/engine/ExportQueryExecutionTrees.cpp +++ b/src/engine/ExportQueryExecutionTrees.cpp @@ -13,40 +13,69 @@ #include "util/http/MediaTypes.h" // __________________________________________________________________________ -namespace { + +cppcoro::generator ExportQueryExecutionTrees::getIdTables( + const Result& result) { + if (result.isFullyMaterialized()) { + co_yield result.idTable(); + } else { + for (const IdTable& idTable : result.idTables()) { + co_yield idTable; + } + } +} + // Return a range that contains the indices of the rows that have to be exported // from the `idTable` given the `LimitOffsetClause`. It takes into account the // LIMIT, the OFFSET, and the actual size of the `idTable` -auto getRowIndices(const LimitOffsetClause& limitOffset, const Result& result) { - const IdTable& idTable = result.idTable(); - return std::views::iota(limitOffset.actualOffset(idTable.size()), - limitOffset.upperBound(idTable.size())); +cppcoro::generator +ExportQueryExecutionTrees::getRowIndices(LimitOffsetClause limitOffset, + const Result& result) { + if (limitOffset._limit.value_or(1) == 0) { + co_return; + } + for (const IdTable& idTable : getIdTables(result)) { + uint64_t currentOffset = limitOffset.actualOffset(idTable.numRows()); + uint64_t upperBound = limitOffset.upperBound(idTable.numRows()); + if (currentOffset != upperBound) { + co_yield {idTable, std::views::iota(currentOffset, upperBound)}; + } + limitOffset._offset -= currentOffset; + if (limitOffset._limit.has_value()) { + limitOffset._limit = + limitOffset._limit.value() - (upperBound - currentOffset); + } + if (limitOffset._limit.value_or(1) == 0) { + break; + } + } } -} // namespace // _____________________________________________________________________________ cppcoro::generator ExportQueryExecutionTrees::constructQueryResultToTriples( const QueryExecutionTree& qet, const ad_utility::sparql_types::Triples& constructTriples, - LimitOffsetClause limitAndOffset, std::shared_ptr res, + LimitOffsetClause limitAndOffset, std::shared_ptr result, CancellationHandle cancellationHandle) { - for (size_t i : getRowIndices(limitAndOffset, *res)) { - ConstructQueryExportContext context{i, res->idTable(), res->localVocab(), - qet.getVariableColumns(), - qet.getQec()->getIndex()}; - using enum PositionInTriple; - for (const auto& triple : constructTriples) { - auto subject = triple[0].evaluate(context, SUBJECT); - auto predicate = triple[1].evaluate(context, PREDICATE); - auto object = triple[2].evaluate(context, OBJECT); - if (!subject.has_value() || !predicate.has_value() || - !object.has_value()) { - continue; + for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) { + for (uint64_t i : range) { + ConstructQueryExportContext context{i, idTable, result->localVocab(), + qet.getVariableColumns(), + qet.getQec()->getIndex()}; + using enum PositionInTriple; + for (const auto& triple : constructTriples) { + auto subject = triple[0].evaluate(context, SUBJECT); + auto predicate = triple[1].evaluate(context, PREDICATE); + auto object = triple[2].evaluate(context, OBJECT); + if (!subject.has_value() || !predicate.has_value() || + !object.has_value()) { + continue; + } + co_yield {std::move(subject.value()), std::move(predicate.value()), + std::move(object.value())}; + cancellationHandle->throwIfCancelled(); } - co_yield {std::move(subject.value()), std::move(predicate.value()), - std::move(object.value())}; - cancellationHandle->throwIfCancelled(); } } } @@ -112,34 +141,35 @@ nlohmann::json ExportQueryExecutionTrees::idTableToQLeverJSONArray( std::shared_ptr result, CancellationHandle cancellationHandle) { AD_CORRECTNESS_CHECK(result != nullptr); - const IdTable& data = result->idTable(); nlohmann::json json = nlohmann::json::array(); - for (size_t rowIndex : getRowIndices(limitAndOffset, *result)) { - // We need the explicit `array` constructor for the special case of zero - // variables. - json.push_back(nlohmann::json::array()); - auto& row = json.back(); - for (const auto& opt : columns) { - if (!opt) { - row.emplace_back(nullptr); - continue; - } - const auto& currentId = data(rowIndex, opt->columnIndex_); - const auto& optionalStringAndXsdType = idToStringAndType( - qet.getQec()->getIndex(), currentId, result->localVocab()); - if (!optionalStringAndXsdType.has_value()) { - row.emplace_back(nullptr); - continue; - } - const auto& [stringValue, xsdType] = optionalStringAndXsdType.value(); - if (xsdType) { - row.emplace_back('"' + stringValue + "\"^^<" + xsdType + '>'); - } else { - row.emplace_back(stringValue); + for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) { + for (uint64_t rowIndex : range) { + // We need the explicit `array` constructor for the special case of zero + // variables. + json.push_back(nlohmann::json::array()); + auto& row = json.back(); + for (const auto& opt : columns) { + if (!opt) { + row.emplace_back(nullptr); + continue; + } + const auto& currentId = idTable(rowIndex, opt->columnIndex_); + const auto& optionalStringAndXsdType = idToStringAndType( + qet.getQec()->getIndex(), currentId, result->localVocab()); + if (!optionalStringAndXsdType.has_value()) { + row.emplace_back(nullptr); + continue; + } + const auto& [stringValue, xsdType] = optionalStringAndXsdType.value(); + if (xsdType) { + row.emplace_back('"' + stringValue + "\"^^<" + xsdType + '>'); + } else { + row.emplace_back(stringValue); + } } + cancellationHandle->throwIfCancelled(); } - cancellationHandle->throwIfCancelled(); } return json; } @@ -231,7 +261,7 @@ ExportQueryExecutionTrees::idToStringAndType(const Index& index, Id id, return std::pair{escapeFunction(word.toStringRepresentation()), nullptr}; }; switch (id.getDatatype()) { - case Datatype::WordVocabIndex: { + case WordVocabIndex: { std::string_view entity = index.indexToString(id.getWordVocabIndex()); return std::pair{escapeFunction(std::string{entity}), nullptr}; } @@ -288,8 +318,6 @@ nlohmann::json ExportQueryExecutionTrees::selectQueryResultToSparqlJSON( std::erase(columns, std::nullopt); - const IdTable& idTable = result->idTable(); - json resultJson; std::vector selectedVars = selectClause.getSelectedVariablesAsStrings(); @@ -357,32 +385,34 @@ nlohmann::json ExportQueryExecutionTrees::selectQueryResultToSparqlJSON( return b; }; - for (size_t rowIndex : getRowIndices(limitAndOffset, *result)) { - // TODO: ordered_json` entries are ordered alphabetically, but insertion - // order would be preferable. - nlohmann::ordered_json binding; - for (const auto& column : columns) { - const auto& currentId = idTable(rowIndex, column->columnIndex_); - const auto& optionalValue = idToStringAndType( - qet.getQec()->getIndex(), currentId, result->localVocab()); - if (!optionalValue.has_value()) { - continue; - } - const auto& [stringValue, xsdType] = optionalValue.value(); - nlohmann::ordered_json b; - if (!xsdType) { - // No xsdType, this means that `stringValue` is a plain string literal - // or entity. - b = stringToBinding(stringValue); - } else { - b["value"] = stringValue; - b["type"] = "literal"; - b["datatype"] = xsdType; + for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) { + for (uint64_t rowIndex : range) { + // TODO: ordered_json` entries are ordered alphabetically, but insertion + // order would be preferable. + nlohmann::ordered_json binding; + for (const auto& column : columns) { + const auto& currentId = idTable(rowIndex, column->columnIndex_); + const auto& optionalValue = idToStringAndType( + qet.getQec()->getIndex(), currentId, result->localVocab()); + if (!optionalValue.has_value()) { + continue; + } + const auto& [stringValue, xsdType] = optionalValue.value(); + nlohmann::ordered_json b; + if (!xsdType) { + // No xsdType, this means that `stringValue` is a plain string literal + // or entity. + b = stringToBinding(stringValue); + } else { + b["value"] = stringValue; + b["type"] = "literal"; + b["datatype"] = xsdType; + } + binding[column->variable_] = std::move(b); } - binding[column->variable_] = std::move(b); + bindings.emplace_back(std::move(binding)); + cancellationHandle->throwIfCancelled(); } - bindings.emplace_back(std::move(binding)); - cancellationHandle->throwIfCancelled(); } resultJson["results"]["bindings"] = std::move(bindings); return resultJson; @@ -430,18 +460,20 @@ ExportQueryExecutionTrees::selectQueryResultToStream( auto selectedColumnIndices = qet.selectedVariablesToColumnIndices(selectClause, true); - const auto& idTable = result->idTable(); // special case : binary export of IdTable if constexpr (format == MediaType::octetStream) { - for (size_t i : getRowIndices(limitAndOffset, *result)) { - for (const auto& columnIndex : selectedColumnIndices) { - if (columnIndex.has_value()) { - co_yield std::string_view{reinterpret_cast(&idTable( - i, columnIndex.value().columnIndex_)), - sizeof(Id)}; + for (const auto& [idTable, range] : + getRowIndices(limitAndOffset, *result)) { + for (uint64_t i : range) { + for (const auto& columnIndex : selectedColumnIndices) { + if (columnIndex.has_value()) { + co_yield std::string_view{reinterpret_cast(&idTable( + i, columnIndex.value().columnIndex_)), + sizeof(Id)}; + } } + cancellationHandle->throwIfCancelled(); } - cancellationHandle->throwIfCancelled(); } co_return; } @@ -461,22 +493,24 @@ ExportQueryExecutionTrees::selectQueryResultToStream( constexpr auto& escapeFunction = format == MediaType::tsv ? RdfEscaping::escapeForTsv : RdfEscaping::escapeForCsv; - for (size_t i : getRowIndices(limitAndOffset, *result)) { - for (size_t j = 0; j < selectedColumnIndices.size(); ++j) { - if (selectedColumnIndices[j].has_value()) { - const auto& val = selectedColumnIndices[j].value(); - Id id = idTable(i, val.columnIndex_); - auto optionalStringAndType = - idToStringAndType( - qet.getQec()->getIndex(), id, result->localVocab(), - escapeFunction); - if (optionalStringAndType.has_value()) [[likely]] { - co_yield optionalStringAndType.value().first; + for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) { + for (uint64_t i : range) { + for (size_t j = 0; j < selectedColumnIndices.size(); ++j) { + if (selectedColumnIndices[j].has_value()) { + const auto& val = selectedColumnIndices[j].value(); + Id id = idTable(i, val.columnIndex_); + auto optionalStringAndType = + idToStringAndType( + qet.getQec()->getIndex(), id, result->localVocab(), + escapeFunction); + if (optionalStringAndType.has_value()) [[likely]] { + co_yield optionalStringAndType.value().first; + } } + co_yield j + 1 < selectedColumnIndices.size() ? separator : '\n'; } - co_yield j + 1 < selectedColumnIndices.size() ? separator : '\n'; + cancellationHandle->throwIfCancelled(); } - cancellationHandle->throwIfCancelled(); } LOG(DEBUG) << "Done creating readable result.\n"; } @@ -581,22 +615,23 @@ ad_utility::streams::stream_generator ExportQueryExecutionTrees:: co_yield "\n"; result->logResultSize(); - const auto& idTable = result->idTable(); auto selectedColumnIndices = qet.selectedVariablesToColumnIndices(selectClause, false); // TODO we could prefilter for the nonexisting variables. - for (size_t i : getRowIndices(limitAndOffset, *result)) { - co_yield "\n "; - for (size_t j = 0; j < selectedColumnIndices.size(); ++j) { - if (selectedColumnIndices[j].has_value()) { - const auto& val = selectedColumnIndices[j].value(); - Id id = idTable(i, val.columnIndex_); - co_yield idToXMLBinding(val.variable_, id, qet.getQec()->getIndex(), - result->localVocab()); + for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) { + for (uint64_t i : range) { + co_yield "\n "; + for (size_t j = 0; j < selectedColumnIndices.size(); ++j) { + if (selectedColumnIndices[j].has_value()) { + const auto& val = selectedColumnIndices[j].value(); + Id id = idTable(i, val.columnIndex_); + co_yield idToXMLBinding(val.variable_, id, qet.getQec()->getIndex(), + result->localVocab()); + } } + co_yield "\n "; + cancellationHandle->throwIfCancelled(); } - co_yield "\n "; - cancellationHandle->throwIfCancelled(); } co_yield "\n"; co_yield "\n"; @@ -642,42 +677,43 @@ nlohmann::json ExportQueryExecutionTrees::computeQueryResultAsQLeverJSON( const ParsedQuery& query, const QueryExecutionTree& qet, const ad_utility::Timer& requestTimer, CancellationHandle cancellationHandle) { - std::shared_ptr result = qet.getResult(); + auto timeUntilFunctionCall = requestTimer.msecs(); + // Always request lazy if possible, the lower memory footprint outvalues the + // potential overhead of generators. + std::shared_ptr result = qet.getResult(true); result->logResultSize(); - auto timeResultComputation = requestTimer.msecs(); - - size_t resultSize = result->idTable().size(); nlohmann::json j; j["query"] = query._originalString; j["status"] = "OK"; j["warnings"] = qet.collectWarnings(); - if (query.hasSelectClause()) { - j["selected"] = query.selectClause().getSelectedVariablesAsStrings(); - } else { - j["selected"] = - std::vector{"?subject", "?predicate", "?object"}; - } + j["selected"] = + query.hasSelectClause() + ? query.selectClause().getSelectedVariablesAsStrings() + : std::vector{"?subject", "?predicate", "?object"}; + + j["res"] = + query.hasSelectClause() + ? selectQueryResultBindingsToQLeverJSON( + qet, query.selectClause(), query._limitOffset, + std::move(result), std::move(cancellationHandle)) + : constructQueryResultBindingsToQLeverJSON( + qet, query.constructClause().triples_, query._limitOffset, + std::move(result), std::move(cancellationHandle)); j["runtimeInformation"]["meta"] = nlohmann::ordered_json( qet.getRootOperation()->getRuntimeInfoWholeQuery()); RuntimeInformation runtimeInformation = qet.getRootOperation()->runtimeInfo(); - runtimeInformation.addLimitOffsetRow( - query._limitOffset, std::chrono::milliseconds::zero(), false); + runtimeInformation.addLimitOffsetRow(query._limitOffset, false); j["runtimeInformation"]["query_execution_tree"] = nlohmann::ordered_json(runtimeInformation); - { - j["res"] = - query.hasSelectClause() - ? selectQueryResultBindingsToQLeverJSON( - qet, query.selectClause(), query._limitOffset, - std::move(result), std::move(cancellationHandle)) - : constructQueryResultBindingsToQLeverJSON( - qet, query.constructClause().triples_, query._limitOffset, - std::move(result), std::move(cancellationHandle)); - } + auto timeResultComputation = + timeUntilFunctionCall + runtimeInformation.totalTime_; + + size_t resultSize = runtimeInformation.numRows_; + j["resultsize"] = query.hasSelectClause() ? resultSize : j["res"].size(); j["time"]["total"] = std::to_string(requestTimer.msecs().count()) + "ms"; j["time"]["computeResult"] = @@ -692,15 +728,14 @@ ExportQueryExecutionTrees::computeResultAsStream( const ParsedQuery& parsedQuery, const QueryExecutionTree& qet, ad_utility::MediaType mediaType, CancellationHandle cancellationHandle) { auto compute = [&] { - auto limitAndOffset = parsedQuery._limitOffset; return parsedQuery.hasSelectClause() - ? ExportQueryExecutionTrees::selectQueryResultToStream( - qet, parsedQuery.selectClause(), limitAndOffset, + ? selectQueryResultToStream( + qet, parsedQuery.selectClause(), parsedQuery._limitOffset, std::move(cancellationHandle)) - : ExportQueryExecutionTrees::constructQueryResultToStream< - format>(qet, parsedQuery.constructClause().triples_, - limitAndOffset, qet.getResult(), - std::move(cancellationHandle)); + : constructQueryResultToStream( + qet, parsedQuery.constructClause().triples_, + parsedQuery._limitOffset, qet.getResult(), + std::move(cancellationHandle)); }; using enum MediaType; diff --git a/src/engine/ExportQueryExecutionTrees.h b/src/engine/ExportQueryExecutionTrees.h index 4e8db28e0e..51b9825998 100644 --- a/src/engine/ExportQueryExecutionTrees.h +++ b/src/engine/ExportQueryExecutionTrees.h @@ -177,4 +177,34 @@ class ExportQueryExecutionTrees { const QueryExecutionTree& qet, const parsedQuery::SelectClause& selectClause, LimitOffsetClause limitAndOffset, CancellationHandle cancellationHandle); + + // Helper type that contains an `IdTable` and a view with related indices to + // access the `IdTable` with. + struct TableWithRange { + const IdTable& idTable_; + std::ranges::iota_view view_; + }; + + // Yield all `IdTables` provided by the given `result`. + static cppcoro::generator getIdTables(const Result& result); + + // Return a range that contains the indices of the rows that have to be + // exported from the `idTable` given the `LimitOffsetClause`. It takes into + // account the LIMIT, the OFFSET, and the actual size of the `idTable` + static cppcoro::generator getRowIndices( + LimitOffsetClause limitOffset, const Result& result); + + FRIEND_TEST(ExportQueryExecutionTrees, getIdTablesReturnsSingletonIterator); + FRIEND_TEST(ExportQueryExecutionTrees, getIdTablesMirrorsGenerator); + FRIEND_TEST(ExportQueryExecutionTrees, ensureCorrectSlicingOfSingleIdTable); + FRIEND_TEST(ExportQueryExecutionTrees, + ensureCorrectSlicingOfIdTablesWhenFirstIsSkipped); + FRIEND_TEST(ExportQueryExecutionTrees, + ensureCorrectSlicingOfIdTablesWhenLastIsSkipped); + FRIEND_TEST(ExportQueryExecutionTrees, + ensureCorrectSlicingOfIdTablesWhenFirstAndSecondArePartial); + FRIEND_TEST(ExportQueryExecutionTrees, + ensureCorrectSlicingOfIdTablesWhenFirstAndLastArePartial); + FRIEND_TEST(ExportQueryExecutionTrees, + ensureGeneratorIsNotConsumedWhenNotRequired); }; diff --git a/src/engine/Filter.cpp b/src/engine/Filter.cpp index 4ce875cfeb..2458d87fb0 100644 --- a/src/engine/Filter.cpp +++ b/src/engine/Filter.cpp @@ -43,41 +43,59 @@ string Filter::getDescriptor() const { } // _____________________________________________________________________________ -ProtoResult Filter::computeResult([[maybe_unused]] bool requestLaziness) { +ProtoResult Filter::computeResult(bool requestLaziness) { LOG(DEBUG) << "Getting sub-result for Filter result computation..." << endl; - std::shared_ptr subRes = _subtree->getResult(); + std::shared_ptr subRes = _subtree->getResult(requestLaziness); LOG(DEBUG) << "Filter result computation..." << endl; checkCancellation(); - IdTable idTable{getExecutionContext()->getAllocator()}; - idTable.setNumColumns(subRes->idTable().numColumns()); + if (subRes->isFullyMaterialized()) { + IdTable result = filterIdTable(subRes, subRes->idTable()); + LOG(DEBUG) << "Filter result computation done." << endl; + + return {std::move(result), resultSortedOn(), subRes->getSharedLocalVocab()}; + } + auto localVocab = subRes->getSharedLocalVocab(); + return {[](auto subRes, auto* self) -> cppcoro::generator { + for (IdTable& idTable : subRes->idTables()) { + IdTable result = self->filterIdTable(subRes, idTable); + co_yield result; + } + }(std::move(subRes), this), + resultSortedOn(), std::move(localVocab)}; +} - size_t width = idTable.numColumns(); - CALL_FIXED_SIZE(width, &Filter::computeFilterImpl, this, &idTable, *subRes); - LOG(DEBUG) << "Filter result computation done." << endl; - checkCancellation(); +// _____________________________________________________________________________ +IdTable Filter::filterIdTable(const std::shared_ptr& subRes, + const IdTable& idTable) { + sparqlExpression::EvaluationContext evaluationContext( + *getExecutionContext(), _subtree->getVariableColumns(), idTable, + getExecutionContext()->getAllocator(), subRes->localVocab(), + cancellationHandle_, deadline_); - return {std::move(idTable), resultSortedOn(), subRes->getSharedLocalVocab()}; + // TODO This should be a mandatory argument to the + // EvaluationContext constructor. + evaluationContext._columnsByWhichResultIsSorted = subRes->sortedBy(); + + size_t width = evaluationContext._inputTable.numColumns(); + IdTable result = CALL_FIXED_SIZE(width, &Filter::computeFilterImpl, this, + evaluationContext); + checkCancellation(); + return result; } // _____________________________________________________________________________ template -void Filter::computeFilterImpl(IdTable* outputIdTable, - const Result& inputResultTable) { - sparqlExpression::EvaluationContext evaluationContext( - *getExecutionContext(), _subtree->getVariableColumns(), - inputResultTable.idTable(), getExecutionContext()->getAllocator(), - inputResultTable.localVocab(), cancellationHandle_, deadline_); - - // TODO This should be a mandatory argument to the EvaluationContext - // constructor. - evaluationContext._columnsByWhichResultIsSorted = inputResultTable.sortedBy(); +IdTable Filter::computeFilterImpl( + sparqlExpression::EvaluationContext& evaluationContext) { + IdTable idTable{getExecutionContext()->getAllocator()}; + idTable.setNumColumns(evaluationContext._inputTable.numColumns()); sparqlExpression::ExpressionResult expressionResult = _expression.getPimpl()->evaluate(&evaluationContext); - const auto input = inputResultTable.idTable().asStaticView(); - auto output = std::move(*outputIdTable).toStatic(); + const auto input = evaluationContext._inputTable.asStaticView(); + auto output = std::move(idTable).toStatic(); // Clang 17 seems to incorrectly deduce the type, so try to trick it std::remove_const_t& output2 = output; @@ -123,7 +141,7 @@ void Filter::computeFilterImpl(IdTable* outputIdTable, std::visit(visitor, std::move(expressionResult)); - *outputIdTable = std::move(output).toDynamic(); + return std::move(output).toDynamic(); } // _____________________________________________________________________________ diff --git a/src/engine/Filter.h b/src/engine/Filter.h index 7e180b7172..bc34279f77 100644 --- a/src/engine/Filter.h +++ b/src/engine/Filter.h @@ -58,9 +58,15 @@ class Filter : public Operation { return _subtree->getVariableColumns(); } - ProtoResult computeResult([[maybe_unused]] bool requestLaziness) override; + ProtoResult computeResult(bool requestLaziness) override; + // Perform the actual filter operation of the data provided by + // `evaluationContext`. template - void computeFilterImpl(IdTable* outputIdTable, - const Result& inputResultTable); + IdTable computeFilterImpl( + sparqlExpression::EvaluationContext& evaluationContext); + + // Run `computeFilterImpl` on the provided IdTable + IdTable filterIdTable(const std::shared_ptr& subRes, + const IdTable& idTable); }; diff --git a/src/engine/IndexScan.cpp b/src/engine/IndexScan.cpp index 028eea3022..4cc2780184 100644 --- a/src/engine/IndexScan.cpp +++ b/src/engine/IndexScan.cpp @@ -122,9 +122,28 @@ VariableToColumnMap IndexScan::computeVariableToColumnMap() const { std::ranges::for_each(additionalVariables_, addCol); return variableToColumnMap; } + // _____________________________________________________________________________ -ProtoResult IndexScan::computeResult([[maybe_unused]] bool requestLaziness) { +cppcoro::generator IndexScan::scanInChunks() const { + auto metadata = getMetadataForScan(*this); + if (!metadata.has_value()) { + co_return; + } + auto blocksSpan = + CompressedRelationReader::getBlocksFromMetadata(metadata.value()); + std::vector blocks{blocksSpan.begin(), + blocksSpan.end()}; + for (IdTable& idTable : getLazyScan(*this, std::move(blocks))) { + co_yield std::move(idTable); + } +} + +// _____________________________________________________________________________ +ProtoResult IndexScan::computeResult(bool requestLaziness) { LOG(DEBUG) << "IndexScan result computation...\n"; + if (requestLaziness) { + return {scanInChunks(), resultSortedOn(), LocalVocab{}}; + } IdTable idTable{getExecutionContext()->getAllocator()}; using enum Permutation::Enum; diff --git a/src/engine/IndexScan.h b/src/engine/IndexScan.h index 5914e27d71..8609bf83ac 100644 --- a/src/engine/IndexScan.h +++ b/src/engine/IndexScan.h @@ -103,7 +103,7 @@ class IndexScan final : public Operation { ScanSpecificationAsTripleComponent getScanSpecification() const; private: - ProtoResult computeResult([[maybe_unused]] bool requestLaziness) override; + ProtoResult computeResult(bool requestLaziness) override; vector getChildren() override { return {}; } @@ -115,6 +115,8 @@ class IndexScan final : public Operation { VariableToColumnMap computeVariableToColumnMap() const override; + cppcoro::generator scanInChunks() const; + // Helper functions for the public `getLazyScanFor...` functions (see above). static Permutation::IdTableGenerator getLazyScan( const IndexScan& s, std::vector blocks); diff --git a/src/engine/Operation.cpp b/src/engine/Operation.cpp index 4293aa2c9c..dc1ee4d97c 100644 --- a/src/engine/Operation.cpp +++ b/src/engine/Operation.cpp @@ -69,6 +69,143 @@ void Operation::recursivelySetTimeConstraint( }); } +// _____________________________________________________________________________ +void Operation::updateRuntimeStats(bool applyToLimit, uint64_t numRows, + uint64_t numCols, + std::chrono::microseconds duration) const { + bool isRtiWrappedInLimit = !applyToLimit && externalLimitApplied_; + auto& rti = + isRtiWrappedInLimit ? *runtimeInfo().children_.at(0) : runtimeInfo(); + rti.totalTime_ += duration; + rti.originalTotalTime_ = rti.totalTime_; + rti.originalOperationTime_ = rti.getOperationTime(); + // Don't update the number of rows/cols twice if the rti for the limit and the + // rti for the actual operation are the same. + if (!applyToLimit || externalLimitApplied_) { + rti.numRows_ += numRows; + rti.numCols_ = numCols; + } + if (isRtiWrappedInLimit) { + runtimeInfo().totalTime_ += duration; + runtimeInfo().originalTotalTime_ = runtimeInfo().totalTime_; + runtimeInfo().originalOperationTime_ = runtimeInfo().getOperationTime(); + } +} + +// _____________________________________________________________________________ +ProtoResult Operation::runComputation(const ad_utility::Timer& timer, + ComputationMode computationMode) { + AD_CONTRACT_CHECK(computationMode != ComputationMode::ONLY_IF_CACHED); + checkCancellation(); + runtimeInfo().status_ = RuntimeInformation::Status::inProgress; + signalQueryUpdate(); + ProtoResult result = + computeResult(computationMode == ComputationMode::LAZY_IF_SUPPORTED); + AD_CONTRACT_CHECK(computationMode == ComputationMode::LAZY_IF_SUPPORTED || + result.isFullyMaterialized()); + + checkCancellation(); + if constexpr (ad_utility::areExpensiveChecksEnabled) { + // Compute the datatypes that occur in each column of the result. + // Also assert, that if a column contains UNDEF values, then the + // `mightContainUndef` flag for that columns is set. + // TODO It is cheaper to move this calculation into the + // individual results, but that requires changes in each individual + // operation, therefore we currently only perform this expensive + // change in the DEBUG builds. + result.checkDefinedness(getExternallyVisibleVariableColumns()); + } + // Make sure that the results that are written to the cache have the + // correct runtimeInfo. The children of the runtime info are already set + // correctly because the result was computed, so we can pass `nullopt` as + // the last argument. + if (result.isFullyMaterialized()) { + updateRuntimeInformationOnSuccess(result.idTable().size(), + ad_utility::CacheStatus::computed, + timer.msecs(), std::nullopt); + } else { + runtimeInfo().status_ = RuntimeInformation::lazilyMaterialized; + result.runOnNewChunkComputed( + [this, timeSizeUpdate = 0us]( + const IdTable& idTable, + std::chrono::microseconds duration) mutable { + updateRuntimeStats(false, idTable.numRows(), idTable.numColumns(), + duration); + LOG(DEBUG) << "Computed partial chunk of size " << idTable.numRows() + << " x " << idTable.numColumns() << std::endl; + timeSizeUpdate += duration; + if (timeSizeUpdate > 50ms) { + timeSizeUpdate = 0us; + signalQueryUpdate(); + } + }, + [this](bool failed) { + if (failed) { + runtimeInfo().status_ = RuntimeInformation::failed; + } + signalQueryUpdate(); + }); + } + // Apply LIMIT and OFFSET, but only if the call to `computeResult` did not + // already perform it. An example for an operation that directly computes + // the Limit is a full index scan with three variables. Note that the + // `QueryPlanner` does currently only set the limit for operations that + // support it natively, except for operations in subqueries. This means + // that a lot of the time the limit is only artificially applied during + // export, allowing the cache to reuse the same operation for different + // limits and offsets. + if (!supportsLimit()) { + runtimeInfo().addLimitOffsetRow(_limit, true); + AD_CONTRACT_CHECK(!externalLimitApplied_); + externalLimitApplied_ = _limit._limit.has_value() || _limit._offset != 0; + result.applyLimitOffset(_limit, [this](std::chrono::microseconds limitTime, + const IdTable& idTable) { + updateRuntimeStats(true, idTable.numRows(), idTable.numColumns(), + limitTime); + }); + } else { + result.assertThatLimitWasRespected(_limit); + } + return result; +} + +// _____________________________________________________________________________ +CacheValue Operation::runComputationAndPrepareForCache( + const ad_utility::Timer& timer, ComputationMode computationMode, + const std::string& cacheKey, bool pinned) { + auto& cache = _executionContext->getQueryTreeCache(); + auto result = runComputation(timer, computationMode); + if (!result.isFullyMaterialized()) { + AD_CONTRACT_CHECK(!pinned); + result.cacheDuringConsumption( + [maxSize = cache.getMaxSizeSingleEntry()]( + const std::optional& currentIdTable, + const IdTable& newIdTable) { + auto currentSize = currentIdTable.has_value() + ? CacheValue::getSize(currentIdTable.value()) + : 0_B; + return maxSize >= currentSize + CacheValue::getSize(newIdTable); + }, + [runtimeInfo = getRuntimeInfoPointer(), &cache, + cacheKey](Result aggregatedResult) { + auto copy = *runtimeInfo; + copy.status_ = RuntimeInformation::Status::fullyMaterialized; + cache.tryInsertIfNotPresent( + false, cacheKey, + std::make_shared(std::move(aggregatedResult), + std::move(copy))); + }); + } + if (result.isFullyMaterialized()) { + auto resultNumRows = result.idTable().size(); + auto resultNumCols = result.idTable().numColumns(); + LOG(DEBUG) << "Computed result of size " << resultNumRows << " x " + << resultNumCols << std::endl; + } + + return CacheValue{std::move(result), runtimeInfo()}; +} + // ________________________________________________________________________ std::shared_ptr Operation::getResult( bool isRoot, ComputationMode computationMode) { @@ -101,72 +238,33 @@ std::shared_ptr Operation::getResult( updateRuntimeInformationOnFailure(timer.msecs()); } }); - auto computeLambda = [this, &timer, computationMode] { - checkCancellation(); - runtimeInfo().status_ = RuntimeInformation::Status::inProgress; - signalQueryUpdate(); - Result result = - computeResult(computationMode == ComputationMode::LAZY_IF_SUPPORTED); - - checkCancellation(); - // Compute the datatypes that occur in each column of the result. - // Also assert, that if a column contains UNDEF values, then the - // `mightContainUndef` flag for that columns is set. - // TODO It is cheaper to move this calculation into the - // individual results, but that requires changes in each individual - // operation, therefore we currently only perform this expensive - // change in the DEBUG builds. - AD_EXPENSIVE_CHECK( - result.checkDefinedness(getExternallyVisibleVariableColumns())); - // Make sure that the results that are written to the cache have the - // correct runtimeInfo. The children of the runtime info are already set - // correctly because the result was computed, so we can pass `nullopt` as - // the last argument. - updateRuntimeInformationOnSuccess(result, - ad_utility::CacheStatus::computed, - timer.msecs(), std::nullopt); - // Apply LIMIT and OFFSET, but only if the call to `computeResult` did not - // already perform it. An example for an operation that directly computes - // the Limit is a full index scan with three variables. Note that the - // `QueryPlanner` does currently only set the limit for operations that - // support it natively, except for operations in subqueries. This means - // that a lot of the time the limit is only artificially applied during - // export, allowing the cache to reuse the same operation for different - // limits and offsets. - if (!supportsLimit()) { - ad_utility::timer::Timer limitTimer{ad_utility::timer::Timer::Started}; - // Note: both of the following calls have no effect and negligible - // runtime if neither a LIMIT nor an OFFSET were specified. - result.applyLimitOffset(_limit); - runtimeInfo().addLimitOffsetRow(_limit, limitTimer.msecs(), true); - } else { - auto numRows = result.idTable().numRows(); - auto limit = _limit._limit; - AD_CONTRACT_CHECK(!limit.has_value() || - numRows <= static_cast(limit.value())); - } - return CacheValue{std::move(result), runtimeInfo()}; + auto cacheSetup = [this, &timer, computationMode, &cacheKey, pinResult]() { + return runComputationAndPrepareForCache(timer, computationMode, cacheKey, + pinResult); + }; + + auto suitedForCache = [](const CacheValue& cacheValue) { + return cacheValue.resultTable().isFullyMaterialized(); }; bool onlyReadFromCache = computationMode == ComputationMode::ONLY_IF_CACHED; - auto result = pinResult ? cache.computeOncePinned(cacheKey, computeLambda, - onlyReadFromCache) - : cache.computeOnce(cacheKey, computeLambda, - onlyReadFromCache); + auto result = + pinResult ? cache.computeOncePinned(cacheKey, cacheSetup, + onlyReadFromCache, suitedForCache) + : cache.computeOnce(cacheKey, cacheSetup, onlyReadFromCache, + suitedForCache); if (result._resultPointer == nullptr) { AD_CORRECTNESS_CHECK(onlyReadFromCache); return nullptr; } - updateRuntimeInformationOnSuccess(result, timer.msecs()); - auto resultNumRows = result._resultPointer->resultTable()->idTable().size(); - auto resultNumCols = - result._resultPointer->resultTable()->idTable().numColumns(); - LOG(DEBUG) << "Computed result of size " << resultNumRows << " x " - << resultNumCols << std::endl; - return result._resultPointer->resultTable(); + if (result._resultPointer->resultTable().isFullyMaterialized()) { + updateRuntimeInformationOnSuccess(result, timer.msecs()); + } + + return result._resultPointer->resultTablePtr(); } catch (ad_utility::CancellationException& e) { e.setOperation(getDescriptor()); runtimeInfo().status_ = RuntimeInformation::Status::cancelled; @@ -213,10 +311,10 @@ std::chrono::milliseconds Operation::remainingTime() const { // _______________________________________________________________________ void Operation::updateRuntimeInformationOnSuccess( - const Result& resultTable, ad_utility::CacheStatus cacheStatus, - Milliseconds duration, std::optional runtimeInfo) { + size_t numRows, ad_utility::CacheStatus cacheStatus, Milliseconds duration, + std::optional runtimeInfo) { _runtimeInfo->totalTime_ = duration; - _runtimeInfo->numRows_ = resultTable.idTable().size(); + _runtimeInfo->numRows_ = numRows; _runtimeInfo->cacheStatus_ = cacheStatus; _runtimeInfo->status_ = RuntimeInformation::Status::fullyMaterialized; @@ -253,9 +351,10 @@ void Operation::updateRuntimeInformationOnSuccess( void Operation::updateRuntimeInformationOnSuccess( const QueryResultCache::ResultAndCacheStatus& resultAndCacheStatus, Milliseconds duration) { + const auto& result = resultAndCacheStatus._resultPointer->resultTable(); + AD_CONTRACT_CHECK(result.isFullyMaterialized()); updateRuntimeInformationOnSuccess( - *resultAndCacheStatus._resultPointer->resultTable(), - resultAndCacheStatus._cacheStatus, duration, + result.idTable().size(), resultAndCacheStatus._cacheStatus, duration, resultAndCacheStatus._resultPointer->runtimeInfo()); } @@ -272,7 +371,7 @@ void Operation::updateRuntimeInformationWhenOptimizedOut( auto timesOfChildren = _runtimeInfo->children_ | std::views::transform(&RuntimeInformation::totalTime_); _runtimeInfo->totalTime_ = - std::reduce(timesOfChildren.begin(), timesOfChildren.end(), 0ms); + std::reduce(timesOfChildren.begin(), timesOfChildren.end(), 0us); signalQueryUpdate(); } diff --git a/src/engine/Operation.h b/src/engine/Operation.h index 9e23ee3fcf..7420f155f9 100644 --- a/src/engine/Operation.h +++ b/src/engine/Operation.h @@ -6,6 +6,8 @@ #pragma once +#include + #include #include @@ -260,6 +262,32 @@ class Operation { //! Compute the result of the query-subtree rooted at this element.. virtual ProtoResult computeResult(bool requestLaziness) = 0; + // Update the runtime information of this operation according to the given + // arguments, considering the possibility that the initial runtime information + // was replaced by calling `RuntimeInformation::addLimitOffsetRow`. + // `applyToLimit` indicates if the stats should be applied to the runtime + // information of the limit, or the runtime information of the actual + // operation. If `supportsLimit() == true`, then the operation does already + // track the limit stats correctly and there's no need to keep track of both. + // Otherwise `externalLimitApplied_` decides how stat tracking should be + // handled. + void updateRuntimeStats(bool applyToLimit, uint64_t numRows, uint64_t numCols, + std::chrono::microseconds duration) const; + + // Perform the expensive computation modeled by the subclass of this + // `Operation`. The value provided by `computationMode` decides if lazy + // results are preferred. It must not be `ONLY_IF_CACHED`, this will lead to + // an `ad_utility::Exception`. + ProtoResult runComputation(const ad_utility::Timer& timer, + ComputationMode computationMode); + + // Call `runComputation` and transform it into a value that could be inserted + // into the cache. + CacheValue runComputationAndPrepareForCache(const ad_utility::Timer& timer, + ComputationMode computationMode, + const std::string& cacheKey, + bool pinned); + // Create and store the complete runtime information for this operation after // it has either been successfully computed or read from the cache. virtual void updateRuntimeInformationOnSuccess( @@ -272,7 +300,7 @@ class Operation { // allowed when `cacheStatus` is `cachedPinned` or `cachedNotPinned`, // otherwise a runtime check will fail. virtual void updateRuntimeInformationOnSuccess( - const Result& resultTable, ad_utility::CacheStatus cacheStatus, + size_t numRows, ad_utility::CacheStatus cacheStatus, Milliseconds duration, std::optional runtimeInfo) final; @@ -359,4 +387,20 @@ class Operation { // Store the list of columns by which the result is sorted. mutable std::optional> _resultSortedColumns = std::nullopt; + + // True if this operation does not support limits/offsets natively and a + // limit/offset is applied post computation. + bool externalLimitApplied_ = false; + + FRIEND_TEST(Operation, updateRuntimeStatsWorksCorrectly); + FRIEND_TEST(Operation, verifyRuntimeInformationIsUpdatedForLazyOperations); + FRIEND_TEST(Operation, ensureFailedStatusIsSetWhenGeneratorThrowsException); + FRIEND_TEST(Operation, testSubMillisecondsIncrementsAreStillTracked); + FRIEND_TEST(Operation, ensureSignalUpdateIsOnlyCalledEvery50msAndAtTheEnd); + FRIEND_TEST(Operation, + ensureSignalUpdateIsCalledAtTheEndOfPartialConsumption); + FRIEND_TEST(Operation, + verifyLimitIsProperlyAppliedAndUpdatesRuntimeInfoCorrectly); + FRIEND_TEST(Operation, ensureLazyOperationIsCachedIfSmallEnough); + FRIEND_TEST(Operation, checkLazyOperationIsNotCachedIfTooLarge); }; diff --git a/src/engine/QueryExecutionContext.h b/src/engine/QueryExecutionContext.h index e7c1ff37f6..0c17ea684b 100644 --- a/src/engine/QueryExecutionContext.h +++ b/src/engine/QueryExecutionContext.h @@ -22,27 +22,39 @@ class CacheValue { private: - std::shared_ptr _resultTable; - RuntimeInformation _runtimeInfo; + std::shared_ptr result_; + RuntimeInformation runtimeInfo_; public: - explicit CacheValue(Result resultTable, RuntimeInformation runtimeInfo) - : _resultTable(std::make_shared(std::move(resultTable))), - _runtimeInfo(std::move(runtimeInfo)) {} + explicit CacheValue(Result result, RuntimeInformation runtimeInfo) + : result_{std::make_shared(std::move(result))}, + runtimeInfo_{std::move(runtimeInfo)} {} - const std::shared_ptr& resultTable() const { - return _resultTable; + CacheValue(CacheValue&&) = default; + CacheValue(const CacheValue&) = delete; + CacheValue& operator=(CacheValue&&) = default; + CacheValue& operator=(const CacheValue&) = delete; + + const Result& resultTable() const noexcept { return *result_; } + + std::shared_ptr resultTablePtr() const noexcept { + return result_; } - const RuntimeInformation& runtimeInfo() const { return _runtimeInfo; } + const RuntimeInformation& runtimeInfo() const noexcept { + return runtimeInfo_; + } + + static ad_utility::MemorySize getSize(const IdTable& idTable) { + return ad_utility::MemorySize::bytes(idTable.size() * idTable.numColumns() * + sizeof(Id)); + } // Calculates the `MemorySize` taken up by an instance of `CacheValue`. struct SizeGetter { ad_utility::MemorySize operator()(const CacheValue& cacheValue) const { - if (const auto& tablePtr = cacheValue._resultTable; tablePtr) { - return ad_utility::MemorySize::bytes(tablePtr->idTable().size() * - tablePtr->idTable().numColumns() * - sizeof(Id)); + if (const auto& resultPtr = cacheValue.result_; resultPtr) { + return getSize(resultPtr->idTable()); } else { return 0_B; } @@ -87,7 +99,7 @@ class QueryExecutionContext { return _sortPerformanceEstimator; } - [[nodiscard]] double getCostFactor(const string& key) const { + [[nodiscard]] double getCostFactor(const std::string& key) const { return _costFactors.getCostFactor(key); }; diff --git a/src/engine/QueryExecutionTree.cpp b/src/engine/QueryExecutionTree.cpp index 35aa938614..aed58d4fde 100644 --- a/src/engine/QueryExecutionTree.cpp +++ b/src/engine/QueryExecutionTree.cpp @@ -84,6 +84,7 @@ size_t QueryExecutionTree::getCostEstimate() { size_t QueryExecutionTree::getSizeEstimate() { if (!sizeEstimate_.has_value()) { if (cachedResult_) { + AD_CORRECTNESS_CHECK(cachedResult_->isFullyMaterialized()); sizeEstimate_ = cachedResult_->idTable().size(); } else { // if we are in a unit test setting and there is no QueryExecutionContest @@ -98,6 +99,7 @@ size_t QueryExecutionTree::getSizeEstimate() { // _____________________________________________________________________________ bool QueryExecutionTree::knownEmptyResult() { if (cachedResult_) { + AD_CORRECTNESS_CHECK(cachedResult_->isFullyMaterialized()); return cachedResult_->idTable().size() == 0; } return rootOperation_->knownEmptyResult(); @@ -117,7 +119,7 @@ void QueryExecutionTree::readFromCache() { auto& cache = qec_->getQueryTreeCache(); auto res = cache.getIfContained(getCacheKey()); if (res.has_value()) { - cachedResult_ = res->_resultPointer->resultTable(); + cachedResult_ = res->_resultPointer->resultTablePtr(); } } diff --git a/src/engine/Result.cpp b/src/engine/Result.cpp index ab80343d65..f59971225b 100644 --- a/src/engine/Result.cpp +++ b/src/engine/Result.cpp @@ -6,8 +6,12 @@ #include "engine/Result.h" -#include "engine/LocalVocab.h" +#include + +#include "util/CacheableGenerator.h" #include "util/Exception.h" +#include "util/Log.h" +#include "util/Timer.h" // _____________________________________________________________________________ string Result::asDebugString() const { @@ -33,89 +37,268 @@ auto Result::getMergedLocalVocab(const Result& result1, const Result& result2) LocalVocab Result::getCopyOfLocalVocab() const { return localVocab().clone(); } // _____________________________________________________________________________ -Result::Result(IdTable idTable, std::vector sortedBy, - SharedLocalVocabWrapper localVocab) - : idTable_{std::move(idTable)}, - sortedBy_{std::move(sortedBy)}, - localVocab_{std::move(localVocab.localVocab_)} { - AD_CONTRACT_CHECK(localVocab_ != nullptr); - AD_CONTRACT_CHECK(std::ranges::all_of(sortedBy_, [this](size_t numCols) { - return numCols < this->idTable().numColumns(); - })); - - [[maybe_unused]] auto compareRowsByJoinColumns = [this](const auto& row1, - const auto& row2) { - for (size_t col : this->sortedBy()) { +auto compareRowsBySortColumns(const std::vector& sortedBy) { + return [&sortedBy](const auto& row1, const auto& row2) { + for (ColumnIndex col : sortedBy) { if (row1[col] != row2[col]) { return row1[col] < row2[col]; } } return false; }; - AD_EXPENSIVE_CHECK( - std::ranges::is_sorted(this->idTable(), compareRowsByJoinColumns)); +} + +// _____________________________________________________________________________ +Result::Result(IdTable idTable, std::vector sortedBy, + SharedLocalVocabWrapper localVocab) + : data_{std::move(idTable)}, + sortedBy_{std::move(sortedBy)}, + localVocab_{std::move(localVocab.localVocab_)} { + AD_CONTRACT_CHECK(localVocab_ != nullptr); + assertSortOrderIsRespected(this->idTable(), sortedBy_); } // _____________________________________________________________________________ Result::Result(IdTable idTable, std::vector sortedBy, LocalVocab&& localVocab) - : Result(std::move(idTable), std::move(sortedBy), - SharedLocalVocabWrapper{std::move(localVocab)}) {} + : Result{std::move(idTable), std::move(sortedBy), + SharedLocalVocabWrapper{std::move(localVocab)}} {} // _____________________________________________________________________________ -void Result::applyLimitOffset(const LimitOffsetClause& limitOffset) { - // Apply the OFFSET clause. If the offset is `0` or the offset is larger - // than the size of the `IdTable`, then this has no effect and runtime - // `O(1)` (see the docs for `std::shift_left`). +Result::Result(cppcoro::generator idTables, + std::vector sortedBy, + SharedLocalVocabWrapper localVocab) + : data_{GenContainer{ + [](auto idTables, auto sortedBy) -> cppcoro::generator { + std::optional previousId = std::nullopt; + for (IdTable& idTable : idTables) { + if (!idTable.empty()) { + if (previousId.has_value()) { + AD_EXPENSIVE_CHECK(!compareRowsBySortColumns(sortedBy)( + idTable.at(0), previousId.value())); + } + previousId = idTable.at(idTable.size() - 1); + } + assertSortOrderIsRespected(idTable, sortedBy); + co_yield std::move(idTable); + } + }(std::move(idTables), sortedBy)}}, + sortedBy_{std::move(sortedBy)}, + localVocab_{std::move(localVocab.localVocab_)} { + AD_CONTRACT_CHECK(localVocab_ != nullptr); +} + +// _____________________________________________________________________________ +Result::Result(cppcoro::generator idTables, + std::vector sortedBy, LocalVocab&& localVocab) + : Result{std::move(idTables), std::move(sortedBy), + SharedLocalVocabWrapper{std::move(localVocab)}} {} + +// _____________________________________________________________________________ +// Apply `LimitOffsetClause` to given `IdTable`. +void resizeIdTable(IdTable& idTable, const LimitOffsetClause& limitOffset) { std::ranges::for_each( - idTable_.getColumns(), - [offset = limitOffset.actualOffset(idTable_.numRows()), + idTable.getColumns(), + [offset = limitOffset.actualOffset(idTable.numRows()), upperBound = - limitOffset.upperBound(idTable_.numRows())](std::span column) { + limitOffset.upperBound(idTable.numRows())](std::span column) { std::shift_left(column.begin(), column.begin() + upperBound, offset); }); // Resize the `IdTable` if necessary. - size_t targetSize = limitOffset.actualSize(idTable_.numRows()); - AD_CORRECTNESS_CHECK(targetSize <= idTable_.numRows()); - idTable_.resize(targetSize); - idTable_.shrinkToFit(); + size_t targetSize = limitOffset.actualSize(idTable.numRows()); + AD_CORRECTNESS_CHECK(targetSize <= idTable.numRows()); + idTable.resize(targetSize); + idTable.shrinkToFit(); +} + +// _____________________________________________________________________________ +void Result::applyLimitOffset( + const LimitOffsetClause& limitOffset, + std::function + limitTimeCallback) { + // Apply the OFFSET clause. If the offset is `0` or the offset is larger + // than the size of the `IdTable`, then this has no effect and runtime + // `O(1)` (see the docs for `std::shift_left`). + AD_CONTRACT_CHECK(limitTimeCallback); + if (isFullyMaterialized()) { + ad_utility::timer::Timer limitTimer{ad_utility::timer::Timer::Started}; + resizeIdTable(std::get(data_), limitOffset); + limitTimeCallback(limitTimer.msecs(), idTable()); + } else { + auto generator = [](cppcoro::generator original, + LimitOffsetClause limitOffset, + auto limitTimeCallback) -> cppcoro::generator { + if (limitOffset._limit.value_or(1) == 0) { + co_return; + } + for (IdTable& idTable : original) { + ad_utility::timer::Timer limitTimer{ad_utility::timer::Timer::Started}; + size_t originalSize = idTable.numRows(); + resizeIdTable(idTable, limitOffset); + uint64_t offsetDelta = limitOffset.actualOffset(originalSize); + limitOffset._offset -= offsetDelta; + if (limitOffset._limit.has_value()) { + limitOffset._limit.value() -= + limitOffset.actualSize(originalSize - offsetDelta); + } + limitTimeCallback(limitTimer.value(), idTable); + if (limitOffset._offset == 0) { + co_yield idTable; + } + if (limitOffset._limit.value_or(1) == 0) { + break; + } + } + }(std::move(idTables()), limitOffset, std::move(limitTimeCallback)); + data_.emplace(std::move(generator)); + } } // _____________________________________________________________________________ -auto Result::getOrComputeDatatypeCountsPerColumn() - -> const DatatypeCountsPerColumn& { - if (datatypeCountsPerColumn_.has_value()) { - return datatypeCountsPerColumn_.value(); +void Result::assertThatLimitWasRespected(const LimitOffsetClause& limitOffset) { + if (isFullyMaterialized()) { + uint64_t numRows = idTable().numRows(); + auto limit = limitOffset._limit; + AD_CONTRACT_CHECK(!limit.has_value() || numRows <= limit.value()); + } else { + auto generator = + [](cppcoro::generator original, + LimitOffsetClause limitOffset) -> cppcoro::generator { + auto limit = limitOffset._limit; + uint64_t elementCount = 0; + for (IdTable& idTable : original) { + elementCount += idTable.numRows(); + AD_CONTRACT_CHECK(!limit.has_value() || elementCount <= limit.value()); + co_yield idTable; + } + AD_CONTRACT_CHECK(!limit.has_value() || elementCount <= limit.value()); + }(std::move(idTables()), limitOffset); + data_.emplace(std::move(generator)); } - auto& types = datatypeCountsPerColumn_.emplace(); - types.resize(idTable_.numColumns()); - for (size_t i = 0; i < idTable_.numColumns(); ++i) { - const auto& col = idTable_.getColumn(i); - auto& datatypes = types.at(i); - for (Id id : col) { - ++datatypes[static_cast(id.getDatatype())]; - } +} + +// _____________________________________________________________________________ +void Result::checkDefinedness(const VariableToColumnMap& varColMap) { + auto performCheck = [](const auto& map, IdTable& idTable) { + return std::ranges::all_of(map, [&](const auto& varAndCol) { + const auto& [columnIndex, mightContainUndef] = varAndCol.second; + if (mightContainUndef == ColumnIndexAndTypeInfo::AlwaysDefined) { + return std::ranges::all_of(idTable.getColumn(columnIndex), [](Id id) { + return id.getDatatype() != Datatype::Undefined; + }); + } + return true; + }); + }; + if (isFullyMaterialized()) { + AD_EXPENSIVE_CHECK(performCheck(varColMap, std::get(data_))); + } else { + auto generator = [](cppcoro::generator original, + VariableToColumnMap varColMap, + auto performCheck) -> cppcoro::generator { + for (IdTable& idTable : original) { + // No need to check subsequent idTables assuming the datatypes + // don't change mid result. + AD_EXPENSIVE_CHECK(performCheck(varColMap, idTable)); + co_yield idTable; + } + }(std::move(idTables()), varColMap, std::move(performCheck)); + data_.emplace(std::move(generator)); } - return types; } -// _____________________________________________________________ -bool Result::checkDefinedness(const VariableToColumnMap& varColMap) { - const auto& datatypesPerColumn = getOrComputeDatatypeCountsPerColumn(); - return std::ranges::all_of(varColMap, [&](const auto& varAndCol) { - const auto& [columnIndex, mightContainUndef] = varAndCol.second; - bool hasUndefined = datatypesPerColumn.at(columnIndex) - .at(static_cast(Datatype::Undefined)) != 0; - return mightContainUndef == ColumnIndexAndTypeInfo::PossiblyUndefined || - !hasUndefined; - }); +// _____________________________________________________________________________ +void Result::runOnNewChunkComputed( + std::function onNewChunk, + std::function onGeneratorFinished) { + AD_CONTRACT_CHECK(!isFullyMaterialized()); + auto generator = [](cppcoro::generator original, auto onNewChunk, + auto onGeneratorFinished) -> cppcoro::generator { + // Call this within destructor to make sure it is also called when an + // operation stops iterating before reaching the end. + absl::Cleanup cleanup{ + [&onGeneratorFinished]() { onGeneratorFinished(false); }}; + try { + ad_utility::timer::Timer timer{ad_utility::timer::Timer::Started}; + for (IdTable& idTable : original) { + onNewChunk(idTable, timer.value()); + co_yield idTable; + timer.start(); + } + } catch (...) { + std::move(cleanup).Cancel(); + onGeneratorFinished(true); + throw; + } + }(std::move(idTables()), std::move(onNewChunk), + std::move(onGeneratorFinished)); + data_.emplace(std::move(generator)); +} + +// _____________________________________________________________________________ +void Result::assertSortOrderIsRespected( + const IdTable& idTable, const std::vector& sortedBy) { + AD_CONTRACT_CHECK( + std::ranges::all_of(sortedBy, [&idTable](ColumnIndex colIndex) { + return colIndex < idTable.numColumns(); + })); + + AD_EXPENSIVE_CHECK( + std::ranges::is_sorted(idTable, compareRowsBySortColumns(sortedBy))); +} + +// _____________________________________________________________________________ +const IdTable& Result::idTable() const { + AD_CONTRACT_CHECK(isFullyMaterialized()); + return std::get(data_); +} + +// _____________________________________________________________________________ +cppcoro::generator& Result::idTables() const { + AD_CONTRACT_CHECK(!isFullyMaterialized()); + const auto& container = std::get(data_); + AD_CONTRACT_CHECK(!container.consumed_->exchange(true)); + return container.generator_; +} + +// _____________________________________________________________________________ +bool Result::isFullyMaterialized() const noexcept { + return std::holds_alternative(data_); } // _____________________________________________________________________________ -const IdTable& Result::idTable() const { return idTable_; } +void Result::cacheDuringConsumption( + std::function&, const IdTable&)> + fitInCache, + std::function storeInCache) { + AD_CONTRACT_CHECK(!isFullyMaterialized()); + data_.emplace(ad_utility::wrapGeneratorWithCache( + std::move(idTables()), + [fitInCache = std::move(fitInCache)](std::optional& aggregate, + const IdTable& newTable) { + bool doBothFitInCache = fitInCache(aggregate, newTable); + if (doBothFitInCache) { + if (aggregate.has_value()) { + aggregate.value().insertAtEnd(newTable); + } else { + aggregate.emplace(newTable.clone()); + } + } + return doBothFitInCache; + }, + [storeInCache = std::move(storeInCache), sortedBy = sortedBy_, + localVocab = localVocab_](IdTable idTable) mutable { + storeInCache(Result{std::move(idTable), std::move(sortedBy), + SharedLocalVocabWrapper{std::move(localVocab)}}); + })); +} // _____________________________________________________________________________ void Result::logResultSize() const { - LOG(INFO) << "Result has size " << idTable().size() << " x " - << idTable().numColumns() << std::endl; + if (isFullyMaterialized()) { + LOG(INFO) << "Result has size " << idTable().size() << " x " + << idTable().numColumns() << std::endl; + } else { + LOG(INFO) << "Result has unknown size (not computed yet)" << std::endl; + } } diff --git a/src/engine/Result.h b/src/engine/Result.h index 5c9f37c008..2d896e364b 100644 --- a/src/engine/Result.h +++ b/src/engine/Result.h @@ -7,6 +7,7 @@ #pragma once #include +#include #include #include "engine/LocalVocab.h" @@ -17,17 +18,29 @@ // The result of an `Operation`. This is the class QLever uses for all // intermediate or final results when processing a SPARQL query. The actual data -// is always a table and contained in the member `idTable()`. +// is either a table and contained in the member `idTable()` or can be consumed +// through a generator via `idTables()` when it is supposed to be lazily +// evaluated. class Result { private: + // Needs to be mutable in order to be consumable from a const result. + struct GenContainer { + mutable cppcoro::generator generator_; + mutable std::unique_ptr consumed_ = + std::make_unique(false); + explicit GenContainer(cppcoro::generator generator) + : generator_{std::move(generator)} {} + }; + using Data = std::variant; // The actual entries. - IdTable idTable_; + Data data_; // The column indices by which the result is sorted (primary sort key first). // Empty if the result is not sorted on any column. std::vector sortedBy_; using LocalVocabPtr = std::shared_ptr; + // The local vocabulary of the result. LocalVocabPtr localVocab_ = std::make_shared(); @@ -61,12 +74,9 @@ class Result { std::make_shared(std::move(localVocab))} {} }; - // For each column in the result (the entries in the outer `vector`) and for - // each `Datatype` (the entries of the inner `array`), store the information - // how many entries of that datatype are stored in the column. - using DatatypeCountsPerColumn = std::vector< - std::array(Datatype::MaxValue) + 1>>; - std::optional datatypeCountsPerColumn_; + // Check if sort order promised by `sortedBy` is kept within `idTable`. + static void assertSortOrderIsRespected( + const IdTable& idTable, const std::vector& sortedBy); public: // Construct from the given arguments (see above) and check the following @@ -82,7 +92,10 @@ class Result { SharedLocalVocabWrapper localVocab); Result(IdTable idTable, std::vector sortedBy, LocalVocab&& localVocab); - + Result(cppcoro::generator idTables, + std::vector sortedBy, SharedLocalVocabWrapper localVocab); + Result(cppcoro::generator idTables, + std::vector sortedBy, LocalVocab&& localVocab); // Prevent accidental copying of a result table. Result(const Result& other) = delete; Result& operator=(const Result& other) = delete; @@ -91,12 +104,43 @@ class Result { Result(Result&& other) = default; Result& operator=(Result&& other) = default; - // Default destructor. - virtual ~Result() = default; - - // Const access to the underlying `IdTable`. + // Wrap the generator stored in `data_` within a new generator that calls + // `onNewChunk` every time a new `IdTable` is yielded by the original + // generator and passed this new `IdTable` along with microsecond precision + // timing information on how long it took to compute this new chunk. + // `onGeneratorFinished` is guaranteed to be called eventually as long as the + // generator is consumed at least partially, with `true` if an exception + // occurred during consumption or with `false` when the generator is done + // processing or abandoned and destroyed. + // + // Throw an `ad_utility::Exception` if the underlying `data_` member holds the + // wrong variant. + void runOnNewChunkComputed( + std::function onNewChunk, + std::function onGeneratorFinished); + + // Wrap the generator stored in `data_` within a new generator that aggregates + // the entries yielded by the generator into a cacheable `IdTable`. Once + // `fitInCache` returns false, thus indicating that both passed arguments + // together would be too large to be cached, this cached value is discarded. + // If this cached value still exists when the generator is fully consumed a + // new `Result` is created with this value and passed to `storeInCache`. + // + // Throw an `ad_utility::Exception` if the underlying `data_` member holds the + // wrong variant. + void cacheDuringConsumption( + std::function&, const IdTable&)> + fitInCache, + std::function storeInCache); + + // Const access to the underlying `IdTable`. Throw an `ad_utility::Exception` + // if the underlying `data_` member holds the wrong variant. const IdTable& idTable() const; + // Access to the underlying `IdTable`s. Throw an `ad_utility::Exception` + // if the underlying `data_` member holds the wrong variant. + cppcoro::generator& idTables() const; + // Const access to the columns by which the `idTable()` is sorted. const std::vector& sortedBy() const { return sortedBy_; } @@ -140,6 +184,9 @@ class Result { // (which is not possible with `shareLocalVocabFrom`). LocalVocab getCopyOfLocalVocab() const; + // Return true if `data_` holds an `IdTable`, false otherwise. + bool isFullyMaterialized() const noexcept; + // Log the size of this result. We call this at several places in // `Server::processQuery`. Ideally, this should only be called in one // place, but for now, this method at least makes sure that these log @@ -150,24 +197,36 @@ class Result { string asDebugString() const; // Apply the `limitOffset` clause by shifting and then resizing the `IdTable`. - // Note: If additional members and invariants are added to the class (for + // This also applies if `data_` holds a generator yielding `IdTable`s, where + // this is applied respectively. + // `limitTimeCallback` is called whenever an `IdTable` is resized with the + // number of microseconds it took to perform this operation and the freshly + // resized `IdTable` as const reference. + // Note: If additional members and invariants are added to the class (for // example information about the datatypes in each column) make sure that // those are still correct after performing this operation. - void applyLimitOffset(const LimitOffsetClause& limitOffset); - - // Get the information, which columns stores how many entries of each - // datatype. This information is computed on the first call to this function - // `O(num-entries-in-table)` and then cached for subsequent usages. - const DatatypeCountsPerColumn& getOrComputeDatatypeCountsPerColumn(); + void applyLimitOffset( + const LimitOffsetClause& limitOffset, + std::function + limitTimeCallback); + + // Check if the operation did fulfill its contract and only returns as many + // elements as requested by the provided `limitOffset`. Throw an + // `ad_utility::Exception` otherwise. When `data_` holds a generator, this + // behaviour applies analogously when consuming the generator. + // This member function provides an alternative to `applyLimitOffset` that + // resizes the result if the operation doesn't support this on its own. + void assertThatLimitWasRespected(const LimitOffsetClause& limitOffset); // Check that if the `varColMap` guarantees that a column is always defined // (i.e. that is contains no single undefined value) that there are indeed no - // undefined values in the `_idTable` of this result. Return `true` iff the - // check is successful. - bool checkDefinedness(const VariableToColumnMap& varColMap); + // undefined values in the `data_` of this result. Do nothing iff the check is + // successful. Throw an `ad_utility::Exception` otherwise. When `data_` holds + // a generator, this behaviour applies analogously when consuming the + // generator. + void checkDefinedness(const VariableToColumnMap& varColMap); }; -// In the future (as soon as we implement lazy operations) the `ProtoResult` and -// the `Result` will have different implementations. For now we simply use an -// alias to reduce the size of the diff in the PRs for lazy operations. +// Class alias to conceptually differentiate between Results that produce +// values and Results meant to be consumed. using ProtoResult = Result; diff --git a/src/engine/RuntimeInformation.cpp b/src/engine/RuntimeInformation.cpp index fd3218194f..2e9abd05c1 100644 --- a/src/engine/RuntimeInformation.cpp +++ b/src/engine/RuntimeInformation.cpp @@ -36,6 +36,10 @@ std::string indentStr(size_t indent, bool stripped = false) { } return ind; } + +auto toMs(std::chrono::microseconds us) { + return std::chrono::duration_cast(us).count(); +} } // namespace // __________________________________________________________________________ @@ -67,9 +71,9 @@ void RuntimeInformation::writeToStream(std::ostream& out, size_t indent) const { << '\n'; out << indentStr(indent) << "columns: " << absl::StrJoin(columnNames_, ", ") << '\n'; - out << indentStr(indent) << "total_time: " << totalTime_.count() << " ms" + out << indentStr(indent) << "total_time: " << toMs(totalTime_) << " ms" << '\n'; - out << indentStr(indent) << "operation_time: " << getOperationTime().count() + out << indentStr(indent) << "operation_time: " << toMs(getOperationTime()) << " ms" << '\n'; out << indentStr(indent) << "status: " << toString(status_) << '\n'; out << indentStr(indent) @@ -77,11 +81,10 @@ void RuntimeInformation::writeToStream(std::ostream& out, size_t indent) const { if (cacheStatus_ != ad_utility::CacheStatus::computed) { out << indentStr(indent) // TODO use `<< originalTotalTime_` directly - << "original_total_time: " << originalTotalTime_.count() << " ms" - << '\n'; + << "original_total_time: " << toMs(originalTotalTime_) << " ms" << '\n'; out << indentStr(indent) - << "original_operation_time: " << originalOperationTime_.count() - << " ms" << '\n'; + << "original_operation_time: " << toMs(originalOperationTime_) << " ms" + << '\n'; } for (const auto& el : details_.items()) { out << indentStr(indent) << " " << el.key() << ": "; @@ -134,7 +137,7 @@ void RuntimeInformation::setColumnNames(const VariableToColumnMap& columnMap) { } // __________________________________________________________________________ -std::chrono::milliseconds RuntimeInformation::getOperationTime() const { +std::chrono::microseconds RuntimeInformation::getOperationTime() const { if (cacheStatus_ != ad_utility::CacheStatus::computed) { return totalTime_; } else { @@ -145,8 +148,8 @@ std::chrono::milliseconds RuntimeInformation::getOperationTime() const { children_ | std::views::transform(&RuntimeInformation::totalTime_); // Prevent "negative" computation times in case totalTime_ was not // computed for this yet. - return std::max(0ms, totalTime_ - std::reduce(timesOfChildren.begin(), - timesOfChildren.end(), 0ms)); + return std::max(0us, totalTime_ - std::reduce(timesOfChildren.begin(), + timesOfChildren.end(), 0us)); } } @@ -196,10 +199,10 @@ void to_json(nlohmann::ordered_json& j, const RuntimeInformation& rti) { {"result_rows", rti.numRows_}, {"result_cols", rti.numCols_}, {"column_names", rti.columnNames_}, - {"total_time", rti.totalTime_.count()}, - {"operation_time", rti.getOperationTime().count()}, - {"original_total_time", rti.originalTotalTime_.count()}, - {"original_operation_time", rti.originalOperationTime_.count()}, + {"total_time", toMs(rti.totalTime_)}, + {"operation_time", toMs(rti.getOperationTime())}, + {"original_total_time", toMs(rti.originalTotalTime_)}, + {"original_operation_time", toMs(rti.originalOperationTime_)}, {"cache_status", ad_utility::toString(rti.cacheStatus_)}, {"details", rti.details_}, {"estimated_total_cost", rti.costEstimate_}, @@ -219,7 +222,6 @@ void to_json(nlohmann::ordered_json& j, // __________________________________________________________________________ void RuntimeInformation::addLimitOffsetRow(const LimitOffsetClause& l, - Milliseconds timeForLimit, bool fullResultIsNotCached) { bool hasLimit = l._limit.has_value(); bool hasOffset = l._offset != 0; @@ -233,7 +235,6 @@ void RuntimeInformation::addLimitOffsetRow(const LimitOffsetClause& l, numRows_ = l.actualSize(actualOperation->numRows_); details_.clear(); cacheStatus_ = ad_utility::CacheStatus::computed; - totalTime_ += timeForLimit; actualOperation->addDetail("not-written-to-cache-because-child-of-limit", fullResultIsNotCached); actualOperation->eraseDetail("limit"); diff --git a/src/engine/RuntimeInformation.h b/src/engine/RuntimeInformation.h index f791c0cd63..ff4cdc1488 100644 --- a/src/engine/RuntimeInformation.h +++ b/src/engine/RuntimeInformation.h @@ -22,6 +22,7 @@ /// time to compute, status, etc.). Also contains the functionality to print /// that information nicely formatted and to export it to JSON. class RuntimeInformation { + using Microseconds = std::chrono::microseconds; using Milliseconds = std::chrono::milliseconds; public: @@ -47,12 +48,12 @@ class RuntimeInformation { /// The total time spent computing this operation. This includes the /// computation of the children. - Milliseconds totalTime_ = ZERO; + Microseconds totalTime_ = ZERO; /// In case this operation was read from the cache, we will store the time /// information about the original computation in the following two members. - Milliseconds originalTotalTime_ = ZERO; - Milliseconds originalOperationTime_ = ZERO; + Microseconds originalTotalTime_ = ZERO; + Microseconds originalOperationTime_ = ZERO; /// The estimated cost, size, and column multiplicities of the operation. size_t costEstimate_ = 0; @@ -100,7 +101,7 @@ class RuntimeInformation { /// Get the time spent computing the operation. This is the total time minus /// the time spent computing the children, but always positive. - [[nodiscard]] Milliseconds getOperationTime() const; + [[nodiscard]] Microseconds getOperationTime() const; /// Get the cost estimate for this operation. This is the total cost estimate /// minus the sum of the cost estimates of all children. @@ -129,10 +130,10 @@ class RuntimeInformation { // Set the runtime information for a LIMIT or OFFSET operation as the new root // of the tree and make the old root the only child of the LIMIT operation. - // The details of the LIMIT/OFFSET, the time (in ms) that was spent computing - // it, and the information whether the `actual` operation (the old root of the - // runtime information) is written to the cache, are passed in as arguments. - void addLimitOffsetRow(const LimitOffsetClause& l, Milliseconds timeForLimit, + // The details of the LIMIT/OFFSET and the information whether the `actual` + // operation (the old root of the runtime information) is written to the + // cache, are passed in as arguments. + void addLimitOffsetRow(const LimitOffsetClause& l, bool fullResultIsNotCached); static std::string_view toString(Status status); diff --git a/src/index/CompressedRelation.cpp b/src/index/CompressedRelation.cpp index d3b319ce7f..0d8b779313 100644 --- a/src/index/CompressedRelation.cpp +++ b/src/index/CompressedRelation.cpp @@ -395,11 +395,12 @@ DecompressedBlock CompressedRelationReader::readPossiblyIncompleteBlock( auto cacheKey = blockMetadata.offsetsAndCompressedSize_.at(0).offsetInFile_; auto sharedResultFromCache = blockCache_ - .computeOnce(cacheKey, - [&]() { - return readAndDecompressBlock(blockMetadata, - allColumns); - }) + .computeOnce( + cacheKey, + [&]() { + return readAndDecompressBlock(blockMetadata, allColumns); + }, + false, [](const auto&) { return true; }) ._resultPointer; const DecompressedBlock& block = *sharedResultFromCache; diff --git a/src/util/Cache.h b/src/util/Cache.h index 1c7eb5923c..23750eea16 100644 --- a/src/util/Cache.h +++ b/src/util/Cache.h @@ -6,26 +6,21 @@ #pragma once -#include - +#include #include #include #include -#include #include #include -#include "./HashMap.h" -#include "PriorityQueue.h" -#include "util/ConstexprUtils.h" +#include "util/HashMap.h" #include "util/MemorySize/MemorySize.h" +#include "util/PriorityQueue.h" #include "util/TypeTraits.h" #include "util/ValueSizeGetters.h" namespace ad_utility { -using std::make_shared; -using std::pair; using std::shared_ptr; using namespace ad_utility::memory_literals; @@ -95,13 +90,13 @@ class FlexibleCache { }; using EmplacedValue = shared_ptr; - // using Entry = pair; using EntryList = PriorityQueue; using AccessMap = MapType; using PinnedMap = MapType; + using SizeMap = MapType; - using TryEmplaceResult = pair; + using TryEmplaceResult = std::pair; public: //! Typical constructor. A default value may be added in time. @@ -143,7 +138,7 @@ class FlexibleCache { /// Insert a key-value pair to the cache. Throws an exception if the key is /// already present. If the value is too big for the cache, nothing happens. ValuePtr insert(const Key& key, Value value) { - auto ptr = make_shared(std::move(value)); + auto ptr = std::make_shared(std::move(value)); return insert(key, std::move(ptr)); } @@ -151,7 +146,7 @@ class FlexibleCache { // is already present. If the value is too big for the cache, an exception is // thrown. ValuePtr insertPinned(const Key& key, Value value) { - auto ptr = make_shared(std::move(value)); + auto ptr = std::make_shared(std::move(value)); return insertPinned(key, std::move(ptr)); } @@ -227,6 +222,10 @@ class FlexibleCache { // TODO:: implement this functionality } + MemorySize getMaxSizeSingleEntry() const noexcept { + return _maxSizeSingleEntry; + } + //! Checks if there is an entry with the given key. bool contains(const Key& key) const { return containsPinned(key) || containsNonPinned(key); diff --git a/src/util/CacheableGenerator.h b/src/util/CacheableGenerator.h new file mode 100644 index 0000000000..881dd1c282 --- /dev/null +++ b/src/util/CacheableGenerator.h @@ -0,0 +1,40 @@ +// Copyright 2024, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Robin Textor-Falconi + +#pragma once + +#include + +#include "util/Generator.h" +#include "util/TypeTraits.h" + +namespace ad_utility { + +// Wrap the given `generator` inside another generator that aggregates a cache +// by calling `aggregator` on every iteration of the inner `generator` until it +// returns false. If the `aggregator` returns false, the cached value is +// discarded. If the cached value is still present once the generator is fully +// consumed, `onFullyCached` is called with the cached value. +template +cppcoro::generator wrapGeneratorWithCache( + cppcoro::generator generator, + InvocableWithExactReturnType&, const T&> auto + aggregator, + InvocableWithExactReturnType auto onFullyCached) { + std::optional aggregatedData{}; + bool shouldBeAggregated = true; + for (T& element : generator) { + if (shouldBeAggregated) { + shouldBeAggregated = aggregator(aggregatedData, element); + if (!shouldBeAggregated) { + aggregatedData.reset(); + } + } + co_yield element; + } + if (aggregatedData.has_value()) { + onFullyCached(std::move(aggregatedData).value()); + } +} +}; // namespace ad_utility diff --git a/src/util/ConcurrentCache.h b/src/util/ConcurrentCache.h index de8d0f270e..2f22efde8c 100644 --- a/src/util/ConcurrentCache.h +++ b/src/util/ConcurrentCache.h @@ -18,7 +18,6 @@ namespace ad_utility { -using std::make_shared; using std::shared_ptr; /** This exception is thrown if we are waiting for a computation result, @@ -38,6 +37,8 @@ class WaitedForResultWhichThenFailedException : public std::exception { enum struct CacheStatus { cachedNotPinned, cachedPinned, + // TODO Rename to notCached, the name is just confusing. Can + // potentially be merged with notInCacheAndNotComputed. computed, notInCacheAndNotComputed }; @@ -99,10 +100,11 @@ class ResultInProgress { // have called or will call getResult(). Check that none of the other threads // waiting for the result have already finished or were aborted. void finish(shared_ptr result) { - std::lock_guard lockGuard(_mutex); + std::unique_lock lockGuard(_mutex); AD_CONTRACT_CHECK(_status == Status::IN_PROGRESS); _status = Status::FINISHED; _result = std::move(result); + lockGuard.unlock(); _conditionVariable.notify_all(); } @@ -112,8 +114,9 @@ class ResultInProgress { // will terminate. void abort() { AD_CONTRACT_CHECK(_status == Status::IN_PROGRESS); - std::lock_guard lockGuard(_mutex); + std::unique_lock lockGuard(_mutex); _status = Status::ABORTED; + lockGuard.unlock(); _conditionVariable.notify_all(); } @@ -124,7 +127,7 @@ class ResultInProgress { std::unique_lock uniqueLock(_mutex); _conditionVariable.wait(uniqueLock, [this] { return _status != Status::IN_PROGRESS; }); - if (_status == ResultInProgress::Status::ABORTED) { + if (_status == Status::ABORTED) { throw WaitedForResultWhichThenFailedException{}; } return _result; @@ -170,8 +173,6 @@ class ConcurrentCache { /** * @brief Obtain the result of an expensive computation. Do not recompute the * result if it is cached or currently being computed by another thread. - * @tparam ComputeFunction A callable whose operator() takes no argument and - * produces the computation result. * @param key A key that can uniquely identify a computation. For equal keys, * the associated computeFunctions must yield the same results. * @param computeFunction The actual computation. If the result has to be @@ -179,25 +180,48 @@ class ConcurrentCache { * @param onlyReadFromCache If true, then the result will only be returned if * it is contained in the cache. Otherwise `nullptr` with a cache status of * `notInCacheNotComputed` will be returned. - * @return A shared_ptr to the computation result. + * @param suitableForCache Predicate function that will be applied to newly + * computed value to check if it is suitable for caching. Only if it returns + * true the result will be cached. + * @return A `ResultAndCacheStatus` shared_ptr to the computation result. * */ - template - ResultAndCacheStatus computeOnce(const Key& key, - ComputeFunction computeFunction, - bool onlyReadFromCache = false) { - return computeOnceImpl(false, key, std::move(computeFunction), - onlyReadFromCache); + ResultAndCacheStatus computeOnce( + const Key& key, + const InvocableWithConvertibleReturnType auto& computeFunction, + bool onlyReadFromCache, + const InvocableWithConvertibleReturnType auto& + suitableForCache) { + return computeOnceImpl(false, key, computeFunction, onlyReadFromCache, + suitableForCache); } /// Similar to computeOnce, with the following addition: After the call /// completes, the result will be pinned in the underlying cache. - template - ResultAndCacheStatus computeOncePinned(const Key& key, - ComputeFunction computeFunction, - bool onlyReadFromCache = false) { - return computeOnceImpl(true, key, std::move(computeFunction), - onlyReadFromCache); + ResultAndCacheStatus computeOncePinned( + const Key& key, + const InvocableWithConvertibleReturnType auto& computeFunction, + bool onlyReadFromCache, + const InvocableWithConvertibleReturnType auto& + suitedForCache) { + return computeOnceImpl(true, key, computeFunction, onlyReadFromCache, + suitedForCache); + } + + // Insert `value` into the cache, if the `key` is not already present. In case + // `pinned` is true and the key is already present, the existing value is + // pinned in case it is not pinned yet. + void tryInsertIfNotPresent(bool pinned, const Key& key, + std::shared_ptr value) { + auto lockPtr = _cacheAndInProgressMap.wlock(); + auto& cache = lockPtr->_cache; + if (pinned) { + if (!cache.containsAndMakePinnedIfExists(key)) { + cache.insertPinned(key, std::move(value)); + } + } else if (!cache.contains(key)) { + cache.insert(key, std::move(value)); + } } /// Clear the cache (but not the pinned entries) @@ -270,6 +294,10 @@ class ConcurrentCache { _cacheAndInProgressMap.wlock()->_cache.setMaxSizeSingleEntry(maxSize); } + MemorySize getMaxSizeSingleEntry() const { + return _cacheAndInProgressMap.wlock()->_cache.getMaxSizeSingleEntry(); + } + private: using ResultInProgress = ConcurrentCacheDetail::ResultInProgress; @@ -311,10 +339,13 @@ class ConcurrentCache { private: // implementation for computeOnce (pinned and normal variant). - template - ResultAndCacheStatus computeOnceImpl(bool pinned, const Key& key, - ComputeFunction computeFunction, - bool onlyReadFromCache) { + ResultAndCacheStatus computeOnceImpl( + bool pinned, const Key& key, + const InvocableWithConvertibleReturnType auto& computeFunction, + bool onlyReadFromCache, + const InvocableWithConvertibleReturnType auto& + suitableForCache) { + using std::make_shared; bool mustCompute; shared_ptr resultInProgress; // first determine whether we have to compute the result, @@ -356,9 +387,15 @@ class ConcurrentCache { try { // The actual computation shared_ptr result = make_shared(computeFunction()); - moveFromInProgressToCache(key, result); - // Signal other threads who are waiting for the results. - resultInProgress->finish(result); + if (suitableForCache(*result)) { + moveFromInProgressToCache(key, result); + // Signal other threads who are waiting for the results. + resultInProgress->finish(result); + } else { + AD_CONTRACT_CHECK(!pinned); + _cacheAndInProgressMap.wlock()->_inProgress.erase(key); + resultInProgress->finish(nullptr); + } // result was not cached return {std::move(result), CacheStatus::computed}; } catch (...) { @@ -372,7 +409,18 @@ class ConcurrentCache { // someone else is computing the result, wait till it is finished and // return the result, we do not count this case as "cached" as we had to // wait. - return {resultInProgress->getResult(), CacheStatus::computed}; + auto resultPointer = resultInProgress->getResult(); + if (!resultPointer) { + // Fallback computation + auto mutablePointer = make_shared(computeFunction()); + if (suitableForCache(*mutablePointer)) { + tryInsertIfNotPresent(pinned, key, mutablePointer); + } else { + AD_CONTRACT_CHECK(!pinned); + } + resultPointer = std::move(mutablePointer); + } + return {std::move(resultPointer), CacheStatus::computed}; } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d1c3b18e0c..c75ffcd075 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -394,3 +394,9 @@ addLinkAndDiscoverTest(ChunkedForLoopTest) addLinkAndDiscoverTest(FsstCompressorTest fsst) addLinkAndDiscoverTest(CopyableSynchronizationTest) + +addLinkAndDiscoverTest(CacheableGeneratorTest) + +addLinkAndDiscoverTest(FilterTest engine) + +addLinkAndDiscoverTest(ResultTest engine) diff --git a/test/CacheableGeneratorTest.cpp b/test/CacheableGeneratorTest.cpp new file mode 100644 index 0000000000..a96a064429 --- /dev/null +++ b/test/CacheableGeneratorTest.cpp @@ -0,0 +1,83 @@ +// Copyright 2024, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Robin Textor-Falconi + +#include + +#include "util/CacheableGenerator.h" +#include "util/Generator.h" + +using ad_utility::wrapGeneratorWithCache; +using cppcoro::generator; +using ::testing::Optional; + +generator testGenerator(uint32_t range) { + for (uint32_t i = 0; i < range; i++) { + co_yield i; + } +} + +// _____________________________________________________________________________ +TEST(CacheableGenerator, testAggregation) { + bool called = false; + auto gen = wrapGeneratorWithCache( + testGenerator(4), + [](std::optional& optionalValue, const uint32_t& newValue) { + if (optionalValue.has_value()) { + optionalValue.value() += newValue; + } else { + optionalValue.emplace(newValue); + } + return true; + }, + [&called](std::optional value) { + called = true; + EXPECT_THAT(value, Optional(6)); + }); + uint32_t counter = 0; + for (uint32_t element : gen) { + EXPECT_EQ(counter, element); + ++counter; + } + EXPECT_EQ(counter, 4); + EXPECT_TRUE(called); +} + +// _____________________________________________________________________________ +TEST(CacheableGenerator, testEmptyGenerator) { + bool called = false; + auto gen = wrapGeneratorWithCache( + testGenerator(0), + [&called](std::optional&, uint32_t) { + called = true; + return true; + }, + [&called](std::optional) { called = true; }); + uint32_t tracker = 0; + for (uint32_t element : gen) { + tracker += element; + } + EXPECT_EQ(tracker, 0); + EXPECT_FALSE(called); +} + +// _____________________________________________________________________________ +TEST(CacheableGenerator, testAggregationCutoff) { + uint32_t callCounter = 0; + bool called = false; + auto gen = wrapGeneratorWithCache( + testGenerator(2), + [&callCounter](std::optional&, uint32_t) { + ++callCounter; + return false; + }, + [&called](std::optional) { called = true; }); + uint32_t loopCounter = 0; + for (uint32_t element : gen) { + EXPECT_EQ(element, loopCounter); + ++loopCounter; + } + EXPECT_EQ(loopCounter, 2); + EXPECT_EQ(callCounter, 1); + EXPECT_FALSE(called); +} diff --git a/test/ConcurrentCacheTest.cpp b/test/ConcurrentCacheTest.cpp index 1ab691d610..a0e092a464 100644 --- a/test/ConcurrentCacheTest.cpp +++ b/test/ConcurrentCacheTest.cpp @@ -2,7 +2,8 @@ // Chair of Algorithms and Data Structures. // Author: Johannes Kalmbach (kalmbacj@informatik.uni-freiburg.de) -#include +#include +#include #include #include @@ -13,10 +14,14 @@ #include "util/Cache.h" #include "util/ConcurrentCache.h" #include "util/DefaultValueSizeGetter.h" +#include "util/GTestHelpers.h" #include "util/Timer.h" +#include "util/jthread.h" using namespace std::literals; using namespace std::chrono_literals; +using namespace ad_utility::memory_literals; +using ::testing::Pointee; class ConcurrentSignal { std::atomic_flag flag_; @@ -73,12 +78,17 @@ using SimpleConcurrentLruCache = ad_utility::ConcurrentCache>>; +namespace { +auto returnTrue = [](const auto&) { return true; }; +} // namespace + +// _____________________________________________________________________________ TEST(ConcurrentCache, sequentialComputation) { SimpleConcurrentLruCache a{3ul}; ad_utility::Timer t{ad_utility::Timer::Started}; // Fake computation that takes 5ms and returns value "3", which is then // stored under key 3. - auto result = a.computeOnce(3, waiting_function("3"s, 5)); + auto result = a.computeOnce(3, waiting_function("3"s, 5), false, returnTrue); ASSERT_EQ("3"s, *result._resultPointer); ASSERT_EQ(result._cacheStatus, ad_utility::CacheStatus::computed); ASSERT_GE(t.msecs(), 5ms); @@ -90,7 +100,7 @@ TEST(ConcurrentCache, sequentialComputation) { t.reset(); t.start(); // takes 0 msecs to compute, as the request is served from the cache. - auto result2 = a.computeOnce(3, waiting_function("3"s, 5)); + auto result2 = a.computeOnce(3, waiting_function("3"s, 5), false, returnTrue); // computing result again: still yields "3", was cached and takes 0 // milliseconds (result is read from cache) ASSERT_EQ("3"s, *result2._resultPointer); @@ -107,7 +117,8 @@ TEST(ConcurrentCache, sequentialPinnedComputation) { ad_utility::Timer t{ad_utility::Timer::Started}; // Fake computation that takes 5ms and returns value "3", which is then // stored under key 3. - auto result = a.computeOncePinned(3, waiting_function("3"s, 5)); + auto result = + a.computeOncePinned(3, waiting_function("3"s, 5), false, returnTrue); ASSERT_EQ("3"s, *result._resultPointer); ASSERT_EQ(result._cacheStatus, ad_utility::CacheStatus::computed); ASSERT_GE(t.msecs(), 5ms); @@ -120,7 +131,7 @@ TEST(ConcurrentCache, sequentialPinnedComputation) { t.start(); // takes 0 msecs to compute, as the request is served from the cache. // we don't request a pin, but the original computation was pinned - auto result2 = a.computeOnce(3, waiting_function("3"s, 5)); + auto result2 = a.computeOnce(3, waiting_function("3"s, 5), false, returnTrue); // computing result again: still yields "3", was cached and takes 0 // milliseconds (result is read from cache) ASSERT_EQ("3"s, *result2._resultPointer); @@ -137,7 +148,7 @@ TEST(ConcurrentCache, sequentialPinnedUpgradeComputation) { ad_utility::Timer t{ad_utility::Timer::Started}; // Fake computation that takes 5ms and returns value "3", which is then // stored under key 3. - auto result = a.computeOnce(3, waiting_function("3"s, 5)); + auto result = a.computeOnce(3, waiting_function("3"s, 5), false, returnTrue); ASSERT_EQ("3"s, *result._resultPointer); ASSERT_EQ(result._cacheStatus, ad_utility::CacheStatus::computed); ASSERT_GE(t.msecs(), 5ms); @@ -151,7 +162,8 @@ TEST(ConcurrentCache, sequentialPinnedUpgradeComputation) { // takes 0 msecs to compute, as the request is served from the cache. // request a pin, the result should be read from the cache and upgraded // to a pinned result. - auto result2 = a.computeOncePinned(3, waiting_function("3"s, 5)); + auto result2 = + a.computeOncePinned(3, waiting_function("3"s, 5), false, returnTrue); // computing result again: still yields "3", was cached and takes 0 // milliseconds (result is read from cache) ASSERT_EQ("3"s, *result2._resultPointer); @@ -167,7 +179,8 @@ TEST(ConcurrentCache, concurrentComputation) { auto a = SimpleConcurrentLruCache(3ul); StartStopSignal signal; auto compute = [&a, &signal]() { - return a.computeOnce(3, waiting_function("3"s, 5, &signal)); + return a.computeOnce(3, waiting_function("3"s, 5, &signal), false, + returnTrue); }; auto resultFuture = std::async(std::launch::async, compute); signal.hasStartedSignal_.wait(); @@ -195,7 +208,8 @@ TEST(ConcurrentCache, concurrentPinnedComputation) { auto a = SimpleConcurrentLruCache(3ul); StartStopSignal signal; auto compute = [&a, &signal]() { - return a.computeOncePinned(3, waiting_function("3"s, 5, &signal)); + return a.computeOncePinned(3, waiting_function("3"s, 5, &signal), false, + returnTrue); }; auto resultFuture = std::async(std::launch::async, compute); signal.hasStartedSignal_.wait(); @@ -225,7 +239,8 @@ TEST(ConcurrentCache, concurrentPinnedUpgradeComputation) { auto a = SimpleConcurrentLruCache(3ul); StartStopSignal signal; auto compute = [&a, &signal]() { - return a.computeOnce(3, waiting_function("3"s, 5, &signal)); + return a.computeOnce(3, waiting_function("3"s, 5, &signal), false, + returnTrue); }; auto resultFuture = std::async(std::launch::async, compute); signal.hasStartedSignal_.wait(); @@ -240,7 +255,8 @@ TEST(ConcurrentCache, concurrentPinnedUpgradeComputation) { // this call waits for the background task to compute, and then fetches the // result. After this call completes, nothing is in progress and the result // is cached. - auto result = a.computeOncePinned(3, waiting_function("3"s, 5)); + auto result = + a.computeOncePinned(3, waiting_function("3"s, 5), false, returnTrue); ASSERT_EQ(0ul, a.numNonPinnedEntries()); ASSERT_EQ(1ul, a.numPinnedEntries()); ASSERT_TRUE(a.getStorage().wlock()->_inProgress.empty()); @@ -255,10 +271,12 @@ TEST(ConcurrentCache, abort) { auto a = SimpleConcurrentLruCache(3ul); StartStopSignal signal; auto compute = [&a, &signal]() { - return a.computeOnce(3, waiting_function("3"s, 5, &signal)); + return a.computeOnce(3, waiting_function("3"s, 5, &signal), false, + returnTrue); }; auto computeWithError = [&a, &signal]() { - return a.computeOnce(3, wait_and_throw_function(5, &signal)); + return a.computeOnce(3, wait_and_throw_function(5, &signal), false, + returnTrue); }; auto fut = std::async(std::launch::async, computeWithError); signal.hasStartedSignal_.wait(); @@ -279,10 +297,12 @@ TEST(ConcurrentCache, abortPinned) { auto a = SimpleConcurrentLruCache(3ul); StartStopSignal signal; auto compute = [&]() { - return a.computeOncePinned(3, waiting_function("3"s, 5, &signal)); + return a.computeOncePinned(3, waiting_function("3"s, 5, &signal), false, + returnTrue); }; auto computeWithError = [&a, &signal]() { - return a.computeOncePinned(3, wait_and_throw_function(5, &signal)); + return a.computeOncePinned(3, wait_and_throw_function(5, &signal), false, + returnTrue); }; auto fut = std::async(std::launch::async, computeWithError); signal.hasStartedSignal_.wait(); @@ -309,3 +329,193 @@ TEST(ConcurrentCache, cacheStatusToString) { static_cast(notInCacheAndNotComputed) + 1); EXPECT_ANY_THROW(toString(outOfBounds)); } + +// _____________________________________________________________________________ +TEST(ConcurrentCache, isNotCachedIfUnsuitable) { + SimpleConcurrentLruCache cache{}; + + cache.clearAll(); + + auto result = cache.computeOnce( + 0, []() { return "abc"; }, false, [](const auto&) { return false; }); + + EXPECT_EQ(cache.numNonPinnedEntries(), 0); + EXPECT_EQ(cache.numPinnedEntries(), 0); + EXPECT_THAT(result._resultPointer, Pointee("abc"s)); +} + +// _____________________________________________________________________________ +TEST(ConcurrentCache, isNotCachedIfUnsuitableWhenWaitingForPendingComputation) { + SimpleConcurrentLruCache cache{}; + + auto resultInProgress = std::make_shared< + ad_utility::ConcurrentCacheDetail::ResultInProgress>(); + + cache.clearAll(); + cache.getStorage().wlock()->_inProgress[0] = + std::pair(false, resultInProgress); + + std::atomic_bool finished = false; + + ad_utility::JThread thread{[&]() { + std::this_thread::sleep_for(5ms); + resultInProgress->finish(nullptr); + finished = true; + }}; + + auto result = cache.computeOnce( + 0, []() { return "abc"; }, false, [](const auto&) { return false; }); + + EXPECT_TRUE(finished); + EXPECT_EQ(cache.numNonPinnedEntries(), 0); + EXPECT_EQ(cache.numPinnedEntries(), 0); + EXPECT_THAT(result._resultPointer, Pointee("abc"s)); +} + +// _____________________________________________________________________________ +TEST(ConcurrentCache, isCachedIfSuitableWhenWaitingForPendingComputation) { + SimpleConcurrentLruCache cache{}; + + auto resultInProgress = std::make_shared< + ad_utility::ConcurrentCacheDetail::ResultInProgress>(); + + cache.clearAll(); + cache.getStorage().wlock()->_inProgress[0] = + std::pair(false, resultInProgress); + + std::atomic_bool finished = false; + + ad_utility::JThread thread{[&]() { + std::this_thread::sleep_for(5ms); + resultInProgress->finish(nullptr); + finished = true; + }}; + + auto result = cache.computeOnce( + 0, []() { return "abc"; }, false, [](const auto&) { return true; }); + + EXPECT_TRUE(finished); + EXPECT_EQ(cache.numNonPinnedEntries(), 1); + EXPECT_EQ(cache.numPinnedEntries(), 0); + EXPECT_THAT(result._resultPointer, Pointee("abc"s)); + EXPECT_EQ(result._cacheStatus, ad_utility::CacheStatus::computed); + EXPECT_TRUE(cache.cacheContains(0)); +} + +// _____________________________________________________________________________ +TEST(ConcurrentCache, + isCachedIfSuitableWhenWaitingForPendingComputationPinned) { + SimpleConcurrentLruCache cache{}; + + // Simulate a computation with the same cache key that is currently in + // progress so the new computation waits for the result. + auto resultInProgress = std::make_shared< + ad_utility::ConcurrentCacheDetail::ResultInProgress>(); + + cache.clearAll(); + cache.getStorage().wlock()->_inProgress[0] = + std::pair(false, resultInProgress); + + std::atomic_bool finished = false; + + ad_utility::JThread thread{[&]() { + std::this_thread::sleep_for(5ms); + resultInProgress->finish(nullptr); + finished = true; + }}; + + auto result = cache.computeOncePinned( + 0, []() { return "abc"; }, false, [](const auto&) { return true; }); + + EXPECT_TRUE(finished); + EXPECT_EQ(cache.numNonPinnedEntries(), 0); + EXPECT_EQ(cache.numPinnedEntries(), 1); + EXPECT_THAT(result._resultPointer, Pointee("abc"s)); + EXPECT_TRUE(cache.cacheContains(0)); +} + +// _____________________________________________________________________________ +TEST(ConcurrentCache, ifUnsuitableForCacheAndPinnedThrowsException) { + SimpleConcurrentLruCache cache{}; + + cache.clearAll(); + + EXPECT_THROW( + cache.computeOncePinned( + 0, []() { return "abc"; }, false, [](const auto&) { return false; }), + ad_utility::Exception); +} + +// _____________________________________________________________________________ +TEST(ConcurrentCache, + ifUnsuitableWhenWaitingForPendingComputationAndPinnedThrowsException) { + SimpleConcurrentLruCache cache{}; + + auto resultInProgress = std::make_shared< + ad_utility::ConcurrentCacheDetail::ResultInProgress>(); + + cache.clearAll(); + cache.getStorage().wlock()->_inProgress[0] = + std::pair(false, resultInProgress); + + std::atomic_bool finished = false; + + ad_utility::JThread thread{[&]() { + std::this_thread::sleep_for(5ms); + resultInProgress->finish(nullptr); + finished = true; + }}; + + EXPECT_THROW( + cache.computeOncePinned( + 0, []() { return "abc"; }, false, [](const auto&) { return false; }), + ad_utility::Exception); + EXPECT_TRUE(finished); +} + +// _____________________________________________________________________________ +TEST(ConcurrentCache, testTryInsertIfNotPresentDoesWorkCorrectly) { + auto hasValue = [](std::string value) { + using namespace ::testing; + using CS = SimpleConcurrentLruCache::ResultAndCacheStatus; + return Optional( + Field("_resultPointer", &CS::_resultPointer, Pointee(value))); + }; + + SimpleConcurrentLruCache cache{}; + + auto expectContainsSingleElementAtKey0 = + [&](bool pinned, std::string expected, + ad_utility::source_location l = + ad_utility::source_location::current()) { + using namespace ::testing; + auto trace = generateLocationTrace(l); + auto value = cache.getIfContained(0); + EXPECT_THAT(value, hasValue(expected)); + if (pinned) { + EXPECT_NE(cache.pinnedSize(), 0_B); + EXPECT_EQ(cache.nonPinnedSize(), 0_B); + } else { + EXPECT_EQ(cache.pinnedSize(), 0_B); + EXPECT_NE(cache.nonPinnedSize(), 0_B); + } + }; + + cache.tryInsertIfNotPresent(false, 0, std::make_shared("abc")); + + expectContainsSingleElementAtKey0(false, "abc"); + + cache.tryInsertIfNotPresent(false, 0, std::make_shared("def")); + + expectContainsSingleElementAtKey0(false, "abc"); + + cache.tryInsertIfNotPresent(true, 0, std::make_shared("ghi")); + + expectContainsSingleElementAtKey0(true, "abc"); + + cache.clearAll(); + + cache.tryInsertIfNotPresent(true, 0, std::make_shared("jkl")); + + expectContainsSingleElementAtKey0(true, "jkl"); +} diff --git a/test/ExportQueryExecutionTreesTest.cpp b/test/ExportQueryExecutionTreesTest.cpp index 2faf90aeab..76f8ee9a79 100644 --- a/test/ExportQueryExecutionTreesTest.cpp +++ b/test/ExportQueryExecutionTreesTest.cpp @@ -9,10 +9,13 @@ #include "engine/QueryPlanner.h" #include "parser/SparqlParser.h" #include "util/GTestHelpers.h" +#include "util/IdTableHelpers.h" #include "util/IdTestHelpers.h" #include "util/IndexTestHelpers.h" using namespace std::string_literals; +using ::testing::ElementsAre; +using ::testing::Eq; using ::testing::HasSubstr; // Run the given SPARQL `query` on the given Turtle `kg` and export the result @@ -209,6 +212,28 @@ static std::string makeXMLHeader( // The end of a SPARQL XML export. static const std::string xmlTrailer = "\n\n"; +// Helper function for easier testing of the `IdTable` generator. +std::vector convertToVector( + cppcoro::generator generator) { + std::vector result; + for (const IdTable& idTable : generator) { + result.push_back(idTable.clone()); + } + return result; +} + +// Template is only required because inner class is not visible +template +std::vector convertToVector(cppcoro::generator generator) { + std::vector result; + for (const auto& [idTable, range] : generator) { + result.emplace_back(idTable.numColumns(), idTable.getAllocator()); + result.back().insertAtEnd(idTable.begin() + *range.begin(), + idTable.begin() + *(range.end() - 1) + 1); + } + return result; +} + // ____________________________________________________________________________ TEST(ExportQueryExecutionTrees, Integers) { std::string kg = @@ -1069,3 +1094,175 @@ INSTANTIATE_TEST_SUITE_P(StreamableMediaTypes, StreamableMediaTypesFixture, // TODO Unit tests that also test for the export of text records from // the text index and thus systematically fill the coverage gaps. + +// _____________________________________________________________________________ +TEST(ExportQueryExecutionTrees, getIdTablesReturnsSingletonIterator) { + auto idTable = makeIdTableFromVector({{42}, {1337}}); + + Result result{idTable.clone(), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getIdTables(result); + + EXPECT_THAT(convertToVector(std::move(generator)), + ElementsAre(Eq(std::cref(idTable)))); +} + +// _____________________________________________________________________________ +TEST(ExportQueryExecutionTrees, getIdTablesMirrorsGenerator) { + IdTable idTable1 = makeIdTableFromVector({{1}, {2}, {3}}); + IdTable idTable2 = makeIdTableFromVector({{42}, {1337}}); + auto tableGenerator = [](IdTable idTableA, + IdTable idTableB) -> cppcoro::generator { + co_yield idTableA; + + co_yield idTableB; + }(idTable1.clone(), idTable2.clone()); + + Result result{std::move(tableGenerator), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getIdTables(result); + + EXPECT_THAT(convertToVector(std::move(generator)), + ElementsAre(Eq(std::cref(idTable1)), Eq(std::cref(idTable2)))); +} + +// _____________________________________________________________________________ +TEST(ExportQueryExecutionTrees, ensureCorrectSlicingOfSingleIdTable) { + auto tableGenerator = []() -> cppcoro::generator { + IdTable idTable1 = makeIdTableFromVector({{1}, {2}, {3}}); + co_yield idTable1; + }(); + + Result result{std::move(tableGenerator), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getRowIndices( + LimitOffsetClause{._limit = 1, ._offset = 1}, result); + + auto referenceTable = makeIdTableFromVector({{2}}); + EXPECT_THAT(convertToVector(std::move(generator)), + ElementsAre(Eq(std::cref(referenceTable)))); +} + +// _____________________________________________________________________________ +TEST(ExportQueryExecutionTrees, + ensureCorrectSlicingOfIdTablesWhenFirstIsSkipped) { + auto tableGenerator = []() -> cppcoro::generator { + IdTable idTable1 = makeIdTableFromVector({{1}, {2}, {3}}); + co_yield idTable1; + + IdTable idTable2 = makeIdTableFromVector({{4}, {5}}); + co_yield idTable2; + }(); + + Result result{std::move(tableGenerator), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getRowIndices( + LimitOffsetClause{._limit = std::nullopt, ._offset = 3}, result); + + auto referenceTable1 = makeIdTableFromVector({{4}, {5}}); + + EXPECT_THAT(convertToVector(std::move(generator)), + ElementsAre(Eq(std::cref(referenceTable1)))); +} + +// _____________________________________________________________________________ +TEST(ExportQueryExecutionTrees, + ensureCorrectSlicingOfIdTablesWhenLastIsSkipped) { + auto tableGenerator = []() -> cppcoro::generator { + IdTable idTable1 = makeIdTableFromVector({{1}, {2}, {3}}); + co_yield idTable1; + + IdTable idTable2 = makeIdTableFromVector({{4}, {5}}); + co_yield idTable2; + }(); + + Result result{std::move(tableGenerator), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getRowIndices( + LimitOffsetClause{._limit = 3}, result); + + auto referenceTable1 = makeIdTableFromVector({{1}, {2}, {3}}); + + EXPECT_THAT(convertToVector(std::move(generator)), + ElementsAre(Eq(std::cref(referenceTable1)))); +} + +// _____________________________________________________________________________ +TEST(ExportQueryExecutionTrees, + ensureCorrectSlicingOfIdTablesWhenFirstAndSecondArePartial) { + auto tableGenerator = []() -> cppcoro::generator { + IdTable idTable1 = makeIdTableFromVector({{1}, {2}, {3}}); + co_yield idTable1; + + IdTable idTable2 = makeIdTableFromVector({{4}, {5}}); + co_yield idTable2; + }(); + + Result result{std::move(tableGenerator), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getRowIndices( + LimitOffsetClause{._limit = 3, ._offset = 1}, result); + + auto referenceTable1 = makeIdTableFromVector({{2}, {3}}); + auto referenceTable2 = makeIdTableFromVector({{4}}); + + EXPECT_THAT(convertToVector(std::move(generator)), + ElementsAre(Eq(std::cref(referenceTable1)), + Eq(std::cref(referenceTable2)))); +} + +// _____________________________________________________________________________ +TEST(ExportQueryExecutionTrees, + ensureCorrectSlicingOfIdTablesWhenFirstAndLastArePartial) { + auto tableGenerator = []() -> cppcoro::generator { + IdTable idTable1 = makeIdTableFromVector({{1}, {2}, {3}}); + co_yield idTable1; + + IdTable idTable2 = makeIdTableFromVector({{4}, {5}}); + co_yield idTable2; + + IdTable idTable3 = makeIdTableFromVector({{6}, {7}, {8}, {9}}); + co_yield idTable3; + }(); + + Result result{std::move(tableGenerator), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getRowIndices( + LimitOffsetClause{._limit = 5, ._offset = 2}, result); + + auto referenceTable1 = makeIdTableFromVector({{3}}); + auto referenceTable2 = makeIdTableFromVector({{4}, {5}}); + auto referenceTable3 = makeIdTableFromVector({{6}, {7}}); + + EXPECT_THAT(convertToVector(std::move(generator)), + ElementsAre(Eq(std::cref(referenceTable1)), + Eq(std::cref(referenceTable2)), + Eq(std::cref(referenceTable3)))); +} + +// _____________________________________________________________________________ +TEST(ExportQueryExecutionTrees, ensureGeneratorIsNotConsumedWhenNotRequired) { + { + auto throwingGenerator = []() -> cppcoro::generator { + ADD_FAILURE() << "Generator was started" << std::endl; + throw std::runtime_error("Generator was started"); + co_return; + }(); + + Result result{std::move(throwingGenerator), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getRowIndices( + LimitOffsetClause{._limit = 0, ._offset = 0}, result); + EXPECT_NO_THROW(convertToVector(std::move(generator))); + } + + { + auto throwAfterYieldGenerator = []() -> cppcoro::generator { + IdTable idTable1 = makeIdTableFromVector({{1}}); + co_yield idTable1; + + ADD_FAILURE() << "Generator was resumed" << std::endl; + throw std::runtime_error("Generator was resumed"); + }(); + + Result result{std::move(throwAfterYieldGenerator), {}, LocalVocab{}}; + auto generator = ExportQueryExecutionTrees::getRowIndices( + LimitOffsetClause{._limit = 1, ._offset = 0}, result); + IdTable referenceTable1 = makeIdTableFromVector({{1}}); + std::vector tables; + EXPECT_NO_THROW({ tables = convertToVector(std::move(generator)); }); + EXPECT_THAT(tables, ElementsAre(Eq(std::cref(referenceTable1)))); + } +} diff --git a/test/FilterTest.cpp b/test/FilterTest.cpp new file mode 100644 index 0000000000..38b1370aa1 --- /dev/null +++ b/test/FilterTest.cpp @@ -0,0 +1,97 @@ +// Copyright 2024, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Robin Textor-Falconi + +#include + +#include "engine/Filter.h" +#include "engine/ValuesForTesting.h" +#include "engine/sparqlExpressions/LiteralExpression.h" +#include "util/IdTableHelpers.h" +#include "util/IndexTestHelpers.h" + +using ::testing::ElementsAre; +using ::testing::Eq; + +namespace { +// Shorthand for makeFromBool +ValueId asBool(bool value) { return Id::makeFromBool(value); } + +// Convert a generator to a vector for easier comparison in assertions +std::vector toVector(cppcoro::generator generator) { + std::vector result; + for (auto& table : generator) { + result.push_back(std::move(table)); + } + return result; +} +} // namespace + +// _____________________________________________________________________________ +TEST(Filter, verifyPredicateIsAppliedCorrectlyOnLazyEvaluation) { + QueryExecutionContext* qec = ad_utility::testing::getQec(); + qec->getQueryTreeCache().clearAll(); + std::vector idTables; + idTables.push_back(makeIdTableFromVector( + {{true}, {true}, {false}, {false}, {true}}, asBool)); + idTables.push_back(makeIdTableFromVector({{true}, {false}}, asBool)); + idTables.push_back(IdTable{1, ad_utility::makeUnlimitedAllocator()}); + idTables.push_back( + makeIdTableFromVector({{false}, {false}, {false}}, asBool)); + idTables.push_back(makeIdTableFromVector({{true}}, asBool)); + + ValuesForTesting values{qec, std::move(idTables), {Variable{"?x"}}}; + QueryExecutionTree subTree{ + qec, std::make_shared(std::move(values))}; + Filter filter{ + qec, + std::make_shared(std::move(subTree)), + {std::make_unique(Variable{"?x"}), + "Expression ?x"}}; + + auto result = filter.getResult(false, ComputationMode::LAZY_IF_SUPPORTED); + ASSERT_FALSE(result->isFullyMaterialized()); + auto& generator = result->idTables(); + + auto referenceTable1 = + makeIdTableFromVector({{true}, {true}, {true}}, asBool); + auto referenceTable2 = makeIdTableFromVector({{true}}, asBool); + IdTable referenceTable3{0, ad_utility::makeUnlimitedAllocator()}; + + EXPECT_THAT(toVector(std::move(generator)), + ElementsAre(Eq(std::cref(referenceTable1)), + Eq(std::cref(referenceTable2)), + Eq(std::cref(referenceTable3)), + Eq(std::cref(referenceTable3)), + Eq(std::cref(referenceTable2)))); +} + +// _____________________________________________________________________________ +TEST(Filter, verifyPredicateIsAppliedCorrectlyOnNonLazyEvaluation) { + QueryExecutionContext* qec = ad_utility::testing::getQec(); + qec->getQueryTreeCache().clearAll(); + std::vector idTables; + idTables.push_back(makeIdTableFromVector( + {{true}, {true}, {false}, {false}, {true}}, asBool)); + idTables.push_back(makeIdTableFromVector({{true}, {false}}, asBool)); + idTables.push_back(IdTable{1, ad_utility::makeUnlimitedAllocator()}); + idTables.push_back( + makeIdTableFromVector({{false}, {false}, {false}}, asBool)); + idTables.push_back(makeIdTableFromVector({{true}}, asBool)); + + ValuesForTesting values{qec, std::move(idTables), {Variable{"?x"}}}; + QueryExecutionTree subTree{ + qec, std::make_shared(std::move(values))}; + Filter filter{ + qec, + std::make_shared(std::move(subTree)), + {std::make_unique(Variable{"?x"}), + "Expression ?x"}}; + + auto result = filter.getResult(false, ComputationMode::FULLY_MATERIALIZED); + ASSERT_TRUE(result->isFullyMaterialized()); + + EXPECT_EQ( + result->idTable(), + makeIdTableFromVector({{true}, {true}, {true}, {true}, {true}}, asBool)); +} diff --git a/test/OperationTest.cpp b/test/OperationTest.cpp index 9e9d47571a..9cd5022f14 100644 --- a/test/OperationTest.cpp +++ b/test/OperationTest.cpp @@ -3,7 +3,6 @@ // Author: Johannes Kalmbach (joka921) #include -#include #include "engine/NeutralElementOperation.h" #include "engine/ValuesForTesting.h" @@ -13,9 +12,36 @@ using namespace ad_utility::testing; using namespace ::testing; +using ad_utility::CacheStatus; using ad_utility::CancellationException; using ad_utility::CancellationHandle; using ad_utility::CancellationState; +using Status = RuntimeInformation::Status; + +namespace { +// Helper function to perform actions at various stages of a generator +template +auto expectAtEachStageOfGenerator( + cppcoro::generator generator, + std::vector> functions, + ad_utility::source_location l = ad_utility::source_location::current()) { + auto locationTrace = generateLocationTrace(l); + size_t index = 0; + for ([[maybe_unused]] T& _ : generator) { + functions.at(index)(); + ++index; + } + EXPECT_EQ(index, functions.size()); +} + +void expectRtiHasDimensions( + RuntimeInformation& rti, uint64_t cols, uint64_t rows, + ad_utility::source_location l = ad_utility::source_location::current()) { + auto locationTrace = generateLocationTrace(l); + EXPECT_EQ(rti.numCols_, cols); + EXPECT_EQ(rti.numRows_, rows); +} +} // namespace // ________________________________________________ TEST(OperationTest, limitIsRepresentedInCacheKey) { @@ -40,7 +66,7 @@ TEST(OperationTest, getResultOnlyCached) { // The second `true` means "only read the result if it was cached". // We have just cleared the cache, and so this should return `nullptr`. EXPECT_EQ(n.getResult(true, ComputationMode::ONLY_IF_CACHED), nullptr); - EXPECT_EQ(n.runtimeInfo().status_, RuntimeInformation::Status::notStarted); + EXPECT_EQ(n.runtimeInfo().status_, Status::notStarted); // Nothing has been stored in the cache by this call. EXPECT_EQ(qec->getQueryTreeCache().numNonPinnedEntries(), 0); EXPECT_EQ(qec->getQueryTreeCache().numPinnedEntries(), 0); @@ -49,9 +75,8 @@ TEST(OperationTest, getResultOnlyCached) { NeutralElementOperation n2{qec}; auto result = n2.getResult(); EXPECT_NE(result, nullptr); - EXPECT_EQ(n2.runtimeInfo().status_, - RuntimeInformation::Status::fullyMaterialized); - EXPECT_EQ(n2.runtimeInfo().cacheStatus_, ad_utility::CacheStatus::computed); + EXPECT_EQ(n2.runtimeInfo().status_, Status::fullyMaterialized); + EXPECT_EQ(n2.runtimeInfo().cacheStatus_, CacheStatus::computed); EXPECT_EQ(qec->getQueryTreeCache().numNonPinnedEntries(), 1); EXPECT_EQ(qec->getQueryTreeCache().numPinnedEntries(), 0); @@ -59,8 +84,7 @@ TEST(OperationTest, getResultOnlyCached) { // get exactly the same `shared_ptr` as with the previous call. NeutralElementOperation n3{qec}; EXPECT_EQ(n3.getResult(true, ComputationMode::ONLY_IF_CACHED), result); - EXPECT_EQ(n3.runtimeInfo().cacheStatus_, - ad_utility::CacheStatus::cachedNotPinned); + EXPECT_EQ(n3.runtimeInfo().cacheStatus_, CacheStatus::cachedNotPinned); // We can even use the `onlyReadFromCache` case to upgrade a non-pinned // cache-entry to a pinned cache entry @@ -71,8 +95,7 @@ TEST(OperationTest, getResultOnlyCached) { // The cache status is `cachedNotPinned` because we found the element cached // but not pinned (it does reflect the status BEFORE the operation). - EXPECT_EQ(n4.runtimeInfo().cacheStatus_, - ad_utility::CacheStatus::cachedNotPinned); + EXPECT_EQ(n4.runtimeInfo().cacheStatus_, CacheStatus::cachedNotPinned); EXPECT_EQ(qec->getQueryTreeCache().numNonPinnedEntries(), 0); EXPECT_EQ(qec->getQueryTreeCache().numPinnedEntries(), 1); @@ -80,8 +103,7 @@ TEST(OperationTest, getResultOnlyCached) { // result. qecCopy._pinResult = false; EXPECT_EQ(n4.getResult(true, ComputationMode::ONLY_IF_CACHED), result); - EXPECT_EQ(n4.runtimeInfo().cacheStatus_, - ad_utility::CacheStatus::cachedPinned); + EXPECT_EQ(n4.runtimeInfo().cacheStatus_, CacheStatus::cachedPinned); // Clear the (global) cache again to not possibly interfere with other unit // tests. @@ -221,3 +243,340 @@ TEST(Operation, createRuntimInfoFromEstimates) { EXPECT_EQ(operation.runtimeInfo().details_["limit"], 12); EXPECT_EQ(operation.runtimeInfo().details_["offset"], 3); } + +// _____________________________________________________________________________ +TEST(Operation, lazilyEvaluatedOperationIsNotCached) { + using V = Variable; + auto qec = getQec(); + SparqlTripleSimple scanTriple{V{"?x"}, V{"?y"}, V{"?z"}}; + IndexScan scan{qec, Permutation::Enum::POS, scanTriple}; + + qec->getQueryTreeCache().clearAll(); + auto result = scan.getResult(true, ComputationMode::LAZY_IF_SUPPORTED); + ASSERT_NE(result, nullptr); + EXPECT_FALSE(result->isFullyMaterialized()); + + EXPECT_EQ(qec->getQueryTreeCache().numNonPinnedEntries(), 0); + EXPECT_EQ(qec->getQueryTreeCache().numPinnedEntries(), 0); +} + +// _____________________________________________________________________________ +TEST(Operation, updateRuntimeStatsWorksCorrectly) { + auto qec = getQec(); + auto idTable = makeIdTableFromVector({{3, 4}, {7, 8}, {9, 123}}); + ValuesForTesting valuesForTesting{ + qec, std::move(idTable), {Variable{"?x"}, Variable{"?y"}}}; + + auto& rti = valuesForTesting.runtimeInfo(); + + // Test operation with built-in filter + valuesForTesting.externalLimitApplied_ = false; + valuesForTesting.updateRuntimeStats(false, 11, 13, 17ms); + + EXPECT_EQ(rti.numCols_, 13); + EXPECT_EQ(rti.numRows_, 11); + EXPECT_EQ(rti.totalTime_, 17ms); + EXPECT_EQ(rti.originalTotalTime_, 17ms); + EXPECT_EQ(rti.originalOperationTime_, 17ms); + + // Test built-in filter + valuesForTesting.externalLimitApplied_ = false; + valuesForTesting.updateRuntimeStats(true, 5, 3, 7ms); + + EXPECT_EQ(rti.numCols_, 13); + EXPECT_EQ(rti.numRows_, 11); + EXPECT_EQ(rti.totalTime_, 17ms + 7ms); + EXPECT_EQ(rti.originalTotalTime_, 17ms + 7ms); + EXPECT_EQ(rti.originalOperationTime_, 17ms + 7ms); + + rti.children_ = {std::make_shared()}; + rti.numCols_ = 0; + rti.numRows_ = 0; + rti.totalTime_ = 0ms; + rti.originalOperationTime_ = 0ms; + auto& childRti = *rti.children_.at(0); + + // Test operation with external filter + valuesForTesting.externalLimitApplied_ = true; + valuesForTesting.updateRuntimeStats(false, 31, 37, 41ms); + + EXPECT_EQ(rti.numCols_, 0); + EXPECT_EQ(rti.numRows_, 0); + EXPECT_EQ(rti.totalTime_, 41ms); + EXPECT_EQ(rti.originalTotalTime_, 41ms); + EXPECT_EQ(rti.originalOperationTime_, 0ms); + + EXPECT_EQ(childRti.numCols_, 37); + EXPECT_EQ(childRti.numRows_, 31); + EXPECT_EQ(childRti.totalTime_, 41ms); + EXPECT_EQ(childRti.originalTotalTime_, 41ms); + EXPECT_EQ(childRti.originalOperationTime_, 41ms); + + // Test external filter + valuesForTesting.externalLimitApplied_ = true; + valuesForTesting.updateRuntimeStats(true, 19, 23, 29ms); + + EXPECT_EQ(rti.numCols_, 23); + EXPECT_EQ(rti.numRows_, 19); + EXPECT_EQ(rti.totalTime_, 41ms + 29ms); + EXPECT_EQ(rti.originalTotalTime_, 41ms + 29ms); + EXPECT_EQ(rti.originalOperationTime_, 29ms); + + EXPECT_EQ(childRti.numCols_, 37); + EXPECT_EQ(childRti.numRows_, 31); + EXPECT_EQ(childRti.totalTime_, 41ms); + EXPECT_EQ(childRti.originalTotalTime_, 41ms); + EXPECT_EQ(childRti.originalOperationTime_, 41ms); +} + +// _____________________________________________________________________________ +TEST(Operation, verifyRuntimeInformationIsUpdatedForLazyOperations) { + auto qec = getQec(); + std::vector idTablesVector{}; + idTablesVector.push_back(makeIdTableFromVector({{3, 4}})); + idTablesVector.push_back(makeIdTableFromVector({{7, 8}})); + ValuesForTesting valuesForTesting{ + qec, std::move(idTablesVector), {Variable{"?x"}, Variable{"?y"}}}; + + ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started}; + EXPECT_THROW( + valuesForTesting.runComputation(timer, ComputationMode::ONLY_IF_CACHED), + ad_utility::Exception); + + auto result = valuesForTesting.runComputation( + timer, ComputationMode::LAZY_IF_SUPPORTED); + + auto& rti = valuesForTesting.runtimeInfo(); + + EXPECT_EQ(rti.status_, Status::lazilyMaterialized); + EXPECT_EQ(rti.totalTime_, 0ms); + EXPECT_EQ(rti.originalTotalTime_, 0ms); + EXPECT_EQ(rti.originalOperationTime_, 0ms); + + expectAtEachStageOfGenerator( + std::move(result.idTables()), + {[&]() { + EXPECT_EQ(rti.status_, Status::lazilyMaterialized); + expectRtiHasDimensions(rti, 2, 1); + }, + [&]() { + EXPECT_EQ(rti.status_, Status::lazilyMaterialized); + expectRtiHasDimensions(rti, 2, 2); + }}); + + EXPECT_EQ(rti.status_, Status::lazilyMaterialized); + expectRtiHasDimensions(rti, 2, 2); +} + +// _____________________________________________________________________________ +TEST(Operation, ensureFailedStatusIsSetWhenGeneratorThrowsException) { + bool signaledUpdate = false; + Index index = makeTestIndex( + "ensureFailedStatusIsSetWhenGeneratorThrowsException", std::nullopt, true, + true, true, ad_utility::MemorySize::bytes(16), false); + QueryResultCache cache{}; + QueryExecutionContext context{ + index, &cache, makeAllocator(ad_utility::MemorySize::megabytes(100)), + SortPerformanceEstimator{}, [&](std::string) { signaledUpdate = true; }}; + AlwaysFailOperation operation{&context}; + ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started}; + auto result = + operation.runComputation(timer, ComputationMode::LAZY_IF_SUPPORTED); + + EXPECT_EQ(operation.runtimeInfo().status_, Status::lazilyMaterialized); + + EXPECT_THROW(result.idTables().begin(), std::runtime_error); + + EXPECT_EQ(operation.runtimeInfo().status_, Status::failed); + EXPECT_TRUE(signaledUpdate); +} + +// _____________________________________________________________________________ +TEST(Operation, ensureSignalUpdateIsOnlyCalledEvery50msAndAtTheEnd) { +#ifdef _QLEVER_NO_TIMING_TESTS + GTEST_SKIP_("because _QLEVER_NO_TIMING_TESTS defined"); +#endif + uint32_t updateCallCounter = 0; + auto idTable = makeIdTableFromVector({{}}); + Index index = makeTestIndex( + "ensureSignalUpdateIsOnlyCalledEvery50msAndAtTheEnd", std::nullopt, true, + true, true, ad_utility::MemorySize::bytes(16), false); + QueryResultCache cache{}; + QueryExecutionContext context{ + index, &cache, makeAllocator(ad_utility::MemorySize::megabytes(100)), + SortPerformanceEstimator{}, [&](std::string) { ++updateCallCounter; }}; + CustomGeneratorOperation operation{ + &context, [](const IdTable& idTable) -> cppcoro::generator { + std::this_thread::sleep_for(50ms); + co_yield idTable.clone(); + // This one should not trigger because it's below the 50ms threshold + std::this_thread::sleep_for(30ms); + co_yield idTable.clone(); + std::this_thread::sleep_for(30ms); + co_yield idTable.clone(); + // This one should not trigger directly, but trigger because it's the + // last one + std::this_thread::sleep_for(30ms); + co_yield idTable.clone(); + }(idTable)}; + + ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started}; + auto result = + operation.runComputation(timer, ComputationMode::LAZY_IF_SUPPORTED); + + EXPECT_EQ(updateCallCounter, 1); + + expectAtEachStageOfGenerator(std::move(result.idTables()), + { + [&]() { EXPECT_EQ(updateCallCounter, 2); }, + [&]() { EXPECT_EQ(updateCallCounter, 2); }, + [&]() { EXPECT_EQ(updateCallCounter, 3); }, + [&]() { EXPECT_EQ(updateCallCounter, 3); }, + }); + + EXPECT_EQ(updateCallCounter, 4); +} + +// _____________________________________________________________________________ +TEST(Operation, ensureSignalUpdateIsCalledAtTheEndOfPartialConsumption) { + uint32_t updateCallCounter = 0; + auto idTable = makeIdTableFromVector({{}}); + Index index = makeTestIndex( + "ensureSignalUpdateIsCalledAtTheEndOfPartialConsumption", std::nullopt, + true, true, true, ad_utility::MemorySize::bytes(16), false); + QueryResultCache cache{}; + QueryExecutionContext context{ + index, &cache, makeAllocator(ad_utility::MemorySize::megabytes(100)), + SortPerformanceEstimator{}, [&](std::string) { ++updateCallCounter; }}; + CustomGeneratorOperation operation{ + &context, [](const IdTable& idTable) -> cppcoro::generator { + co_yield idTable.clone(); + co_yield idTable.clone(); + }(idTable)}; + + { + ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started}; + auto result = + operation.runComputation(timer, ComputationMode::LAZY_IF_SUPPORTED); + + EXPECT_EQ(updateCallCounter, 1); + auto& idTables = result.idTables(); + // Only consume partially + auto iterator = idTables.begin(); + ASSERT_NE(iterator, idTables.end()); + EXPECT_EQ(updateCallCounter, 1); + } + + // Destructor of result should call this function + EXPECT_EQ(updateCallCounter, 2); +} + +// _____________________________________________________________________________ +TEST(Operation, verifyLimitIsProperlyAppliedAndUpdatesRuntimeInfoCorrectly) { + auto qec = getQec(); + std::vector idTablesVector{}; + idTablesVector.push_back(makeIdTableFromVector({{3, 4}})); + idTablesVector.push_back(makeIdTableFromVector({{7, 8}, {9, 123}})); + ValuesForTesting valuesForTesting{ + qec, std::move(idTablesVector), {Variable{"?x"}, Variable{"?y"}}}; + + valuesForTesting.setLimit({._limit = 1, ._offset = 1}); + + ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started}; + + auto result = valuesForTesting.runComputation( + timer, ComputationMode::LAZY_IF_SUPPORTED); + + auto& rti = valuesForTesting.runtimeInfo(); + auto& childRti = *rti.children_.at(0); + + expectRtiHasDimensions(rti, 0, 0); + expectRtiHasDimensions(childRti, 0, 0); + + expectAtEachStageOfGenerator(std::move(result.idTables()), + {[&]() { + expectRtiHasDimensions(rti, 2, 0); + expectRtiHasDimensions(childRti, 2, 1); + }, + [&]() { + expectRtiHasDimensions(rti, 2, 1); + expectRtiHasDimensions(childRti, 2, 3); + }}); + + expectRtiHasDimensions(rti, 2, 1); + expectRtiHasDimensions(childRti, 2, 3); +} + +// _____________________________________________________________________________ +TEST(Operation, ensureLazyOperationIsCachedIfSmallEnough) { + auto qec = getQec(); + qec->getQueryTreeCache().clearAll(); + std::vector idTablesVector{}; + idTablesVector.push_back(makeIdTableFromVector({{3, 4}})); + idTablesVector.push_back(makeIdTableFromVector({{7, 8}, {9, 123}})); + ValuesForTesting valuesForTesting{ + qec, std::move(idTablesVector), {Variable{"?x"}, Variable{"?y"}}}; + + ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started}; + + auto cacheValue = valuesForTesting.runComputationAndPrepareForCache( + timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false); + EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test")); + + for ([[maybe_unused]] IdTable& _ : cacheValue.resultTable().idTables()) { + } + + auto aggregatedValue = qec->getQueryTreeCache().getIfContained("test"); + ASSERT_TRUE(aggregatedValue.has_value()); + + ASSERT_TRUE(aggregatedValue.value()._resultPointer); + + auto newRti = aggregatedValue.value()._resultPointer->runtimeInfo(); + auto& oldRti = valuesForTesting.runtimeInfo(); + + EXPECT_EQ(newRti.descriptor_, oldRti.descriptor_); + EXPECT_EQ(newRti.numCols_, oldRti.numCols_); + EXPECT_EQ(newRti.numRows_, oldRti.numRows_); + EXPECT_EQ(newRti.totalTime_, oldRti.totalTime_); + EXPECT_EQ(newRti.originalTotalTime_, oldRti.originalTotalTime_); + EXPECT_EQ(newRti.originalOperationTime_, oldRti.originalOperationTime_); + EXPECT_EQ(newRti.status_, Status::fullyMaterialized); + + const auto& aggregatedResult = + aggregatedValue.value()._resultPointer->resultTable(); + ASSERT_TRUE(aggregatedResult.isFullyMaterialized()); + + const auto& idTable = aggregatedResult.idTable(); + ASSERT_EQ(idTable.numColumns(), 2); + ASSERT_EQ(idTable.numRows(), 3); + + EXPECT_EQ(idTable, makeIdTableFromVector({{3, 4}, {7, 8}, {9, 123}})); +} + +// _____________________________________________________________________________ +TEST(Operation, checkLazyOperationIsNotCachedIfTooLarge) { + auto qec = getQec(); + qec->getQueryTreeCache().clearAll(); + std::vector idTablesVector{}; + idTablesVector.push_back(makeIdTableFromVector({{3, 4}})); + idTablesVector.push_back(makeIdTableFromVector({{7, 8}, {9, 123}})); + ValuesForTesting valuesForTesting{ + qec, std::move(idTablesVector), {Variable{"?x"}, Variable{"?y"}}}; + + ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started}; + + auto originalSize = qec->getQueryTreeCache().getMaxSizeSingleEntry(); + + // Too small for storage + qec->getQueryTreeCache().setMaxSizeSingleEntry(1_B); + + auto cacheValue = valuesForTesting.runComputationAndPrepareForCache( + timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false); + EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test")); + qec->getQueryTreeCache().setMaxSizeSingleEntry(originalSize); + + for ([[maybe_unused]] IdTable& _ : cacheValue.resultTable().idTables()) { + } + + EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test")); +} diff --git a/test/ResultTest.cpp b/test/ResultTest.cpp new file mode 100644 index 0000000000..5c8d1ce1a7 --- /dev/null +++ b/test/ResultTest.cpp @@ -0,0 +1,490 @@ +// Copyright 2024, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Robin Textor-Falconi + +#include + +#include "engine/Result.h" +#include "util/IdTableHelpers.h" + +using namespace std::chrono_literals; +using testing::Combine; +using ::testing::HasSubstr; +using testing::Values; + +namespace { +// Helper function to generate all possible splits of an IdTable in order to +// exhaustively test generator variants. +std::vector> getAllSubSplits( + const IdTable& idTable) { + std::vector> result; + for (size_t i = 0; i < std::pow(idTable.size() - 1, 2); ++i) { + std::vector reverseIndex{}; + size_t copy = i; + for (size_t index = 0; index < idTable.size(); ++index) { + if (copy % 2 == 1) { + reverseIndex.push_back(index); + } + copy /= 2; + } + result.push_back( + [](auto split, IdTable clone) -> cppcoro::generator { + IdTable subSplit{clone.numColumns(), + ad_utility::makeUnlimitedAllocator()}; + size_t splitIndex = 0; + for (size_t i = 0; i < clone.size(); ++i) { + subSplit.push_back(clone[i]); + if (splitIndex < split.size() && split[splitIndex] == i) { + co_yield subSplit; + subSplit.clear(); + ++splitIndex; + } + } + if (subSplit.size() > 0) { + co_yield subSplit; + } + }(std::move(reverseIndex), idTable.clone())); + } + + return result; +} + +// _____________________________________________________________________________ +void consumeGenerator(cppcoro::generator& generator) { + for ([[maybe_unused]] IdTable& _ : generator) { + } +} +} // namespace + +TEST(Result, verifyIdTableThrowsWhenActuallyLazy) { + Result result1{ + []() -> cppcoro::generator { co_return; }(), {}, LocalVocab{}}; + EXPECT_FALSE(result1.isFullyMaterialized()); + EXPECT_THROW(result1.idTable(), ad_utility::Exception); + + Result result2{[]() -> cppcoro::generator { co_return; }(), + {}, + result1.getSharedLocalVocab()}; + EXPECT_FALSE(result2.isFullyMaterialized()); + EXPECT_THROW(result2.idTable(), ad_utility::Exception); +} + +// _____________________________________________________________________________ +TEST(Result, verifyIdTableThrowsOnSecondAccess) { + const Result result{ + []() -> cppcoro::generator { co_return; }(), {}, LocalVocab{}}; + // First access should work + for ([[maybe_unused]] IdTable& _ : result.idTables()) { + ADD_FAILURE() << "Generator is empty"; + } + // Now it should throw + EXPECT_THROW(result.idTables(), ad_utility::Exception); +} + +// _____________________________________________________________________________ +TEST(Result, verifyIdTablesThrowsWhenFullyMaterialized) { + Result result1{ + IdTable{ad_utility::makeUnlimitedAllocator()}, {}, LocalVocab{}}; + EXPECT_TRUE(result1.isFullyMaterialized()); + EXPECT_THROW(result1.idTables(), ad_utility::Exception); + + Result result2{IdTable{ad_utility::makeUnlimitedAllocator()}, + {}, + result1.getSharedLocalVocab()}; + EXPECT_TRUE(result2.isFullyMaterialized()); + EXPECT_THROW(result2.idTables(), ad_utility::Exception); +} + +// _____________________________________________________________________________ +using CIs = std::vector; +class ResultSortTest : public testing::TestWithParam> {}; + +TEST_P(ResultSortTest, verifyAssertSortOrderIsRespectedSucceedsWhenSorted) { + if constexpr (!ad_utility::areExpensiveChecksEnabled) { + GTEST_SKIP_("Expensive checks are disabled, skipping test."); + } + auto idTable = makeIdTableFromVector({{1, 6, 0}, {2, 5, 0}, {3, 4, 0}}); + + for (auto& generator : getAllSubSplits(idTable)) { + Result result{std::move(generator), std::get<1>(GetParam()), LocalVocab{}}; + if (std::get<0>(GetParam())) { + EXPECT_NO_THROW(consumeGenerator(result.idTables())); + } else { + AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE( + consumeGenerator(result.idTables()), + HasSubstr("compareRowsBySortColumns"), ad_utility::Exception); + } + } + + if (std::get<0>(GetParam())) { + EXPECT_NO_THROW( + (Result{std::move(idTable), std::get<1>(GetParam()), LocalVocab{}})); + } else { + AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE( + (Result{std::move(idTable), std::get<1>(GetParam()), LocalVocab{}}), + HasSubstr("compareRowsBySortColumns"), ad_utility::Exception); + } +} + +INSTANTIATE_TEST_SUITE_P(SuccessCases, ResultSortTest, + Combine(Values(true), + Values(CIs{}, CIs{0}, CIs{0, 1}, CIs{2, 0}))); + +INSTANTIATE_TEST_SUITE_P(FailureCases, ResultSortTest, + Combine(Values(false), + Values(CIs{1}, CIs{1, 0}, CIs{2, 1}))); + +// _____________________________________________________________________________ +TEST(Result, + verifyAnErrorIsThrownIfSortedByHasHigherIndicesThanTheTableHasColumns) { + auto idTable = makeIdTableFromVector({{1, 6, 0}, {2, 5, 0}, {3, 4, 0}}); + using ad_utility::Exception; + auto matcher = HasSubstr("colIndex < idTable.numColumns()"); + + AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE( + (Result{idTable.clone(), {3}, LocalVocab{}}), matcher, Exception); + + for (auto& generator : getAllSubSplits(idTable)) { + Result result{std::move(generator), {3}, LocalVocab{}}; + AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(consumeGenerator(result.idTables()), + matcher, Exception); + } + + AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE( + (Result{idTable.clone(), {2, 1337}, LocalVocab{}}), matcher, Exception); + + for (auto& generator : getAllSubSplits(idTable)) { + Result result{std::move(generator), {2, 1337}, LocalVocab{}}; + AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(consumeGenerator(result.idTables()), + matcher, Exception); + } +} + +// _____________________________________________________________________________ +TEST(Result, verifyRunOnNewChunkComputedThrowsWithFullyMaterializedResult) { + Result result{makeIdTableFromVector({{}}), {}, LocalVocab{}}; + + EXPECT_THROW( + result.runOnNewChunkComputed( + [](const IdTable&, std::chrono::microseconds) {}, [](bool) {}), + ad_utility::Exception); +} + +// _____________________________________________________________________________ +TEST(Result, verifyRunOnNewChunkComputedFiresCorrectly) { + auto idTable1 = makeIdTableFromVector({{1, 6, 0}, {2, 5, 0}}); + auto idTable2 = makeIdTableFromVector({{3, 4, 0}}); + auto idTable3 = makeIdTableFromVector({{1, 6, 0}, {2, 5, 0}, {3, 4, 0}}); + + Result result{ + [](auto& t1, auto& t2, auto& t3) -> cppcoro::generator { + std::this_thread::sleep_for(1ms); + co_yield t1; + std::this_thread::sleep_for(3ms); + co_yield t2; + std::this_thread::sleep_for(5ms); + co_yield t3; + }(idTable1, idTable2, idTable3), + {}, + LocalVocab{}}; + uint32_t callCounter = 0; + bool finishedConsuming = false; + + result.runOnNewChunkComputed( + [&](const IdTable& idTable, std::chrono::microseconds duration) { + ++callCounter; + if (callCounter == 1) { + EXPECT_EQ(&idTable1, &idTable); + EXPECT_GE(duration, 1ms); + } else if (callCounter == 2) { + EXPECT_EQ(&idTable2, &idTable); + EXPECT_GE(duration, 3ms); + } else if (callCounter == 3) { + EXPECT_EQ(&idTable3, &idTable); + EXPECT_GE(duration, 5ms); + } + }, + [&](bool error) { + EXPECT_FALSE(error); + finishedConsuming = true; + }); + + consumeGenerator(result.idTables()); + + EXPECT_EQ(callCounter, 3); + EXPECT_TRUE(finishedConsuming); +} + +// _____________________________________________________________________________ +TEST(Result, verifyRunOnNewChunkCallsFinishOnError) { + Result result{ + []() -> cppcoro::generator { + throw std::runtime_error{"verifyRunOnNewChunkCallsFinishOnError"}; + co_return; + }(), + {}, + LocalVocab{}}; + uint32_t callCounterGenerator = 0; + uint32_t callCounterFinished = 0; + + result.runOnNewChunkComputed( + [&](const IdTable&, std::chrono::microseconds) { + ++callCounterGenerator; + }, + [&](bool error) { + EXPECT_TRUE(error); + ++callCounterFinished; + }); + + AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE( + consumeGenerator(result.idTables()), + HasSubstr("verifyRunOnNewChunkCallsFinishOnError"), std::runtime_error); + + EXPECT_EQ(callCounterGenerator, 0); + EXPECT_EQ(callCounterFinished, 1); +} + +// _____________________________________________________________________________ +TEST(Result, verifyRunOnNewChunkCallsFinishOnPartialConsumption) { + uint32_t callCounterGenerator = 0; + uint32_t callCounterFinished = 0; + + { + Result result{[](IdTable idTable) -> cppcoro::generator { + co_yield idTable; + }(makeIdTableFromVector({{}})), + {}, + LocalVocab{}}; + + result.runOnNewChunkComputed( + [&](const IdTable&, std::chrono::microseconds) { + ++callCounterGenerator; + }, + [&](bool error) { + EXPECT_FALSE(error); + ++callCounterFinished; + }); + + result.idTables().begin(); + } + + EXPECT_EQ(callCounterGenerator, 1); + EXPECT_EQ(callCounterFinished, 1); +} + +// _____________________________________________________________________________ +TEST(Result, verifyCacheDuringConsumptionThrowsWhenFullyMaterialized) { + Result result{makeIdTableFromVector({{}}), {}, LocalVocab{}}; + EXPECT_THROW( + result.cacheDuringConsumption( + [](const std::optional&, const IdTable&) { return true; }, + [](Result) {}), + ad_utility::Exception); +} + +// _____________________________________________________________________________ +TEST(Result, verifyCacheDuringConsumptionRespectsPassedParameters) { + auto idTable = makeIdTableFromVector({{0, 7}, {1, 6}, {2, 5}, {3, 4}}); + + // Test positive case + for (auto& generator : getAllSubSplits(idTable)) { + Result result{std::move(generator), {0}, LocalVocab{}}; + result.cacheDuringConsumption( + [predictedSize = 0](const std::optional& aggregator, + const IdTable& newTable) mutable { + if (aggregator.has_value()) { + EXPECT_EQ(aggregator.value().numColumns(), predictedSize); + } else { + EXPECT_EQ(predictedSize, 0); + } + predictedSize += newTable.numColumns(); + return true; + }, + [&](Result aggregatedResult) { + EXPECT_TRUE(aggregatedResult.isFullyMaterialized()); + EXPECT_EQ(aggregatedResult.idTable(), idTable); + EXPECT_EQ(aggregatedResult.sortedBy(), std::vector{0}); + }); + } + + // Test negative case + for (auto& generator : getAllSubSplits(idTable)) { + uint32_t callCounter = 0; + Result result{std::move(generator), {}, LocalVocab{}}; + result.cacheDuringConsumption( + [&](const std::optional& aggregator, const IdTable&) { + EXPECT_FALSE(aggregator.has_value()); + ++callCounter; + return false; + }, + [&](Result) { ++callCounter; }); + EXPECT_EQ(callCounter, 0); + } +} + +// _____________________________________________________________________________ +TEST(Result, verifyApplyLimitOffsetDoesCorrectlyApplyLimitAndOffset) { + auto idTable = + makeIdTableFromVector({{0, 9}, {1, 8}, {2, 7}, {3, 6}, {4, 5}}); + LimitOffsetClause limitOffset{2, 2}; + { + uint32_t callCounter = 0; + Result result{idTable.clone(), {}, LocalVocab{}}; + result.applyLimitOffset( + limitOffset, [&](std::chrono::microseconds, const IdTable& innerTable) { + // NOTE: duration can't be tested here, processors are too fast + auto comparisonTable = makeIdTableFromVector({{2, 7}, {3, 6}}); + EXPECT_EQ(innerTable, comparisonTable); + ++callCounter; + }); + EXPECT_EQ(callCounter, 1); + } + + for (auto& generator : getAllSubSplits(idTable)) { + std::vector colSizes{}; + uint32_t totalRows = 0; + Result result{std::move(generator), {}, LocalVocab{}}; + result.applyLimitOffset( + limitOffset, [&](std::chrono::microseconds, const IdTable& innerTable) { + // NOTE: duration can't be tested here, processors are too fast + for (const auto& row : innerTable) { + ASSERT_EQ(row.size(), 2); + // Make sure we never get values that were supposed to be filtered + // out. + EXPECT_NE(row[0].getVocabIndex().get(), 0); + EXPECT_NE(row[0].getVocabIndex().get(), 1); + EXPECT_NE(row[0].getVocabIndex().get(), 4); + EXPECT_NE(row[1].getVocabIndex().get(), 9); + EXPECT_NE(row[1].getVocabIndex().get(), 8); + EXPECT_NE(row[1].getVocabIndex().get(), 5); + } + totalRows += innerTable.size(); + colSizes.push_back(innerTable.numColumns()); + }); + + EXPECT_EQ(totalRows, 0); + EXPECT_TRUE(colSizes.empty()); + + consumeGenerator(result.idTables()); + + EXPECT_EQ(totalRows, 2); + EXPECT_THAT(colSizes, ::testing::Each(testing::Eq(2))); + } +} + +// _____________________________________________________________________________ +TEST(Result, verifyApplyLimitOffsetHandlesZeroLimitCorrectly) { + auto idTable = makeIdTableFromVector({{0, 7}, {1, 6}, {2, 5}, {3, 4}}); + LimitOffsetClause limitOffset{0, 1}; + { + uint32_t callCounter = 0; + Result result{idTable.clone(), {}, LocalVocab{}}; + result.applyLimitOffset( + limitOffset, [&](std::chrono::microseconds, const IdTable& innerTable) { + EXPECT_EQ(innerTable.numRows(), 0); + ++callCounter; + }); + EXPECT_EQ(callCounter, 1); + } + + for (auto& generator : getAllSubSplits(idTable)) { + uint32_t callCounter = 0; + Result result{std::move(generator), {}, LocalVocab{}}; + result.applyLimitOffset( + limitOffset, + [&](std::chrono::microseconds, const IdTable&) { ++callCounter; }); + + consumeGenerator(result.idTables()); + + EXPECT_EQ(callCounter, 0); + } +} + +// _____________________________________________________________________________ +using LIC = LimitOffsetClause; +class ResultLimitTest : public testing::TestWithParam> {}; + +TEST_P(ResultLimitTest, + verifyAssertThatLimitWasRespectedDoesNotThrowIfLimitWasRespected) { + auto idTable = makeIdTableFromVector({{0, 7}, {1, 6}, {2, 5}, {3, 4}}); + { + Result result{idTable.clone(), {}, LocalVocab{}}; + if (std::get<0>(GetParam())) { + EXPECT_NO_THROW( + result.assertThatLimitWasRespected(std::get<1>(GetParam()))); + } else { + EXPECT_THROW(result.assertThatLimitWasRespected(std::get<1>(GetParam())), + ad_utility::Exception); + } + } + + for (auto& generator : getAllSubSplits(idTable)) { + Result result{std::move(generator), {}, LocalVocab{}}; + result.assertThatLimitWasRespected(std::get<1>(GetParam())); + + if (std::get<0>(GetParam())) { + EXPECT_NO_THROW(consumeGenerator(result.idTables())); + } else { + EXPECT_THROW(consumeGenerator(result.idTables()), ad_utility::Exception); + } + } +} + +INSTANTIATE_TEST_SUITE_P(SuccessCases, ResultLimitTest, + Combine(Values(true), + Values(LIC{}, LIC{4, 0}, LIC{4, 1337}, + LIC{42, 0}, LIC{42, 1337}))); + +INSTANTIATE_TEST_SUITE_P(FailureCases, ResultLimitTest, + Combine(Values(false), + Values(LIC{3, 0}, LIC{3, 1}, LIC{3, 2}))); + +// _____________________________________________________________________________ +class ResultDefinednessTest + : public testing::TestWithParam> {}; + +auto u = Id::makeUndefined(); +auto correctTable1 = makeIdTableFromVector({{0, 7}, {1, 6}, {2, 5}, {3, 4}}); +auto correctTable2 = makeIdTableFromVector({{0, u}, {1, 6}, {2, 5}, {3, 4}}); +auto correctTable3 = makeIdTableFromVector({{0, 7}, {1, 6}, {2, 5}, {3, u}}); +auto correctTable4 = makeIdTableFromVector({{0, u}, {1, u}, {2, u}, {3, u}}); +auto wrongTable1 = makeIdTableFromVector({{u, 7}, {1, 6}, {2, 5}, {3, 4}}); +auto wrongTable2 = makeIdTableFromVector({{u, 7}, {u, 6}, {u, 5}, {u, 4}}); +auto wrongTable3 = makeIdTableFromVector({{0, 7}, {1, 6}, {2, 5}, {u, 4}}); + +TEST_P(ResultDefinednessTest, + verifyCheckDefinednessDoesThrowIfColumnIsNotDefinedWhenClaimingItIs) { + if constexpr (!ad_utility::areExpensiveChecksEnabled) { + GTEST_SKIP_("Expensive checks are disabled, skipping test."); + } + VariableToColumnMap map{ + {Variable{"?a"}, {0, ColumnIndexAndTypeInfo::AlwaysDefined}}, + {Variable{"?b"}, {1, ColumnIndexAndTypeInfo::PossiblyUndefined}}}; + + { + Result result{std::get<1>(GetParam())->clone(), {}, LocalVocab{}}; + if (std::get<0>(GetParam())) { + EXPECT_NO_THROW(result.checkDefinedness(map)); + } else { + EXPECT_THROW(result.checkDefinedness(map), ad_utility::Exception); + } + } + for (auto& generator : getAllSubSplits(*std::get<1>(GetParam()))) { + Result result{std::move(generator), {}, LocalVocab{}}; + result.checkDefinedness(map); + if (std::get<0>(GetParam())) { + EXPECT_NO_THROW(consumeGenerator(result.idTables())); + } else { + EXPECT_THROW(consumeGenerator(result.idTables()), ad_utility::Exception); + } + } +} + +INSTANTIATE_TEST_SUITE_P(SuccessCases, ResultDefinednessTest, + Combine(Values(true), + Values(&correctTable1, &correctTable2, + &correctTable3, &correctTable4))); + +INSTANTIATE_TEST_SUITE_P( + FailureCases, ResultDefinednessTest, + Combine(Values(false), Values(&wrongTable1, &wrongTable2, &wrongTable3))); diff --git a/test/RuntimeInformationTest.cpp b/test/RuntimeInformationTest.cpp index 9e0acc0b73..4a32c6f62f 100644 --- a/test/RuntimeInformationTest.cpp +++ b/test/RuntimeInformationTest.cpp @@ -16,22 +16,22 @@ TEST(RuntimeInformation, addLimitOffsetRow) { rti.totalTime_ = 4ms; rti.sizeEstimate_ = 34; - rti.addLimitOffsetRow(LimitOffsetClause{}, 5ms, true); + rti.addLimitOffsetRow(LimitOffsetClause{}, true); EXPECT_FALSE( rti.details_.contains("not-written-to-cache-because-child-of-limit")); EXPECT_FALSE( rti.details_.contains("executed-implicitly-during-query-export")); - rti.addLimitOffsetRow(LimitOffsetClause{}, 5ms, false); + rti.addLimitOffsetRow(LimitOffsetClause{}, false); EXPECT_FALSE( rti.details_.contains("not-written-to-cache-because-child-of-limit")); EXPECT_FALSE( rti.details_.contains("executed-implicitly-during-query-export")); - rti.addLimitOffsetRow(LimitOffsetClause{23, 4, 1}, 20ms, true); + rti.addLimitOffsetRow(LimitOffsetClause{23, 4, 1}, true); EXPECT_EQ(rti.descriptor_, "LIMIT 23 OFFSET 4"); - EXPECT_EQ(rti.totalTime_, 24ms); - EXPECT_EQ(rti.getOperationTime(), 20ms); + EXPECT_EQ(rti.totalTime_, 4ms); + EXPECT_EQ(rti.getOperationTime(), 0ms); ASSERT_EQ(rti.children_.size(), 1u); auto& child = *rti.children_.at(0); @@ -41,13 +41,13 @@ TEST(RuntimeInformation, addLimitOffsetRow) { EXPECT_TRUE(child.details_.at("not-written-to-cache-because-child-of-limit")); EXPECT_FALSE(rti.details_.at("executed-implicitly-during-query-export")); - rti.addLimitOffsetRow(LimitOffsetClause{std::nullopt, 17, 1}, 15ms, false); + rti.addLimitOffsetRow(LimitOffsetClause{std::nullopt, 17, 1}, false); EXPECT_FALSE(rti.children_.at(0)->details_.at( "not-written-to-cache-because-child-of-limit")); EXPECT_TRUE(rti.details_.at("executed-implicitly-during-query-export")); EXPECT_EQ(rti.descriptor_, "OFFSET 17"); - rti.addLimitOffsetRow(LimitOffsetClause{42, 0, 1}, 15ms, true); + rti.addLimitOffsetRow(LimitOffsetClause{42, 0, 1}, true); EXPECT_EQ(rti.descriptor_, "LIMIT 42"); } diff --git a/test/engine/IndexScanTest.cpp b/test/engine/IndexScanTest.cpp index 1f42fb24c2..de2f988e34 100644 --- a/test/engine/IndexScanTest.cpp +++ b/test/engine/IndexScanTest.cpp @@ -444,3 +444,47 @@ TEST(IndexScan, getResultSizeOfScan) { ASSERT_EQ(res.idTable().numColumns(), 0); } } + +// _____________________________________________________________________________ +TEST(IndexScan, computeResultCanBeConsumedLazily) { + using V = Variable; + auto qec = getQec("

, . .", true, false); + auto getId = makeGetId(qec->getIndex()); + auto x = getId(""); + auto p = getId("

"); + auto s1 = getId(""); + auto s2 = getId(""); + auto p2 = getId(""); + SparqlTripleSimple scanTriple{V{"?x"}, V{"?y"}, V{"?z"}}; + IndexScan scan{qec, Permutation::Enum::POS, scanTriple}; + + ProtoResult result = scan.computeResultOnlyForTesting(true); + + ASSERT_FALSE(result.isFullyMaterialized()); + + IdTable resultTable{3, ad_utility::makeUnlimitedAllocator()}; + + for (IdTable& idTable : result.idTables()) { + resultTable.insertAtEnd(idTable); + } + + EXPECT_EQ(resultTable, + makeIdTableFromVector({{p, s1, x}, {p, s2, x}, {p2, s1, x}})); +} + +// _____________________________________________________________________________ +TEST(IndexScan, computeResultReturnsEmptyGeneratorIfScanIsEmpty) { + using V = Variable; + using I = TripleComponent::Iri; + auto qec = getQec("

, . .", true, false); + SparqlTripleSimple scanTriple{V{"?x"}, I::fromIriref(""), V{"?z"}}; + IndexScan scan{qec, Permutation::Enum::POS, scanTriple}; + + ProtoResult result = scan.computeResultOnlyForTesting(true); + + ASSERT_FALSE(result.isFullyMaterialized()); + + for ([[maybe_unused]] IdTable& idTable : result.idTables()) { + ADD_FAILURE() << "Generator should be empty" << std::endl; + } +} diff --git a/test/engine/ValuesForTesting.h b/test/engine/ValuesForTesting.h index ac7e363a95..73fc703402 100644 --- a/test/engine/ValuesForTesting.h +++ b/test/engine/ValuesForTesting.h @@ -15,7 +15,7 @@ // operation. class ValuesForTesting : public Operation { private: - IdTable table_; + std::vector tables_; std::vector> variables_; bool supportsLimit_; // Those can be manually overwritten for testing using the respective getters. @@ -33,15 +33,39 @@ class ValuesForTesting : public Operation { LocalVocab localVocab = LocalVocab{}, std::optional multiplicity = std::nullopt) : Operation{ctx}, - table_{std::move(table)}, + tables_{}, variables_{std::move(variables)}, supportsLimit_{supportsLimit}, - sizeEstimate_{table_.numRows()}, - costEstimate_{table_.numRows()}, + sizeEstimate_{table.numRows()}, + costEstimate_{table.numRows()}, resultSortedColumns_{std::move(sortedColumns)}, localVocab_{std::move(localVocab)}, multiplicity_{multiplicity} { - AD_CONTRACT_CHECK(variables_.size() == table_.numColumns()); + AD_CONTRACT_CHECK(variables_.size() == table.numColumns()); + tables_.push_back(std::move(table)); + } + explicit ValuesForTesting(QueryExecutionContext* ctx, + std::vector tables, + std::vector> variables) + : Operation{ctx}, + tables_{std::move(tables)}, + variables_{std::move(variables)}, + supportsLimit_{false}, + sizeEstimate_{0}, + costEstimate_{0}, + resultSortedColumns_{}, + localVocab_{LocalVocab{}}, + multiplicity_{std::nullopt} { + AD_CONTRACT_CHECK( + std::ranges::all_of(tables_, [this](const IdTable& table) { + return variables_.size() == table.numColumns(); + })); + size_t totalRows = 0; + for (const IdTable& idTable : tables_) { + totalRows += idTable.numRows(); + } + sizeEstimate_ = totalRows; + costEstimate_ = totalRows; } // Accessors for the estimates for manual testing. @@ -49,8 +73,33 @@ class ValuesForTesting : public Operation { size_t& costEstimate() { return costEstimate_; } // ___________________________________________________________________________ - ProtoResult computeResult([[maybe_unused]] bool requestLaziness) override { - auto table = table_.clone(); + ProtoResult computeResult(bool requestLaziness) override { + if (requestLaziness) { + // Not implemented yet + AD_CORRECTNESS_CHECK(!supportsLimit_); + std::vector clones; + clones.reserve(tables_.size()); + for (const IdTable& idTable : tables_) { + clones.push_back(idTable.clone()); + } + auto generator = [](auto idTables) -> cppcoro::generator { + for (IdTable& idTable : idTables) { + co_yield std::move(idTable); + } + }(std::move(clones)); + return {std::move(generator), resultSortedOn(), localVocab_.clone()}; + } + std::optional optionalTable; + if (tables_.size() > 1) { + IdTable aggregateTable{tables_.at(0).numColumns(), + tables_.at(0).getAllocator()}; + for (const IdTable& idTable : tables_) { + aggregateTable.insertAtEnd(idTable); + } + optionalTable = std::move(aggregateTable); + } + auto table = optionalTable.has_value() ? std::move(optionalTable).value() + : tables_.at(0).clone(); if (supportsLimit_) { table.erase(table.begin() + getLimit().upperBound(table.size()), table.end()); @@ -65,14 +114,19 @@ class ValuesForTesting : public Operation { // ___________________________________________________________________________ string getCacheKeyImpl() const override { std::stringstream str; - str << "Values for testing with " << table_.numColumns() << " columns and " - << table_.numRows() << " rows. "; - if (table_.numRows() > 1000) { + auto numRowsView = tables_ | std::views::transform(&IdTable::numRows); + auto totalNumRows = std::reduce(numRowsView.begin(), numRowsView.end(), 0); + auto numCols = tables_.empty() ? 0 : tables_.at(0).numColumns(); + str << "Values for testing with " << numCols << " columns and " + << totalNumRows << " rows. "; + if (totalNumRows > 1000) { str << ad_utility::FastRandomIntGenerator{}(); } else { - for (size_t i = 0; i < table_.numColumns(); ++i) { - for (Id entry : table_.getColumn(i)) { - str << entry << ' '; + for (const IdTable& idTable : tables_) { + for (size_t i = 0; i < idTable.numColumns(); ++i) { + for (Id entry : idTable.getColumn(i)) { + str << entry << ' '; + } } } } @@ -85,7 +139,9 @@ class ValuesForTesting : public Operation { return "explicit values for testing"; } - size_t getResultWidth() const override { return table_.numColumns(); } + size_t getResultWidth() const override { + return tables_.empty() ? 0 : tables_.at(0).numColumns(); + } vector resultSortedOn() const override { return resultSortedColumns_; @@ -107,7 +163,10 @@ class ValuesForTesting : public Operation { vector getChildren() override { return {}; } - bool knownEmptyResult() override { return table_.empty(); } + bool knownEmptyResult() override { + return std::ranges::all_of( + tables_, [](const IdTable& table) { return table.empty(); }); + } private: VariableToColumnMap computeVariableToColumnMap() const override { @@ -117,7 +176,10 @@ class ValuesForTesting : public Operation { continue; } bool containsUndef = - ad_utility::contains(table_.getColumn(i), Id::makeUndefined()); + std::ranges::any_of(tables_, [&i](const IdTable& table) { + return std::ranges::any_of(table.getColumn(i), + [](Id id) { return id.isUndefined(); }); + }); using enum ColumnIndexAndTypeInfo::UndefStatus; m[variables_.at(i).value()] = ColumnIndexAndTypeInfo{ i, containsUndef ? PossiblyUndefined : AlwaysDefined}; diff --git a/test/util/OperationTestHelpers.h b/test/util/OperationTestHelpers.h index 24826902fe..480b76536a 100644 --- a/test/util/OperationTestHelpers.h +++ b/test/util/OperationTestHelpers.h @@ -85,4 +85,61 @@ class ShallowParentOperation : public Operation { } }; +// Operation that will throw on `computeResult` for testing. +class AlwaysFailOperation : public Operation { + std::vector getChildren() override { return {}; } + string getCacheKeyImpl() const override { AD_FAIL(); } + string getDescriptor() const override { + return "AlwaysFailOperationDescriptor"; + } + size_t getResultWidth() const override { return 0; } + size_t getCostEstimate() override { return 0; } + uint64_t getSizeEstimateBeforeLimit() override { return 0; } + float getMultiplicity([[maybe_unused]] size_t) override { return 0; } + bool knownEmptyResult() override { return false; } + vector resultSortedOn() const override { return {}; } + VariableToColumnMap computeVariableToColumnMap() const override { return {}; } + + public: + using Operation::Operation; + ProtoResult computeResult(bool requestLaziness) override { + if (!requestLaziness) { + throw std::runtime_error{"AlwaysFailOperation"}; + } + return {[]() -> cppcoro::generator { + throw std::runtime_error{"AlwaysFailOperation"}; + // Required so that the exception only occurs within the generator + co_return; + }(), + resultSortedOn(), LocalVocab{}}; + } +}; + +// Lazy operation that will yield a result with a custom generator you can +// provide via the constructor. +class CustomGeneratorOperation : public Operation { + cppcoro::generator generator_; + std::vector getChildren() override { return {}; } + string getCacheKeyImpl() const override { AD_FAIL(); } + string getDescriptor() const override { + return "CustomGeneratorOperationDescriptor"; + } + size_t getResultWidth() const override { return 0; } + size_t getCostEstimate() override { return 0; } + uint64_t getSizeEstimateBeforeLimit() override { return 0; } + float getMultiplicity([[maybe_unused]] size_t) override { return 0; } + bool knownEmptyResult() override { return false; } + vector resultSortedOn() const override { return {}; } + VariableToColumnMap computeVariableToColumnMap() const override { return {}; } + + public: + CustomGeneratorOperation(QueryExecutionContext* context, + cppcoro::generator generator) + : Operation{context}, generator_{std::move(generator)} {} + ProtoResult computeResult(bool requestLaziness) override { + AD_CONTRACT_CHECK(requestLaziness); + return {std::move(generator_), resultSortedOn(), LocalVocab{}}; + } +}; + #endif // QLEVER_OPERATIONTESTHELPERS_H