From 4b1caa3a6062e444229a7abbbe4a3e9067446473 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Mon, 12 Aug 2024 20:08:41 -0700 Subject: [PATCH] Let HashProbe keep track of memory consumption when listing join results (#10652) Summary: Hash probe currently has limited memory control when extracting results from the hash table. When a small number of large sized rows from the build side is frequently joined with the left side, the total extracted size will explode, making HashProbe using a large amount of memory. And the process of filling output is not in spillable state, and will often cause OOM. This PR computes the total size when listing join results in hash probe if there are any variable size columns from the build side that is going to be extracted. It stops listing further when it reaches the maximum size. This can help to control hash probe side memory usage to a confined limit. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10652 Reviewed By: xiaoxmeng Differential Revision: D60771773 Pulled By: tanjialiang fbshipit-source-id: 2cb8c58ba795a0aa1df0485b58e4f6d0100be8f8 (cherry picked from commit 82e549265f65c72ba780855cfdada556cfeab90c) --- velox/exec/HashProbe.cpp | 52 +++++-- velox/exec/HashProbe.h | 19 +-- velox/exec/HashTable.cpp | 52 +++++-- velox/exec/HashTable.h | 61 +++++--- velox/exec/RowContainer.cpp | 4 + velox/exec/RowContainer.h | 33 +++-- .../HashJoinListResultBenchmark.cpp | 41 ++++-- velox/exec/tests/HashJoinTest.cpp | 83 ++++++++++- velox/exec/tests/HashTableTest.cpp | 110 ++++++++++++++- velox/exec/tests/RowContainerTest.cpp | 47 +++++++ velox/vector/fuzzer/VectorFuzzer.h | 132 +++++++++--------- 11 files changed, 492 insertions(+), 142 deletions(-) diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 49207f24d94b..2365c9728bd3 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -283,6 +283,30 @@ void HashProbe::maybeSetupSpillInputReader( spillPartitionSet_.erase(iter); } +void HashProbe::initializeResultIter() { + VELOX_CHECK_NOT_NULL(table_); + if (resultIter_ != nullptr) { + return; + } + std::vector listColumns; + listColumns.reserve(tableOutputProjections_.size()); + for (const auto& projection : tableOutputProjections_) { + listColumns.push_back(projection.inputChannel); + } + std::vector varSizeListColumns; + uint64_t fixedSizeListColumnsSizeSum{0}; + varSizeListColumns.reserve(tableOutputProjections_.size()); + for (const auto column : listColumns) { + if (table_->rows()->columnTypes()[column]->isFixedWidth()) { + fixedSizeListColumnsSizeSum += table_->rows()->fixedSizeAt(column); + } else { + varSizeListColumns.push_back(column); + } + } + resultIter_ = std::make_unique( + std::move(varSizeListColumns), fixedSizeListColumnsSizeSum); +} + void HashProbe::asyncWaitForHashTable() { checkRunning(); VELOX_CHECK_NULL(table_); @@ -309,6 +333,8 @@ void HashProbe::asyncWaitForHashTable() { } table_ = std::move(hashBuildResult->table); + initializeResultIter(); + VELOX_CHECK_NOT_NULL(table_); maybeSetupSpillInputReader(hashBuildResult->restoredPartitionId); @@ -660,7 +686,8 @@ void HashProbe::addInput(RowVectorPtr input) { lookup_->hits.resize(lookup_->rows.back() + 1); table_->joinProbe(*lookup_); } - results_.reset(*lookup_); + + resultIter_->reset(*lookup_); } void HashProbe::prepareOutput(vector_size_t size) { @@ -995,10 +1022,11 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) { } } else { numOut = table_->listJoinResults( - results_, + *resultIter_, joinIncludesMissesFromLeft(joinType_), mapping, - folly::Range(outputTableRows_.data(), outputTableRows_.size())); + folly::Range(outputTableRows_.data(), outputTableRows_.size()), + operatorCtx_->driverCtx()->queryConfig().preferredOutputBatchBytes()); } // We are done processing the input batch if there are no more joined rows @@ -1024,7 +1052,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) { // Right semi join only returns the build side output when the probe side // is fully complete. Do not return anything here. if (isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)) { - if (results_.atEnd()) { + if (resultIter_->atEnd()) { input_ = nullptr; } return nullptr; @@ -1329,7 +1357,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { } noMatchDetector_.finishIteration( - addMiss, results_.atEnd(), outputTableRows_.size() - numPassed); + addMiss, resultIter_->atEnd(), outputTableRows_.size() - numPassed); } else if (isLeftSemiFilterJoin(joinType_)) { auto addLastMatch = [&](auto row) { outputTableRows_[numPassed] = nullptr; @@ -1341,7 +1369,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { rawOutputProbeRowMapping[i], addLastMatch); } } - if (results_.atEnd()) { + if (resultIter_->atEnd()) { leftSemiFilterJoinTracker_.finish(addLastMatch); } } else if (isLeftSemiProjectJoin(joinType_)) { @@ -1378,7 +1406,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { leftSemiProjectJoinTracker_.advance(probeRow, passed, addLast); } leftSemiProjectIsNull_.updateBounds(); - if (results_.atEnd()) { + if (resultIter_->atEnd()) { leftSemiProjectJoinTracker_.finish(addLast); } } else { @@ -1391,7 +1419,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { leftSemiProjectJoinTracker_.advance( rawOutputProbeRowMapping[i], filterPassed(i), addLast); } - if (results_.atEnd()) { + if (resultIter_->atEnd()) { leftSemiProjectJoinTracker_.finish(addLast); } } @@ -1416,7 +1444,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { } noMatchDetector_.finishIteration( - addMiss, results_.atEnd(), outputTableRows_.size() - numPassed); + addMiss, resultIter_->atEnd(), outputTableRows_.size() - numPassed); } else { for (auto i = 0; i < numRows; ++i) { if (filterPassed(i)) { @@ -1429,7 +1457,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { } void HashProbe::ensureLoadedIfNotAtEnd(column_index_t channel) { - if (results_.atEnd()) { + if (resultIter_->atEnd()) { return; } @@ -1683,7 +1711,7 @@ void HashProbe::spillOutput(const std::vector& operators) { } } - auto syncGuard = folly::makeGuard([&]() { + SCOPE_EXIT { for (auto& spillTask : spillTasks) { // We consume the result for the pending tasks. This is a cleanup in the // guard and must not throw. The first error is already captured before @@ -1693,7 +1721,7 @@ void HashProbe::spillOutput(const std::vector& operators) { } catch (const std::exception&) { } } - }); + }; for (auto& spillTask : spillTasks) { const auto result = spillTask->move(); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index a7481021a916..79709e2917f0 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -93,12 +93,15 @@ class HashProbe : public Operator { // the hash table. void asyncWaitForHashTable(); - // Sets up 'filter_' and related members.p + // Sets up 'filter_' and related members. void initializeFilter( const core::TypedExprPtr& filter, const RowTypePtr& probeType, const RowTypePtr& tableType); + // Setup 'resultIter_'. + void initializeResultIter(); + // If 'toSpillOutput', the produced output is spilled to disk for memory // arbitration. RowVectorPtr getOutputInternal(bool toSpillOutput); @@ -611,21 +614,21 @@ class HashProbe : public Operator { BaseHashTable::RowsIterator lastProbeIterator_; - /// For left and anti join with filter, tracks the probe side rows which had - /// matches on the build side but didn't pass the filter. + // For left and anti join with filter, tracks the probe side rows which had + // matches on the build side but didn't pass the filter. NoMatchDetector noMatchDetector_; - /// For left semi join filter with extra filter, de-duplicates probe side rows - /// with multiple matches. + // For left semi join filter with extra filter, de-duplicates probe side rows + // with multiple matches. LeftSemiFilterJoinTracker leftSemiFilterJoinTracker_; - /// For left semi join project with filter, de-duplicates probe side rows with - /// multiple matches. + // For left semi join project with filter, de-duplicates probe side rows with + // multiple matches. LeftSemiProjectJoinTracker leftSemiProjectJoinTracker_; // Keeps track of returned results between successive batches of // output for a batch of input. - BaseHashTable::JoinResultIterator results_; + std::unique_ptr resultIter_; RowVectorPtr output_; diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 656ea091733b..94ac21d181a3 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -1728,18 +1728,39 @@ void HashTable::prepareJoinTable( checkHashBitsOverlap(spillInputStartPartitionBit); } +template +inline uint64_t HashTable::joinProjectedVarColumnsSize( + const std::vector& columns, + const char* row) const { + uint64_t totalBytes{0}; + for (const auto& column : columns) { + if (!rows_->columnTypes()[column]->isFixedWidth()) { + totalBytes += rows_->variableSizeAt(row, column); + } + } + return totalBytes; +} + template int32_t HashTable::listJoinResults( JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits) { + folly::Range hits, + uint64_t maxBytes) { VELOX_CHECK_LE(inputRows.size(), hits.size()); - if (!hasDuplicates_) { - return listJoinResultsNoDuplicates(iter, includeMisses, inputRows, hits); + + if (iter.varSizeListColumns.empty() && !hasDuplicates_) { + // When there is no duplicates, and no variable length columns are selected + // to be projected, we are able to calculate fixed length columns total size + // directly and go through fast path. + return listJoinResultsFastPath( + iter, includeMisses, inputRows, hits, maxBytes); } + size_t numOut = 0; auto maxOut = inputRows.size(); + uint64_t totalBytes{0}; while (iter.lastRowIndex < iter.rows->size()) { auto row = (*iter.rows)[iter.lastRowIndex]; auto hit = (*iter.hits)[row]; // NOLINT @@ -1762,6 +1783,9 @@ int32_t HashTable::listJoinResults( hits[numOut] = hit; numOut++; iter.lastRowIndex++; + totalBytes += + (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) + + iter.fixedSizeListColumnsSizeSum); } else { auto numRows = rows->size(); auto num = @@ -1773,12 +1797,17 @@ int32_t HashTable::listJoinResults( num * sizeof(char*)); iter.lastDuplicateRowIndex += num; numOut += num; + for (const auto* dupRow : *rows) { + totalBytes += + joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow); + } + totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows); if (iter.lastDuplicateRowIndex >= numRows) { iter.lastDuplicateRowIndex = 0; iter.lastRowIndex++; } } - if (numOut >= maxOut) { + if (numOut >= maxOut || totalBytes >= maxBytes) { return numOut; } } @@ -1786,15 +1815,20 @@ int32_t HashTable::listJoinResults( } template -int32_t HashTable::listJoinResultsNoDuplicates( +int32_t HashTable::listJoinResultsFastPath( JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits) { + folly::Range hits, + uint64_t maxBytes) { int32_t numOut = 0; - auto maxOut = inputRows.size(); + const auto maxOut = std::min( + static_cast(inputRows.size()), + (iter.fixedSizeListColumnsSizeSum != 0 + ? maxBytes / iter.fixedSizeListColumnsSizeSum + : std::numeric_limits::max())); int32_t i = iter.lastRowIndex; - auto numRows = iter.rows->size(); + const auto numRows = iter.rows->size(); constexpr int32_t kWidth = xsimd::batch::size; auto sourceHits = reinterpret_cast(iter.hits->data()); @@ -1802,7 +1836,7 @@ int32_t HashTable::listJoinResultsNoDuplicates( // We pass the pointers as int64_t's in 'hitWords'. auto resultHits = reinterpret_cast(hits.data()); auto resultRows = inputRows.data(); - int32_t outLimit = maxOut - kWidth; + const auto outLimit = maxOut - kWidth; for (; i + kWidth <= numRows && numOut < outLimit; i += kWidth) { auto indices = simd::loadGatherIndices(sourceRows + i); auto hitWords = simd::gather(sourceHits, indices); diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 2b6333f3b41a..dd0f8fb2bf37 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -136,11 +136,16 @@ class BaseHashTable { /// Returns the string of the given 'mode'. static std::string modeString(HashMode mode); - // Keeps track of results returned from a join table. One batch of - // keys can produce multiple batches of results. This is initialized - // from HashLookup, which is expected to stay constant while 'this' - // is being used. + /// Keeps track of results returned from a join table. One batch of keys can + /// produce multiple batches of results. This is initialized from HashLookup, + /// which is expected to stay constant while 'this' is being used. struct JoinResultIterator { + JoinResultIterator( + std::vector&& _varSizeListColumns, + uint64_t _fixedSizeListColumnsSizeSum) + : varSizeListColumns(std::move(_varSizeListColumns)), + fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum) {} + void reset(const HashLookup& lookup) { rows = &lookup.rows; hits = &lookup.hits; @@ -152,8 +157,15 @@ class BaseHashTable { return !rows || lastRowIndex == rows->size(); } + /// The indexes of the build side projected columns that are variable sized. + const std::vector varSizeListColumns; + /// The per row total bytes of the build side projected columns that are + /// fixed sized. + const uint64_t fixedSizeListColumnsSizeSum{0}; + const raw_vector* rows{nullptr}; const raw_vector* hits{nullptr}; + vector_size_t lastRowIndex{0}; vector_size_t lastDuplicateRowIndex{0}; }; @@ -230,11 +242,14 @@ class BaseHashTable { /// set to nullptr if 'includeMisses' is true. Otherwise, skips input rows /// without a match. 'includeMisses' is set to true when listing results for /// the LEFT join. + /// The filling stops when the total size of currently listed rows exceeds + /// 'maxBytes'. virtual int32_t listJoinResults( JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits) = 0; + folly::Range hits, + uint64_t maxBytes) = 0; /// Returns rows with 'probed' flag unset. Used by the right/full join. virtual int32_t listNotProbedRows( @@ -499,7 +514,8 @@ class HashTable : public BaseHashTable { JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits) override; + folly::Range hits, + uint64_t maxBytes) override; int32_t listNotProbedRows( RowsIterator* iter, @@ -708,12 +724,14 @@ class HashTable : public BaseHashTable { void setHashMode(HashMode mode, int32_t numNew) override; - // Fast path for join results when there are no duplicates in the table. - int32_t listJoinResultsNoDuplicates( + // Fast path for join results when there are no duplicates in the table and + // only fixed size rows are to be extract. + int32_t listJoinResultsFastPath( JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits); + folly::Range hits, + uint64_t maxBytes); // Tries to use as many range hashers as can in a normalized key situation. void enableRangeWhereCan( @@ -757,14 +775,14 @@ class HashTable : public BaseHashTable { raw_vector& hashes, bool initNormalizedKeys); - /// Inserts 'numGroups' entries into 'this'. 'groups' point to contents in a - /// RowContainer owned by 'this'. 'hashes' are the hash numbers or array - /// indices (if kArray mode) for each group. Duplicate key rows are chained - /// via their next link. If not null, 'partitionInfo' provides the table - /// partition info for parallel join table build. It specifies the first and - /// (exclusive) last indexes of the insert entries in the table. If a row - /// can't be inserted within this range, it is not inserted but rather added - /// to the end of 'overflows' in 'partitionInfo'. + // Inserts 'numGroups' entries into 'this'. 'groups' point to contents in a + // RowContainer owned by 'this'. 'hashes' are the hash numbers or array + // indices (if kArray mode) for each group. Duplicate key rows are chained + // via their next link. If not null, 'partitionInfo' provides the table + // partition info for parallel join table build. It specifies the first and + // (exclusive) last indexes of the insert entries in the table. If a row + // can't be inserted within this range, it is not inserted but rather added + // to the end of 'overflows' in 'partitionInfo'. void insertForJoin( RowContainer* rows, char** groups, @@ -841,6 +859,14 @@ class HashTable : public BaseHashTable { // Shortcut for probe with normalized keys. void joinNormalizedKeyProbe(HashLookup& lookup); + // Returns the total size of the variable size 'columns' in 'row'. + // NOTE: No checks are done in the method for performance considerations. + // Caller needs to make sure only variable size columns are inside of + // 'columns'. + inline uint64_t joinProjectedVarColumnsSize( + const std::vector& columns, + const char* row) const; + // Adds a row to a hash join table in kArray hash mode. Returns true // if a new entry was made and false if the row was added to an // existing set of rows with the same key. @@ -876,6 +902,7 @@ class HashTable : public BaseHashTable { // content. Returns true if all hashers offer a mapping to value ids // for array or normalized key. bool analyze(); + // Erases the entries of rows from the hash table and its RowContainer. // 'hashes' must be computed according to 'hashMode_'. void eraseWithHashes(folly::Range rows, uint64_t* hashes); diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 3a48108a0e1c..1fe6bbc68584 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -562,6 +562,10 @@ int32_t RowContainer::variableSizeAt(const char* row, column_index_t column) { } } +int32_t RowContainer::fixedSizeAt(column_index_t column) { + return typeKindSize(typeKinds_[column]); +} + int32_t RowContainer::extractVariableSizeAt( const char* row, column_index_t column, diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 8353e282c800..756806a383d5 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -620,6 +620,13 @@ class RowContainer { return rowColumns_[index]; } + /// Returns the size of a string or complex types value stored in the + /// specified row and column. + int32_t variableSizeAt(const char* row, column_index_t column); + + /// Returns the per row size of a fixed size column. + int32_t fixedSizeAt(column_index_t column); + /// Bit offset of the probed flag for a full or right outer join payload. /// 0 if not applicable. int32_t probedFlagOffset() const { @@ -784,24 +791,20 @@ class RowContainer { return *reinterpret_cast(group + offset); } - /// Returns the size of a string or complex types value stored in the - /// specified row and column. - int32_t variableSizeAt(const char* row, column_index_t column); - - /// Copies a string or complex type value from the specified row and column - /// into provided buffer. Stored the size of the data in the first 4 bytes of - /// the buffer. If the value is null, writes zero into the first 4 bytes of - /// destination and returns. - /// @return The number of bytes written to 'destination' including the 4 bytes - /// of the size. + // Copies a string or complex type value from the specified row and column + // into provided buffer. Stored the size of the data in the first 4 bytes of + // the buffer. If the value is null, writes zero into the first 4 bytes of + // destination and returns. + // @return The number of bytes written to 'destination' including the 4 bytes + // of the size. int32_t extractVariableSizeAt(const char* row, column_index_t column, char* output); - /// Copies a string or complex type value from 'data' into the specified row - /// and column. Expects first 4 bytes in 'data' to contain the size of the - /// string or complex value. - /// @return The number of bytes read from 'data': 4 bytes for size + that many - /// bytes. + // Copies a string or complex type value from 'data' into the specified row + // and column. Expects first 4 bytes in 'data' to contain the size of the + // string or complex value. + // @return The number of bytes read from 'data': 4 bytes for size + that many + // bytes. int32_t storeVariableSizeAt(const char* data, char* row, column_index_t column); diff --git a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp index 0f2f9c0d76af..3a51a9c13d7d 100644 --- a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp +++ b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp @@ -403,33 +403,53 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { topTable_->joinProbe(lookup); } - // Hash probe andd list join result. + BaseHashTable::JoinResultIterator createResultIterator() { + std::vector listColumns{}; + for (int32_t i = 0; i < topTable_->rows()->columnTypes().size(); i++) { + listColumns.push_back(i); + } + std::vector varSizeListColumns; + uint64_t fixedSizeListColumnsSizeSum{0}; + varSizeListColumns.reserve(listColumns.size()); + for (const auto column : listColumns) { + if (topTable_->rows()->columnTypes()[column]->isFixedWidth()) { + fixedSizeListColumnsSizeSum += topTable_->rows()->fixedSizeAt(column); + } else { + varSizeListColumns.push_back(column); + } + } + return BaseHashTable::JoinResultIterator( + std::move(varSizeListColumns), fixedSizeListColumnsSizeSum); + } + + // Hash probe and list join result. int64_t probeTableAndListResult() { auto lookup = std::make_unique(topTable_->hashers()); auto numBatch = params_.probeSize / params_.hashTableSize; auto batchSize = params_.hashTableSize; SelectivityInfo listJoinResultClocks; - BaseHashTable::JoinResultIterator results; BufferPtr outputRowMapping; auto outputBatchSize = batchSize; std::vector outputTableRows; int64_t sequence = 0; int64_t numJoinListResult = 0; + BaseHashTable::JoinResultIterator resultsIter = createResultIterator(); for (auto i = 0; i < numBatch; ++i) { auto batch = makeProbeVector(batchSize, params_.hashTableSize, sequence); probeTable(*lookup, batch, batchSize); - results.reset(*lookup); + resultsIter.reset(*lookup); auto mapping = initializeRowNumberMapping( outputRowMapping, outputBatchSize, pool_.get()); outputTableRows.resize(outputBatchSize); { SelectivityTimer timer(listJoinResultClocks, 0); - while (!results.atEnd()) { + while (!resultsIter.atEnd()) { numJoinListResult += topTable_->listJoinResults( - results, + resultsIter, false, mapping, - folly::Range(outputTableRows.data(), outputTableRows.size())); + folly::Range(outputTableRows.data(), outputTableRows.size()), + std::numeric_limits::max()); } } } @@ -442,22 +462,23 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { auto batchSize = 10000; auto mode = topTable_->hashMode(); SelectivityInfo eraseClock; - BaseHashTable::JoinResultIterator results; BufferPtr outputRowMapping; auto outputBatchSize = topTable_->rows()->numRows() + 2; std::vector outputTableRows; int64_t sequence = 0; auto batch = makeProbeVector(batchSize, batchSize, sequence); probeTable(*lookup, batch, batchSize); - results.reset(*lookup); auto mapping = initializeRowNumberMapping( outputRowMapping, outputBatchSize, pool_.get()); outputTableRows.resize(outputBatchSize); + BaseHashTable::JoinResultIterator resultIter = createResultIterator(); + resultIter.reset(*lookup); auto num = topTable_->listJoinResults( - results, + resultIter, false, mapping, - folly::Range(outputTableRows.data(), outputTableRows.size())); + folly::Range(outputTableRows.data(), outputTableRows.size()), + std::numeric_limits::max()); { SelectivityTimer timer(eraseClock, 0); topTable_->rows()->eraseRows( diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index e9f605ab4b0b..61bfa3d12959 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -5433,6 +5433,87 @@ TEST_F(HashJoinTest, dynamicFilterOnPartitionKey) { .run(); } +TEST_F(HashJoinTest, probeMemoryLimitOnBuildProjection) { + std::vector probeVectors = + makeBatches(10, [&](int32_t /*unused*/) { + return makeRowVector( + {makeFlatVector(1'000, [](auto row) { return row % 5; })}); + }); + + // Build side has 4KB + 4B per row. + std::vector buildVectors = + makeBatches(1, [&](int32_t /*unused*/) { + return makeRowVector( + {"u_c0", "u_c1", {"u_c2"}}, + {makeFlatVector({0, 1, 2}), + makeFlatVector({ + std::string(4096, 'a'), + std::string(4096, 'b'), + std::string(4096, 'c'), + }), + makeFlatVector({ + std::string(4096, 'd'), + std::string(4096, 'e'), + std::string(4096, 'f'), + })}); + }); + + createDuckDbTable("t", {probeVectors}); + createDuckDbTable("u", {buildVectors}); + + struct TestParam { + int32_t numVarSizeColumn; + int32_t numExpectedBatches; + std::string referenceQuery; + std::string debugString() const { + return fmt::format( + "numVarSizeColumn {}, numExpectedBatches {}, referenceQuery '{}'", + numVarSizeColumn, + numExpectedBatches, + referenceQuery); + } + }; + + std::vector testParams{ + {0, 10, "SELECT t.c0 FROM t JOIN u ON t.c0 = u.u_c0"}, + {1, 3000, "SELECT t.c0, u.u_c1 FROM t JOIN u ON t.c0 = u.u_c0"}, + {2, 6000, "SELECT t.c0, u.u_c1, u.u_c2 FROM t JOIN u ON t.c0 = u.u_c0"}}; + for (const auto& testParam : testParams) { + SCOPED_TRACE(testParam.debugString()); + core::PlanNodeId joinNodeId; + std::vector outputLayout; + outputLayout.push_back("c0"); + for (int32_t i = 0; i < testParam.numVarSizeColumn; i++) { + outputLayout.push_back(fmt::format("u_c{}", i + 1)); + } + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values(probeVectors) + .hashJoin( + {"c0"}, + {"u_c0"}, + PlanBuilder(planNodeIdGenerator) + .values({buildVectors}) + .planNode(), + "", + outputLayout) + .capturePlanNodeId(joinNodeId) + .planNode(); + + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .planNode(std::move(plan)) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "8192") + .injectSpill(false) + .referenceQuery(testParam.referenceQuery) + .verifier([&](const std::shared_ptr& task, bool /* unused */) { + auto planStats = toPlanStats(task->taskStats()); + auto outputBatches = planStats.at(joinNodeId).outputVectors; + ASSERT_EQ(outputBatches, testParam.numExpectedBatches); + }) + .run(); + } +} + DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { constexpr int64_t kMaxBytes = 1LL << 30; // 1GB VectorFuzzer fuzzer({.vectorSize = 1000}, pool()); @@ -7339,7 +7420,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpill) { DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillInMiddeOfLastOutputProcessing) { std::atomic_int outputCountAfterNoMoreInout{0}; std::atomic_bool injectOnce{true}; - SCOPED_TESTVALUE_SET( + ::facebook::velox::common::testutil::ScopedTestValue abc( "facebook::velox::exec::Driver::runInternal::getOutput", std::function([&](Operator* op) { if (!isHashProbeMemoryPool(*op->pool())) { diff --git a/velox/exec/tests/HashTableTest.cpp b/velox/exec/tests/HashTableTest.cpp index 73ae6a2bacb2..343c45773b11 100644 --- a/velox/exec/tests/HashTableTest.cpp +++ b/velox/exec/tests/HashTableTest.cpp @@ -298,7 +298,7 @@ class HashTableTest : public testing::TestWithParam, return out.str(); } - void copyVectorsToTable( + std::vector copyVectorsToTable( const std::vector& batches, int32_t tableOffset, BaseHashTable* table) { @@ -311,7 +311,7 @@ class HashTableTest : public testing::TestWithParam, auto numKeys = hashers.size(); // We init a DecodedVector for each member of the RowVectors in 'batches'. std::vector> decoded; - SelectivityVector rows(batchSize); + SelectivityVector allRows(batchSize); SelectivityVector insertedRows(batchSize); for (auto& batch : batches) { // If we are only inserting a fraction of the rows, we set insertedRows to @@ -330,7 +330,7 @@ class HashTableTest : public testing::TestWithParam, VELOX_CHECK_EQ(batch->size(), batchSize); auto& decoders = decoded.back(); for (auto i = 0; i < batch->childrenSize(); ++i) { - decoders[i].decode(*batch->childAt(i), rows); + decoders[i].decode(*batch->childAt(i), allRows); if (i < numKeys) { auto hasher = table->hashers()[i].get(); hasher->decode(*batch->childAt(i), insertedRows); @@ -350,6 +350,7 @@ class HashTableTest : public testing::TestWithParam, int32_t delta = 1; const auto nextOffset = rowContainer->nextOffset(); + std::vector rows; // We insert values in a geometric skip order. 1, 2, 4, 7, // 11,... where the skip increments by one. We wrap around at the // power of two boundary. This sequence hits every place in the @@ -369,10 +370,12 @@ class HashTableTest : public testing::TestWithParam, for (auto i = 0; i < batches[batchIndex]->type()->size(); ++i) { rowContainer->store(decoded[batchIndex][i], rowIndex, newRow, i); } + rows.push_back(newRow); } position = (position + delta) & mask; ++delta; } + return rows; } // Makes a vector of 'type' with 'size' unique elements, initialized @@ -391,7 +394,8 @@ class HashTableTest : public testing::TestWithParam, auto strings = BaseVector::create>(VARCHAR(), size, pool()); for (auto row = 0; row < size; ++row) { - auto string = fmt::format("{}", keySpacing_ * (sequence + row)); + auto string = + fmt::format("{}{}", baseString_, keySpacing_ * (sequence + row)); // Make strings that overflow the inline limit for 1/10 of // the values after 10K,000. Datasets with only // range-encodable small strings can be made within the @@ -567,6 +571,8 @@ class HashTableTest : public testing::TestWithParam, // Spacing between consecutive generated keys. Affects whether // Vectorhashers make ranges or ids of distinct values. int64_t keySpacing_ = 1; + // Base string for varchar fields when making string vector. + std::string baseString_; std::unique_ptr executor_; }; @@ -796,6 +802,102 @@ TEST_P(HashTableTest, regularHashingTableSize) { } } +TEST_P(HashTableTest, listJoinResultsSize) { + baseString_ = + "If you count carefully, you will notice there are exactly 105 characters" + " in this string including space."; + const size_t kNumRows = 1024; + auto buildType = ROW( + {"f0", "v0", "v1"}, {BIGINT(), VARCHAR(), ROW({BIGINT(), VARCHAR()})}); + std::vector> keyHashers; + for (auto i = 0; i < buildType->size(); ++i) { + keyHashers.emplace_back( + std::make_unique(buildType->childAt(i), i)); + } + + auto table = HashTable::createForJoin( + std::move(keyHashers), + {BIGINT(), VARCHAR()}, + true, + false, + kNumRows, + pool()); + std::vector batches; + makeRows(kNumRows, 1, 0, buildType, batches); + auto rows = copyVectorsToTable(batches, 0, table.get()); + + std::vector inputRowsBuf; + inputRowsBuf.resize(kNumRows); + auto inputRows = + folly::Range(static_cast(inputRowsBuf.data()), kNumRows); + std::vector outputRowsBuf; + outputRowsBuf.resize(kNumRows); + auto outputRows = folly::Range(outputRowsBuf.data(), kNumRows); + + HashLookup lookup(table->hashers()); + lookup.rows.reserve(kNumRows); + lookup.hits.reserve(kNumRows); + for (auto i = 0; i < kNumRows; i++) { + lookup.rows.push_back(i); + lookup.hits.push_back(rows[i]); + } + + struct TestParam { + std::vector varSizeListColumns; + std::vector fixedSizeListColumns; + uint64_t maxBytes; + int64_t expectedRows; + + std::string debugString() const { + std::stringstream ss; + ss << "varSizeListColumns "; + ss << "["; + for (auto i = 0; i < varSizeListColumns.size(); i++) { + ss << varSizeListColumns[i]; + if (i != varSizeListColumns.size() - 1) { + ss << ", "; + } + } + ss << "] fixedSizeListColumns ["; + for (auto i = 0; i < fixedSizeListColumns.size(); i++) { + ss << fixedSizeListColumns[i]; + if (i != fixedSizeListColumns.size() - 1) { + ss << ", "; + } + } + ss << "] maxBytes " << maxBytes; + + return ss.str(); + } + }; + + // Key types: BIGINT, VARCHAR, ROW(BIGINT, VARCHAR) + // Dependent types: BIGINT, VARCHAR + std::vector testParams{ + {{}, {0}, 1024, 128}, + {{1}, {}, 2048, 20}, + {{1}, {}, 1 << 20, 1024}, + {{1}, {}, 1 << 14, 154}, + {{1}, {0}, 2048, 18}, + {{}, {0, 3}, 1024, 64}, + {{2}, {}, 2048, 17}, + {{1, 2, 4}, {0, 3}, 1 << 14, 66}}; + for (const auto& testParam : testParams) { + SCOPED_TRACE(testParam.debugString()); + uint64_t fixedColumnSizeSum{0}; + for (const auto column : testParam.fixedSizeListColumns) { + fixedColumnSizeSum += table->rows()->fixedSizeAt(column); + } + BaseHashTable::JoinResultIterator iter( + std::vector(testParam.varSizeListColumns), + fixedColumnSizeSum); + iter.reset(lookup); + auto numRows = table->listJoinResults( + iter, true, inputRows, outputRows, testParam.maxBytes); + ASSERT_EQ(numRows, testParam.expectedRows); + } +} + TEST_P(HashTableTest, groupBySpill) { auto type = ROW({"k1"}, {BIGINT()}); testGroupBySpill(5'000'000, type, 1, 1000, 1000); diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp index 7027a6560f8f..ada992e85ee9 100644 --- a/velox/exec/tests/RowContainerTest.cpp +++ b/velox/exec/tests/RowContainerTest.cpp @@ -1230,6 +1230,53 @@ TEST_F(RowContainerTest, rowSize) { EXPECT_EQ(rows, rowsFromContainer); } +TEST_F(RowContainerTest, columnSize) { + const uint64_t kNumRows = 1000; + auto rowContainer = + makeRowContainer({BIGINT(), VARCHAR()}, {BIGINT(), VARCHAR()}); + + VectorFuzzer fuzzer( + { + .vectorSize = kNumRows, + .stringLength = 100, + .stringVariableLength = true, + }, + pool()); + + auto rowVector = + fuzzer.fuzzInputFlatRow(ROW({BIGINT(), VARCHAR(), BIGINT(), VARCHAR()})); + + std::vector rows; + rows.reserve(kNumRows); + + ASSERT_EQ(rowContainer->numRows(), 0); + SelectivityVector allRows(kNumRows); + DecodedVector decodedKey1(*rowVector->childAt(0), allRows); + DecodedVector decodedKey2(*rowVector->childAt(1), allRows); + DecodedVector decodedDep1(*rowVector->childAt(2), allRows); + DecodedVector decodedDep2(*rowVector->childAt(3), allRows); + for (size_t i = 0; i < kNumRows; i++) { + auto row = rowContainer->newRow(); + rowContainer->store(decodedKey1, i, row, 0); + rowContainer->store(decodedKey2, i, row, 1); + rowContainer->store(decodedDep1, i, row, 2); + rowContainer->store(decodedDep2, i, row, 3); + rows.push_back(row); + } + ASSERT_EQ(rowContainer->fixedSizeAt(0), 8); + ASSERT_EQ(rowContainer->fixedSizeAt(2), 8); + const auto key2Vector = rowVector->childAt(1)->asFlatVector(); + const auto dep2Vector = rowVector->childAt(3)->asFlatVector(); + for (size_t i = 0; i < kNumRows; i++) { + ASSERT_EQ( + rowContainer->variableSizeAt(rows[i], 1), + key2Vector->valueAt(i).size()); + ASSERT_EQ( + rowContainer->variableSizeAt(rows[i], 3), + dep2Vector->valueAt(i).size()); + } +} + TEST_F(RowContainerTest, rowSizeWithNormalizedKey) { auto data = makeRowContainer({SMALLINT()}, {VARCHAR()}); data->newRow(); diff --git a/velox/vector/fuzzer/VectorFuzzer.h b/velox/vector/fuzzer/VectorFuzzer.h index 663fb5471f69..40ef82776716 100644 --- a/velox/vector/fuzzer/VectorFuzzer.h +++ b/velox/vector/fuzzer/VectorFuzzer.h @@ -162,98 +162,98 @@ class VectorFuzzer { return opts_; } - // Returns a "fuzzed" vector, containing randomized data, nulls, and indices - // vector (dictionary). Returns a vector containing `opts_.vectorSize` or - // `size` elements. + /// Returns a "fuzzed" vector, containing randomized data, nulls, and indices + /// vector (dictionary). Returns a vector containing `opts_.vectorSize` or + /// `size` elements. VectorPtr fuzz(const TypePtr& type); VectorPtr fuzz(const TypePtr& type, vector_size_t size); - // Returns a "fuzzed" vector containing randomized data customized according - // to generatorSpec. + /// Returns a "fuzzed" vector containing randomized data customized according + /// to generatorSpec. VectorPtr fuzz(const GeneratorSpec& generatorSpec); - // Same as above, but returns a vector without nulls (regardless of the value - // of opts.nullRatio). + /// Same as above, but returns a vector without nulls (regardless of the value + /// of opts.nullRatio). VectorPtr fuzzNotNull(const TypePtr& type); VectorPtr fuzzNotNull(const TypePtr& type, vector_size_t size); - // Returns a flat vector or a complex vector with flat children with - // randomized data and nulls. Returns a vector containing `opts_.vectorSize` - // or `size` elements. + /// Returns a flat vector or a complex vector with flat children with + /// randomized data and nulls. Returns a vector containing `opts_.vectorSize` + /// or `size` elements. VectorPtr fuzzFlat(const TypePtr& type); VectorPtr fuzzFlat(const TypePtr& type, vector_size_t size); - // Same as above, but returns a vector without nulls (regardless of the value - // of opts.nullRatio). + /// Same as above, but returns a vector without nulls (regardless of the value + /// of opts.nullRatio). VectorPtr fuzzFlatNotNull(const TypePtr& type); VectorPtr fuzzFlatNotNull(const TypePtr& type, vector_size_t size); - // Returns a random constant vector (which could be a null constant). Returns - // a vector with size set to `opts_.vectorSize` or 'size'. + /// Returns a random constant vector (which could be a null constant). Returns + /// a vector with size set to `opts_.vectorSize` or 'size'. VectorPtr fuzzConstant(const TypePtr& type); VectorPtr fuzzConstant(const TypePtr& type, vector_size_t size); - // Wraps `vector` using a randomized indices vector, returning a - // DictionaryVector which has same number of indices as the underlying - // `vector` size. + /// Wraps `vector` using a randomized indices vector, returning a + /// DictionaryVector which has same number of indices as the underlying + /// `vector` size. VectorPtr fuzzDictionary(const VectorPtr& vector); VectorPtr fuzzDictionary(const VectorPtr& vector, vector_size_t size); - // Uses `elements` as the internal elements vector, wrapping them into an - // ArrayVector of `size` rows. - // - // The number of elements per array row is based on the size of the - // `elements` vector and `size`, and either fixed or variable (depending on - // `opts.containerVariableLength`). + /// Uses `elements` as the internal elements vector, wrapping them into an + /// ArrayVector of `size` rows. + /// + /// The number of elements per array row is based on the size of the + /// `elements` vector and `size`, and either fixed or variable (depending on + /// `opts.containerVariableLength`). ArrayVectorPtr fuzzArray(const VectorPtr& elements, vector_size_t size); - // Uses `keys` and `values` as the internal elements vectors, wrapping them - // into a MapVector of `size` rows. - // - // The number of elements per map row is based on the size of the `keys` and - // `values` vectors and `size`, and either fixed or variable (depending on - // `opts.containerVariableLength`). - // - // If opt.normalizeMapKeys is true, keys will be normalized - duplicated key - // values for a particular element will be removed/skipped. In that case, this - // method throws if the keys vector has nulls. + /// Uses `keys` and `values` as the internal elements vectors, wrapping them + /// into a MapVector of `size` rows. + /// + /// The number of elements per map row is based on the size of the `keys` and + /// `values` vectors and `size`, and either fixed or variable (depending on + /// `opts.containerVariableLength`). + /// + /// If opt.normalizeMapKeys is true, keys will be normalized - duplicated key + /// values for a particular element will be removed/skipped. In that case, + /// this method throws if the keys vector has nulls. MapVectorPtr fuzzMap(const VectorPtr& keys, const VectorPtr& values, vector_size_t size); - // Returns a "fuzzed" row vector with randomized data and nulls. + /// Returns a "fuzzed" row vector with randomized data and nulls. RowVectorPtr fuzzRow(const RowTypePtr& rowType); - // If allowTopLevelNulls is false, the top level row wont have nulls. + /// If allowTopLevelNulls is false, the top level row wont have nulls. RowVectorPtr fuzzRow( const RowTypePtr& rowType, vector_size_t size, bool allowTopLevelNulls = true); - // Returns a RowVector based on the provided vectors, fuzzing its top-level - // null buffer. + /// Returns a RowVector based on the provided vectors, fuzzing its top-level + /// null buffer. RowVectorPtr fuzzRow( std::vector&& children, std::vector childrenNames, vector_size_t size); - // Returns a RowVector based on the provided vectors, fuzzing its top-level - // null buffer. + /// Returns a RowVector based on the provided vectors, fuzzing its top-level + /// null buffer. RowVectorPtr fuzzRow(std::vector&& children, vector_size_t size); - // Same as the function above, but never return nulls for the top-level row - // elements. + /// Same as the function above, but never return nulls for the top-level row + /// elements. RowVectorPtr fuzzInputRow(const RowTypePtr& rowType); /// Same as the function above, but all generated vectors are flat, i.e. no /// constant or dictionary-encoded vectors at any level. RowVectorPtr fuzzInputFlatRow(const RowTypePtr& rowType); - // Generates a random type, including maps, vectors, and arrays. maxDepth - // limits the maximum level of nesting for complex types. maxDepth <= 1 means - // no complex types are allowed. - // - // There are no options to control type generation yet; these may be added in - // the future. + /// Generates a random type, including maps, vectors, and arrays. maxDepth + /// limits the maximum level of nesting for complex types. maxDepth <= 1 means + /// no complex types are allowed. + /// + /// There are no options to control type generation yet; these may be added in + /// the future. TypePtr randType(int maxDepth = 5); /// Same as the function above, but only generate orderable types. @@ -266,14 +266,14 @@ class VectorFuzzer { const std::vector& scalarTypes, int maxDepth = 5); - // Generates short decimal TypePtr with random precision and scale. + /// Generates short decimal TypePtr with random precision and scale. inline TypePtr randShortDecimalType() { auto [precision, scale] = randPrecisionScale(ShortDecimalType::kMaxPrecision); return DECIMAL(precision, scale); } - // Generates long decimal TypePtr with random precision and scale. + /// Generates long decimal TypePtr with random precision and scale. inline TypePtr randLongDecimalType() { auto [precision, scale] = randPrecisionScale(LongDecimalType::kMaxPrecision); @@ -289,40 +289,40 @@ class VectorFuzzer { return boost::random::uniform_01()(rng_) < n; } - // Wraps the given vector in a LazyVector. If there are multiple dictionary - // layers then the lazy wrap is applied over the innermost dictionary layer. + /// Wraps the given vector in a LazyVector. If there are multiple dictionary + /// layers then the lazy wrap is applied over the innermost dictionary layer. static VectorPtr wrapInLazyVector(VectorPtr baseVector); - // Randomly applies wrapInLazyVector() to the children of the given input row - // vector. Must only be used for input row vectors where all children are - // non-null and non-lazy. Is useful when the input rowVector needs to be - // re-used between multiple evaluations. + /// Randomly applies wrapInLazyVector() to the children of the given input row + /// vector. Must only be used for input row vectors where all children are + /// non-null and non-lazy. Is useful when the input rowVector needs to be + /// re-used between multiple evaluations. RowVectorPtr fuzzRowChildrenToLazy(RowVectorPtr rowVector); - // Returns a copy of 'rowVector' but with the columns having indices listed in - // 'columnsToWrapInLazy' wrapped in lazy encoding. Must only be used for input - // row vectors where all children are non-null and non-lazy. - // 'columnsToWrapInLazy' can contain negative column indices that represent - // lazy vectors that should be preloaded before being fed to the evaluator. - // This list is sorted on the absolute value of the entries. + /// Returns a copy of 'rowVector' but with the columns having indices listed + /// in 'columnsToWrapInLazy' wrapped in lazy encoding. Must only be used for + /// input row vectors where all children are non-null and non-lazy. + /// 'columnsToWrapInLazy' can contain negative column indices that represent + /// lazy vectors that should be preloaded before being fed to the evaluator. + /// This list is sorted on the absolute value of the entries. static RowVectorPtr fuzzRowChildrenToLazy( RowVectorPtr rowVector, const std::vector& columnsToWrapInLazy); - // Generate a random null buffer. + /// Generate a random null buffer. BufferPtr fuzzNulls(vector_size_t size); - // Generate a random indices buffer of 'size' with maximum possible index - // pointing to (baseVectorSize-1). + /// Generate a random indices buffer of 'size' with maximum possible index + /// pointing to (baseVectorSize-1). BufferPtr fuzzIndices(vector_size_t size, vector_size_t baseVectorSize); private: // Generates a flat vector for primitive types. VectorPtr fuzzFlatPrimitive(const TypePtr& type, vector_size_t size); - /// Generates random precision in range [1, maxPrecision] + // Generates random precision in range [1, maxPrecision] // and scale in range [0, random precision generated]. - /// @param maximum precision. + // @param maximum precision. std::pair randPrecisionScale(int8_t maxPrecision); // Returns a complex vector with randomized data and nulls. The children and