diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 49207f24d94b..2365c9728bd3 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -283,6 +283,30 @@ void HashProbe::maybeSetupSpillInputReader( spillPartitionSet_.erase(iter); } +void HashProbe::initializeResultIter() { + VELOX_CHECK_NOT_NULL(table_); + if (resultIter_ != nullptr) { + return; + } + std::vector listColumns; + listColumns.reserve(tableOutputProjections_.size()); + for (const auto& projection : tableOutputProjections_) { + listColumns.push_back(projection.inputChannel); + } + std::vector varSizeListColumns; + uint64_t fixedSizeListColumnsSizeSum{0}; + varSizeListColumns.reserve(tableOutputProjections_.size()); + for (const auto column : listColumns) { + if (table_->rows()->columnTypes()[column]->isFixedWidth()) { + fixedSizeListColumnsSizeSum += table_->rows()->fixedSizeAt(column); + } else { + varSizeListColumns.push_back(column); + } + } + resultIter_ = std::make_unique( + std::move(varSizeListColumns), fixedSizeListColumnsSizeSum); +} + void HashProbe::asyncWaitForHashTable() { checkRunning(); VELOX_CHECK_NULL(table_); @@ -309,6 +333,8 @@ void HashProbe::asyncWaitForHashTable() { } table_ = std::move(hashBuildResult->table); + initializeResultIter(); + VELOX_CHECK_NOT_NULL(table_); maybeSetupSpillInputReader(hashBuildResult->restoredPartitionId); @@ -660,7 +686,8 @@ void HashProbe::addInput(RowVectorPtr input) { lookup_->hits.resize(lookup_->rows.back() + 1); table_->joinProbe(*lookup_); } - results_.reset(*lookup_); + + resultIter_->reset(*lookup_); } void HashProbe::prepareOutput(vector_size_t size) { @@ -995,10 +1022,11 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) { } } else { numOut = table_->listJoinResults( - results_, + *resultIter_, joinIncludesMissesFromLeft(joinType_), mapping, - folly::Range(outputTableRows_.data(), outputTableRows_.size())); + folly::Range(outputTableRows_.data(), outputTableRows_.size()), + operatorCtx_->driverCtx()->queryConfig().preferredOutputBatchBytes()); } // We are done processing the input batch if there are no more joined rows @@ -1024,7 +1052,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) { // Right semi join only returns the build side output when the probe side // is fully complete. Do not return anything here. if (isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)) { - if (results_.atEnd()) { + if (resultIter_->atEnd()) { input_ = nullptr; } return nullptr; @@ -1329,7 +1357,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { } noMatchDetector_.finishIteration( - addMiss, results_.atEnd(), outputTableRows_.size() - numPassed); + addMiss, resultIter_->atEnd(), outputTableRows_.size() - numPassed); } else if (isLeftSemiFilterJoin(joinType_)) { auto addLastMatch = [&](auto row) { outputTableRows_[numPassed] = nullptr; @@ -1341,7 +1369,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { rawOutputProbeRowMapping[i], addLastMatch); } } - if (results_.atEnd()) { + if (resultIter_->atEnd()) { leftSemiFilterJoinTracker_.finish(addLastMatch); } } else if (isLeftSemiProjectJoin(joinType_)) { @@ -1378,7 +1406,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { leftSemiProjectJoinTracker_.advance(probeRow, passed, addLast); } leftSemiProjectIsNull_.updateBounds(); - if (results_.atEnd()) { + if (resultIter_->atEnd()) { leftSemiProjectJoinTracker_.finish(addLast); } } else { @@ -1391,7 +1419,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { leftSemiProjectJoinTracker_.advance( rawOutputProbeRowMapping[i], filterPassed(i), addLast); } - if (results_.atEnd()) { + if (resultIter_->atEnd()) { leftSemiProjectJoinTracker_.finish(addLast); } } @@ -1416,7 +1444,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { } noMatchDetector_.finishIteration( - addMiss, results_.atEnd(), outputTableRows_.size() - numPassed); + addMiss, resultIter_->atEnd(), outputTableRows_.size() - numPassed); } else { for (auto i = 0; i < numRows; ++i) { if (filterPassed(i)) { @@ -1429,7 +1457,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) { } void HashProbe::ensureLoadedIfNotAtEnd(column_index_t channel) { - if (results_.atEnd()) { + if (resultIter_->atEnd()) { return; } @@ -1683,7 +1711,7 @@ void HashProbe::spillOutput(const std::vector& operators) { } } - auto syncGuard = folly::makeGuard([&]() { + SCOPE_EXIT { for (auto& spillTask : spillTasks) { // We consume the result for the pending tasks. This is a cleanup in the // guard and must not throw. The first error is already captured before @@ -1693,7 +1721,7 @@ void HashProbe::spillOutput(const std::vector& operators) { } catch (const std::exception&) { } } - }); + }; for (auto& spillTask : spillTasks) { const auto result = spillTask->move(); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index a7481021a916..79709e2917f0 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -93,12 +93,15 @@ class HashProbe : public Operator { // the hash table. void asyncWaitForHashTable(); - // Sets up 'filter_' and related members.p + // Sets up 'filter_' and related members. void initializeFilter( const core::TypedExprPtr& filter, const RowTypePtr& probeType, const RowTypePtr& tableType); + // Setup 'resultIter_'. + void initializeResultIter(); + // If 'toSpillOutput', the produced output is spilled to disk for memory // arbitration. RowVectorPtr getOutputInternal(bool toSpillOutput); @@ -611,21 +614,21 @@ class HashProbe : public Operator { BaseHashTable::RowsIterator lastProbeIterator_; - /// For left and anti join with filter, tracks the probe side rows which had - /// matches on the build side but didn't pass the filter. + // For left and anti join with filter, tracks the probe side rows which had + // matches on the build side but didn't pass the filter. NoMatchDetector noMatchDetector_; - /// For left semi join filter with extra filter, de-duplicates probe side rows - /// with multiple matches. + // For left semi join filter with extra filter, de-duplicates probe side rows + // with multiple matches. LeftSemiFilterJoinTracker leftSemiFilterJoinTracker_; - /// For left semi join project with filter, de-duplicates probe side rows with - /// multiple matches. + // For left semi join project with filter, de-duplicates probe side rows with + // multiple matches. LeftSemiProjectJoinTracker leftSemiProjectJoinTracker_; // Keeps track of returned results between successive batches of // output for a batch of input. - BaseHashTable::JoinResultIterator results_; + std::unique_ptr resultIter_; RowVectorPtr output_; diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 656ea091733b..94ac21d181a3 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -1728,18 +1728,39 @@ void HashTable::prepareJoinTable( checkHashBitsOverlap(spillInputStartPartitionBit); } +template +inline uint64_t HashTable::joinProjectedVarColumnsSize( + const std::vector& columns, + const char* row) const { + uint64_t totalBytes{0}; + for (const auto& column : columns) { + if (!rows_->columnTypes()[column]->isFixedWidth()) { + totalBytes += rows_->variableSizeAt(row, column); + } + } + return totalBytes; +} + template int32_t HashTable::listJoinResults( JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits) { + folly::Range hits, + uint64_t maxBytes) { VELOX_CHECK_LE(inputRows.size(), hits.size()); - if (!hasDuplicates_) { - return listJoinResultsNoDuplicates(iter, includeMisses, inputRows, hits); + + if (iter.varSizeListColumns.empty() && !hasDuplicates_) { + // When there is no duplicates, and no variable length columns are selected + // to be projected, we are able to calculate fixed length columns total size + // directly and go through fast path. + return listJoinResultsFastPath( + iter, includeMisses, inputRows, hits, maxBytes); } + size_t numOut = 0; auto maxOut = inputRows.size(); + uint64_t totalBytes{0}; while (iter.lastRowIndex < iter.rows->size()) { auto row = (*iter.rows)[iter.lastRowIndex]; auto hit = (*iter.hits)[row]; // NOLINT @@ -1762,6 +1783,9 @@ int32_t HashTable::listJoinResults( hits[numOut] = hit; numOut++; iter.lastRowIndex++; + totalBytes += + (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) + + iter.fixedSizeListColumnsSizeSum); } else { auto numRows = rows->size(); auto num = @@ -1773,12 +1797,17 @@ int32_t HashTable::listJoinResults( num * sizeof(char*)); iter.lastDuplicateRowIndex += num; numOut += num; + for (const auto* dupRow : *rows) { + totalBytes += + joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow); + } + totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows); if (iter.lastDuplicateRowIndex >= numRows) { iter.lastDuplicateRowIndex = 0; iter.lastRowIndex++; } } - if (numOut >= maxOut) { + if (numOut >= maxOut || totalBytes >= maxBytes) { return numOut; } } @@ -1786,15 +1815,20 @@ int32_t HashTable::listJoinResults( } template -int32_t HashTable::listJoinResultsNoDuplicates( +int32_t HashTable::listJoinResultsFastPath( JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits) { + folly::Range hits, + uint64_t maxBytes) { int32_t numOut = 0; - auto maxOut = inputRows.size(); + const auto maxOut = std::min( + static_cast(inputRows.size()), + (iter.fixedSizeListColumnsSizeSum != 0 + ? maxBytes / iter.fixedSizeListColumnsSizeSum + : std::numeric_limits::max())); int32_t i = iter.lastRowIndex; - auto numRows = iter.rows->size(); + const auto numRows = iter.rows->size(); constexpr int32_t kWidth = xsimd::batch::size; auto sourceHits = reinterpret_cast(iter.hits->data()); @@ -1802,7 +1836,7 @@ int32_t HashTable::listJoinResultsNoDuplicates( // We pass the pointers as int64_t's in 'hitWords'. auto resultHits = reinterpret_cast(hits.data()); auto resultRows = inputRows.data(); - int32_t outLimit = maxOut - kWidth; + const auto outLimit = maxOut - kWidth; for (; i + kWidth <= numRows && numOut < outLimit; i += kWidth) { auto indices = simd::loadGatherIndices(sourceRows + i); auto hitWords = simd::gather(sourceHits, indices); diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 2b6333f3b41a..dd0f8fb2bf37 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -136,11 +136,16 @@ class BaseHashTable { /// Returns the string of the given 'mode'. static std::string modeString(HashMode mode); - // Keeps track of results returned from a join table. One batch of - // keys can produce multiple batches of results. This is initialized - // from HashLookup, which is expected to stay constant while 'this' - // is being used. + /// Keeps track of results returned from a join table. One batch of keys can + /// produce multiple batches of results. This is initialized from HashLookup, + /// which is expected to stay constant while 'this' is being used. struct JoinResultIterator { + JoinResultIterator( + std::vector&& _varSizeListColumns, + uint64_t _fixedSizeListColumnsSizeSum) + : varSizeListColumns(std::move(_varSizeListColumns)), + fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum) {} + void reset(const HashLookup& lookup) { rows = &lookup.rows; hits = &lookup.hits; @@ -152,8 +157,15 @@ class BaseHashTable { return !rows || lastRowIndex == rows->size(); } + /// The indexes of the build side projected columns that are variable sized. + const std::vector varSizeListColumns; + /// The per row total bytes of the build side projected columns that are + /// fixed sized. + const uint64_t fixedSizeListColumnsSizeSum{0}; + const raw_vector* rows{nullptr}; const raw_vector* hits{nullptr}; + vector_size_t lastRowIndex{0}; vector_size_t lastDuplicateRowIndex{0}; }; @@ -230,11 +242,14 @@ class BaseHashTable { /// set to nullptr if 'includeMisses' is true. Otherwise, skips input rows /// without a match. 'includeMisses' is set to true when listing results for /// the LEFT join. + /// The filling stops when the total size of currently listed rows exceeds + /// 'maxBytes'. virtual int32_t listJoinResults( JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits) = 0; + folly::Range hits, + uint64_t maxBytes) = 0; /// Returns rows with 'probed' flag unset. Used by the right/full join. virtual int32_t listNotProbedRows( @@ -499,7 +514,8 @@ class HashTable : public BaseHashTable { JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits) override; + folly::Range hits, + uint64_t maxBytes) override; int32_t listNotProbedRows( RowsIterator* iter, @@ -708,12 +724,14 @@ class HashTable : public BaseHashTable { void setHashMode(HashMode mode, int32_t numNew) override; - // Fast path for join results when there are no duplicates in the table. - int32_t listJoinResultsNoDuplicates( + // Fast path for join results when there are no duplicates in the table and + // only fixed size rows are to be extract. + int32_t listJoinResultsFastPath( JoinResultIterator& iter, bool includeMisses, folly::Range inputRows, - folly::Range hits); + folly::Range hits, + uint64_t maxBytes); // Tries to use as many range hashers as can in a normalized key situation. void enableRangeWhereCan( @@ -757,14 +775,14 @@ class HashTable : public BaseHashTable { raw_vector& hashes, bool initNormalizedKeys); - /// Inserts 'numGroups' entries into 'this'. 'groups' point to contents in a - /// RowContainer owned by 'this'. 'hashes' are the hash numbers or array - /// indices (if kArray mode) for each group. Duplicate key rows are chained - /// via their next link. If not null, 'partitionInfo' provides the table - /// partition info for parallel join table build. It specifies the first and - /// (exclusive) last indexes of the insert entries in the table. If a row - /// can't be inserted within this range, it is not inserted but rather added - /// to the end of 'overflows' in 'partitionInfo'. + // Inserts 'numGroups' entries into 'this'. 'groups' point to contents in a + // RowContainer owned by 'this'. 'hashes' are the hash numbers or array + // indices (if kArray mode) for each group. Duplicate key rows are chained + // via their next link. If not null, 'partitionInfo' provides the table + // partition info for parallel join table build. It specifies the first and + // (exclusive) last indexes of the insert entries in the table. If a row + // can't be inserted within this range, it is not inserted but rather added + // to the end of 'overflows' in 'partitionInfo'. void insertForJoin( RowContainer* rows, char** groups, @@ -841,6 +859,14 @@ class HashTable : public BaseHashTable { // Shortcut for probe with normalized keys. void joinNormalizedKeyProbe(HashLookup& lookup); + // Returns the total size of the variable size 'columns' in 'row'. + // NOTE: No checks are done in the method for performance considerations. + // Caller needs to make sure only variable size columns are inside of + // 'columns'. + inline uint64_t joinProjectedVarColumnsSize( + const std::vector& columns, + const char* row) const; + // Adds a row to a hash join table in kArray hash mode. Returns true // if a new entry was made and false if the row was added to an // existing set of rows with the same key. @@ -876,6 +902,7 @@ class HashTable : public BaseHashTable { // content. Returns true if all hashers offer a mapping to value ids // for array or normalized key. bool analyze(); + // Erases the entries of rows from the hash table and its RowContainer. // 'hashes' must be computed according to 'hashMode_'. void eraseWithHashes(folly::Range rows, uint64_t* hashes); diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 3a48108a0e1c..1fe6bbc68584 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -562,6 +562,10 @@ int32_t RowContainer::variableSizeAt(const char* row, column_index_t column) { } } +int32_t RowContainer::fixedSizeAt(column_index_t column) { + return typeKindSize(typeKinds_[column]); +} + int32_t RowContainer::extractVariableSizeAt( const char* row, column_index_t column, diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 8353e282c800..756806a383d5 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -620,6 +620,13 @@ class RowContainer { return rowColumns_[index]; } + /// Returns the size of a string or complex types value stored in the + /// specified row and column. + int32_t variableSizeAt(const char* row, column_index_t column); + + /// Returns the per row size of a fixed size column. + int32_t fixedSizeAt(column_index_t column); + /// Bit offset of the probed flag for a full or right outer join payload. /// 0 if not applicable. int32_t probedFlagOffset() const { @@ -784,24 +791,20 @@ class RowContainer { return *reinterpret_cast(group + offset); } - /// Returns the size of a string or complex types value stored in the - /// specified row and column. - int32_t variableSizeAt(const char* row, column_index_t column); - - /// Copies a string or complex type value from the specified row and column - /// into provided buffer. Stored the size of the data in the first 4 bytes of - /// the buffer. If the value is null, writes zero into the first 4 bytes of - /// destination and returns. - /// @return The number of bytes written to 'destination' including the 4 bytes - /// of the size. + // Copies a string or complex type value from the specified row and column + // into provided buffer. Stored the size of the data in the first 4 bytes of + // the buffer. If the value is null, writes zero into the first 4 bytes of + // destination and returns. + // @return The number of bytes written to 'destination' including the 4 bytes + // of the size. int32_t extractVariableSizeAt(const char* row, column_index_t column, char* output); - /// Copies a string or complex type value from 'data' into the specified row - /// and column. Expects first 4 bytes in 'data' to contain the size of the - /// string or complex value. - /// @return The number of bytes read from 'data': 4 bytes for size + that many - /// bytes. + // Copies a string or complex type value from 'data' into the specified row + // and column. Expects first 4 bytes in 'data' to contain the size of the + // string or complex value. + // @return The number of bytes read from 'data': 4 bytes for size + that many + // bytes. int32_t storeVariableSizeAt(const char* data, char* row, column_index_t column); diff --git a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp index 0f2f9c0d76af..3a51a9c13d7d 100644 --- a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp +++ b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp @@ -403,33 +403,53 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { topTable_->joinProbe(lookup); } - // Hash probe andd list join result. + BaseHashTable::JoinResultIterator createResultIterator() { + std::vector listColumns{}; + for (int32_t i = 0; i < topTable_->rows()->columnTypes().size(); i++) { + listColumns.push_back(i); + } + std::vector varSizeListColumns; + uint64_t fixedSizeListColumnsSizeSum{0}; + varSizeListColumns.reserve(listColumns.size()); + for (const auto column : listColumns) { + if (topTable_->rows()->columnTypes()[column]->isFixedWidth()) { + fixedSizeListColumnsSizeSum += topTable_->rows()->fixedSizeAt(column); + } else { + varSizeListColumns.push_back(column); + } + } + return BaseHashTable::JoinResultIterator( + std::move(varSizeListColumns), fixedSizeListColumnsSizeSum); + } + + // Hash probe and list join result. int64_t probeTableAndListResult() { auto lookup = std::make_unique(topTable_->hashers()); auto numBatch = params_.probeSize / params_.hashTableSize; auto batchSize = params_.hashTableSize; SelectivityInfo listJoinResultClocks; - BaseHashTable::JoinResultIterator results; BufferPtr outputRowMapping; auto outputBatchSize = batchSize; std::vector outputTableRows; int64_t sequence = 0; int64_t numJoinListResult = 0; + BaseHashTable::JoinResultIterator resultsIter = createResultIterator(); for (auto i = 0; i < numBatch; ++i) { auto batch = makeProbeVector(batchSize, params_.hashTableSize, sequence); probeTable(*lookup, batch, batchSize); - results.reset(*lookup); + resultsIter.reset(*lookup); auto mapping = initializeRowNumberMapping( outputRowMapping, outputBatchSize, pool_.get()); outputTableRows.resize(outputBatchSize); { SelectivityTimer timer(listJoinResultClocks, 0); - while (!results.atEnd()) { + while (!resultsIter.atEnd()) { numJoinListResult += topTable_->listJoinResults( - results, + resultsIter, false, mapping, - folly::Range(outputTableRows.data(), outputTableRows.size())); + folly::Range(outputTableRows.data(), outputTableRows.size()), + std::numeric_limits::max()); } } } @@ -442,22 +462,23 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { auto batchSize = 10000; auto mode = topTable_->hashMode(); SelectivityInfo eraseClock; - BaseHashTable::JoinResultIterator results; BufferPtr outputRowMapping; auto outputBatchSize = topTable_->rows()->numRows() + 2; std::vector outputTableRows; int64_t sequence = 0; auto batch = makeProbeVector(batchSize, batchSize, sequence); probeTable(*lookup, batch, batchSize); - results.reset(*lookup); auto mapping = initializeRowNumberMapping( outputRowMapping, outputBatchSize, pool_.get()); outputTableRows.resize(outputBatchSize); + BaseHashTable::JoinResultIterator resultIter = createResultIterator(); + resultIter.reset(*lookup); auto num = topTable_->listJoinResults( - results, + resultIter, false, mapping, - folly::Range(outputTableRows.data(), outputTableRows.size())); + folly::Range(outputTableRows.data(), outputTableRows.size()), + std::numeric_limits::max()); { SelectivityTimer timer(eraseClock, 0); topTable_->rows()->eraseRows( diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index e9f605ab4b0b..61bfa3d12959 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -5433,6 +5433,87 @@ TEST_F(HashJoinTest, dynamicFilterOnPartitionKey) { .run(); } +TEST_F(HashJoinTest, probeMemoryLimitOnBuildProjection) { + std::vector probeVectors = + makeBatches(10, [&](int32_t /*unused*/) { + return makeRowVector( + {makeFlatVector(1'000, [](auto row) { return row % 5; })}); + }); + + // Build side has 4KB + 4B per row. + std::vector buildVectors = + makeBatches(1, [&](int32_t /*unused*/) { + return makeRowVector( + {"u_c0", "u_c1", {"u_c2"}}, + {makeFlatVector({0, 1, 2}), + makeFlatVector({ + std::string(4096, 'a'), + std::string(4096, 'b'), + std::string(4096, 'c'), + }), + makeFlatVector({ + std::string(4096, 'd'), + std::string(4096, 'e'), + std::string(4096, 'f'), + })}); + }); + + createDuckDbTable("t", {probeVectors}); + createDuckDbTable("u", {buildVectors}); + + struct TestParam { + int32_t numVarSizeColumn; + int32_t numExpectedBatches; + std::string referenceQuery; + std::string debugString() const { + return fmt::format( + "numVarSizeColumn {}, numExpectedBatches {}, referenceQuery '{}'", + numVarSizeColumn, + numExpectedBatches, + referenceQuery); + } + }; + + std::vector testParams{ + {0, 10, "SELECT t.c0 FROM t JOIN u ON t.c0 = u.u_c0"}, + {1, 3000, "SELECT t.c0, u.u_c1 FROM t JOIN u ON t.c0 = u.u_c0"}, + {2, 6000, "SELECT t.c0, u.u_c1, u.u_c2 FROM t JOIN u ON t.c0 = u.u_c0"}}; + for (const auto& testParam : testParams) { + SCOPED_TRACE(testParam.debugString()); + core::PlanNodeId joinNodeId; + std::vector outputLayout; + outputLayout.push_back("c0"); + for (int32_t i = 0; i < testParam.numVarSizeColumn; i++) { + outputLayout.push_back(fmt::format("u_c{}", i + 1)); + } + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values(probeVectors) + .hashJoin( + {"c0"}, + {"u_c0"}, + PlanBuilder(planNodeIdGenerator) + .values({buildVectors}) + .planNode(), + "", + outputLayout) + .capturePlanNodeId(joinNodeId) + .planNode(); + + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .planNode(std::move(plan)) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "8192") + .injectSpill(false) + .referenceQuery(testParam.referenceQuery) + .verifier([&](const std::shared_ptr& task, bool /* unused */) { + auto planStats = toPlanStats(task->taskStats()); + auto outputBatches = planStats.at(joinNodeId).outputVectors; + ASSERT_EQ(outputBatches, testParam.numExpectedBatches); + }) + .run(); + } +} + DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { constexpr int64_t kMaxBytes = 1LL << 30; // 1GB VectorFuzzer fuzzer({.vectorSize = 1000}, pool()); @@ -7339,7 +7420,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpill) { DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillInMiddeOfLastOutputProcessing) { std::atomic_int outputCountAfterNoMoreInout{0}; std::atomic_bool injectOnce{true}; - SCOPED_TESTVALUE_SET( + ::facebook::velox::common::testutil::ScopedTestValue abc( "facebook::velox::exec::Driver::runInternal::getOutput", std::function([&](Operator* op) { if (!isHashProbeMemoryPool(*op->pool())) { diff --git a/velox/exec/tests/HashTableTest.cpp b/velox/exec/tests/HashTableTest.cpp index 73ae6a2bacb2..343c45773b11 100644 --- a/velox/exec/tests/HashTableTest.cpp +++ b/velox/exec/tests/HashTableTest.cpp @@ -298,7 +298,7 @@ class HashTableTest : public testing::TestWithParam, return out.str(); } - void copyVectorsToTable( + std::vector copyVectorsToTable( const std::vector& batches, int32_t tableOffset, BaseHashTable* table) { @@ -311,7 +311,7 @@ class HashTableTest : public testing::TestWithParam, auto numKeys = hashers.size(); // We init a DecodedVector for each member of the RowVectors in 'batches'. std::vector> decoded; - SelectivityVector rows(batchSize); + SelectivityVector allRows(batchSize); SelectivityVector insertedRows(batchSize); for (auto& batch : batches) { // If we are only inserting a fraction of the rows, we set insertedRows to @@ -330,7 +330,7 @@ class HashTableTest : public testing::TestWithParam, VELOX_CHECK_EQ(batch->size(), batchSize); auto& decoders = decoded.back(); for (auto i = 0; i < batch->childrenSize(); ++i) { - decoders[i].decode(*batch->childAt(i), rows); + decoders[i].decode(*batch->childAt(i), allRows); if (i < numKeys) { auto hasher = table->hashers()[i].get(); hasher->decode(*batch->childAt(i), insertedRows); @@ -350,6 +350,7 @@ class HashTableTest : public testing::TestWithParam, int32_t delta = 1; const auto nextOffset = rowContainer->nextOffset(); + std::vector rows; // We insert values in a geometric skip order. 1, 2, 4, 7, // 11,... where the skip increments by one. We wrap around at the // power of two boundary. This sequence hits every place in the @@ -369,10 +370,12 @@ class HashTableTest : public testing::TestWithParam, for (auto i = 0; i < batches[batchIndex]->type()->size(); ++i) { rowContainer->store(decoded[batchIndex][i], rowIndex, newRow, i); } + rows.push_back(newRow); } position = (position + delta) & mask; ++delta; } + return rows; } // Makes a vector of 'type' with 'size' unique elements, initialized @@ -391,7 +394,8 @@ class HashTableTest : public testing::TestWithParam, auto strings = BaseVector::create>(VARCHAR(), size, pool()); for (auto row = 0; row < size; ++row) { - auto string = fmt::format("{}", keySpacing_ * (sequence + row)); + auto string = + fmt::format("{}{}", baseString_, keySpacing_ * (sequence + row)); // Make strings that overflow the inline limit for 1/10 of // the values after 10K,000. Datasets with only // range-encodable small strings can be made within the @@ -567,6 +571,8 @@ class HashTableTest : public testing::TestWithParam, // Spacing between consecutive generated keys. Affects whether // Vectorhashers make ranges or ids of distinct values. int64_t keySpacing_ = 1; + // Base string for varchar fields when making string vector. + std::string baseString_; std::unique_ptr executor_; }; @@ -796,6 +802,102 @@ TEST_P(HashTableTest, regularHashingTableSize) { } } +TEST_P(HashTableTest, listJoinResultsSize) { + baseString_ = + "If you count carefully, you will notice there are exactly 105 characters" + " in this string including space."; + const size_t kNumRows = 1024; + auto buildType = ROW( + {"f0", "v0", "v1"}, {BIGINT(), VARCHAR(), ROW({BIGINT(), VARCHAR()})}); + std::vector> keyHashers; + for (auto i = 0; i < buildType->size(); ++i) { + keyHashers.emplace_back( + std::make_unique(buildType->childAt(i), i)); + } + + auto table = HashTable::createForJoin( + std::move(keyHashers), + {BIGINT(), VARCHAR()}, + true, + false, + kNumRows, + pool()); + std::vector batches; + makeRows(kNumRows, 1, 0, buildType, batches); + auto rows = copyVectorsToTable(batches, 0, table.get()); + + std::vector inputRowsBuf; + inputRowsBuf.resize(kNumRows); + auto inputRows = + folly::Range(static_cast(inputRowsBuf.data()), kNumRows); + std::vector outputRowsBuf; + outputRowsBuf.resize(kNumRows); + auto outputRows = folly::Range(outputRowsBuf.data(), kNumRows); + + HashLookup lookup(table->hashers()); + lookup.rows.reserve(kNumRows); + lookup.hits.reserve(kNumRows); + for (auto i = 0; i < kNumRows; i++) { + lookup.rows.push_back(i); + lookup.hits.push_back(rows[i]); + } + + struct TestParam { + std::vector varSizeListColumns; + std::vector fixedSizeListColumns; + uint64_t maxBytes; + int64_t expectedRows; + + std::string debugString() const { + std::stringstream ss; + ss << "varSizeListColumns "; + ss << "["; + for (auto i = 0; i < varSizeListColumns.size(); i++) { + ss << varSizeListColumns[i]; + if (i != varSizeListColumns.size() - 1) { + ss << ", "; + } + } + ss << "] fixedSizeListColumns ["; + for (auto i = 0; i < fixedSizeListColumns.size(); i++) { + ss << fixedSizeListColumns[i]; + if (i != fixedSizeListColumns.size() - 1) { + ss << ", "; + } + } + ss << "] maxBytes " << maxBytes; + + return ss.str(); + } + }; + + // Key types: BIGINT, VARCHAR, ROW(BIGINT, VARCHAR) + // Dependent types: BIGINT, VARCHAR + std::vector testParams{ + {{}, {0}, 1024, 128}, + {{1}, {}, 2048, 20}, + {{1}, {}, 1 << 20, 1024}, + {{1}, {}, 1 << 14, 154}, + {{1}, {0}, 2048, 18}, + {{}, {0, 3}, 1024, 64}, + {{2}, {}, 2048, 17}, + {{1, 2, 4}, {0, 3}, 1 << 14, 66}}; + for (const auto& testParam : testParams) { + SCOPED_TRACE(testParam.debugString()); + uint64_t fixedColumnSizeSum{0}; + for (const auto column : testParam.fixedSizeListColumns) { + fixedColumnSizeSum += table->rows()->fixedSizeAt(column); + } + BaseHashTable::JoinResultIterator iter( + std::vector(testParam.varSizeListColumns), + fixedColumnSizeSum); + iter.reset(lookup); + auto numRows = table->listJoinResults( + iter, true, inputRows, outputRows, testParam.maxBytes); + ASSERT_EQ(numRows, testParam.expectedRows); + } +} + TEST_P(HashTableTest, groupBySpill) { auto type = ROW({"k1"}, {BIGINT()}); testGroupBySpill(5'000'000, type, 1, 1000, 1000); diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp index 7027a6560f8f..ada992e85ee9 100644 --- a/velox/exec/tests/RowContainerTest.cpp +++ b/velox/exec/tests/RowContainerTest.cpp @@ -1230,6 +1230,53 @@ TEST_F(RowContainerTest, rowSize) { EXPECT_EQ(rows, rowsFromContainer); } +TEST_F(RowContainerTest, columnSize) { + const uint64_t kNumRows = 1000; + auto rowContainer = + makeRowContainer({BIGINT(), VARCHAR()}, {BIGINT(), VARCHAR()}); + + VectorFuzzer fuzzer( + { + .vectorSize = kNumRows, + .stringLength = 100, + .stringVariableLength = true, + }, + pool()); + + auto rowVector = + fuzzer.fuzzInputFlatRow(ROW({BIGINT(), VARCHAR(), BIGINT(), VARCHAR()})); + + std::vector rows; + rows.reserve(kNumRows); + + ASSERT_EQ(rowContainer->numRows(), 0); + SelectivityVector allRows(kNumRows); + DecodedVector decodedKey1(*rowVector->childAt(0), allRows); + DecodedVector decodedKey2(*rowVector->childAt(1), allRows); + DecodedVector decodedDep1(*rowVector->childAt(2), allRows); + DecodedVector decodedDep2(*rowVector->childAt(3), allRows); + for (size_t i = 0; i < kNumRows; i++) { + auto row = rowContainer->newRow(); + rowContainer->store(decodedKey1, i, row, 0); + rowContainer->store(decodedKey2, i, row, 1); + rowContainer->store(decodedDep1, i, row, 2); + rowContainer->store(decodedDep2, i, row, 3); + rows.push_back(row); + } + ASSERT_EQ(rowContainer->fixedSizeAt(0), 8); + ASSERT_EQ(rowContainer->fixedSizeAt(2), 8); + const auto key2Vector = rowVector->childAt(1)->asFlatVector(); + const auto dep2Vector = rowVector->childAt(3)->asFlatVector(); + for (size_t i = 0; i < kNumRows; i++) { + ASSERT_EQ( + rowContainer->variableSizeAt(rows[i], 1), + key2Vector->valueAt(i).size()); + ASSERT_EQ( + rowContainer->variableSizeAt(rows[i], 3), + dep2Vector->valueAt(i).size()); + } +} + TEST_F(RowContainerTest, rowSizeWithNormalizedKey) { auto data = makeRowContainer({SMALLINT()}, {VARCHAR()}); data->newRow(); diff --git a/velox/vector/fuzzer/VectorFuzzer.h b/velox/vector/fuzzer/VectorFuzzer.h index 663fb5471f69..40ef82776716 100644 --- a/velox/vector/fuzzer/VectorFuzzer.h +++ b/velox/vector/fuzzer/VectorFuzzer.h @@ -162,98 +162,98 @@ class VectorFuzzer { return opts_; } - // Returns a "fuzzed" vector, containing randomized data, nulls, and indices - // vector (dictionary). Returns a vector containing `opts_.vectorSize` or - // `size` elements. + /// Returns a "fuzzed" vector, containing randomized data, nulls, and indices + /// vector (dictionary). Returns a vector containing `opts_.vectorSize` or + /// `size` elements. VectorPtr fuzz(const TypePtr& type); VectorPtr fuzz(const TypePtr& type, vector_size_t size); - // Returns a "fuzzed" vector containing randomized data customized according - // to generatorSpec. + /// Returns a "fuzzed" vector containing randomized data customized according + /// to generatorSpec. VectorPtr fuzz(const GeneratorSpec& generatorSpec); - // Same as above, but returns a vector without nulls (regardless of the value - // of opts.nullRatio). + /// Same as above, but returns a vector without nulls (regardless of the value + /// of opts.nullRatio). VectorPtr fuzzNotNull(const TypePtr& type); VectorPtr fuzzNotNull(const TypePtr& type, vector_size_t size); - // Returns a flat vector or a complex vector with flat children with - // randomized data and nulls. Returns a vector containing `opts_.vectorSize` - // or `size` elements. + /// Returns a flat vector or a complex vector with flat children with + /// randomized data and nulls. Returns a vector containing `opts_.vectorSize` + /// or `size` elements. VectorPtr fuzzFlat(const TypePtr& type); VectorPtr fuzzFlat(const TypePtr& type, vector_size_t size); - // Same as above, but returns a vector without nulls (regardless of the value - // of opts.nullRatio). + /// Same as above, but returns a vector without nulls (regardless of the value + /// of opts.nullRatio). VectorPtr fuzzFlatNotNull(const TypePtr& type); VectorPtr fuzzFlatNotNull(const TypePtr& type, vector_size_t size); - // Returns a random constant vector (which could be a null constant). Returns - // a vector with size set to `opts_.vectorSize` or 'size'. + /// Returns a random constant vector (which could be a null constant). Returns + /// a vector with size set to `opts_.vectorSize` or 'size'. VectorPtr fuzzConstant(const TypePtr& type); VectorPtr fuzzConstant(const TypePtr& type, vector_size_t size); - // Wraps `vector` using a randomized indices vector, returning a - // DictionaryVector which has same number of indices as the underlying - // `vector` size. + /// Wraps `vector` using a randomized indices vector, returning a + /// DictionaryVector which has same number of indices as the underlying + /// `vector` size. VectorPtr fuzzDictionary(const VectorPtr& vector); VectorPtr fuzzDictionary(const VectorPtr& vector, vector_size_t size); - // Uses `elements` as the internal elements vector, wrapping them into an - // ArrayVector of `size` rows. - // - // The number of elements per array row is based on the size of the - // `elements` vector and `size`, and either fixed or variable (depending on - // `opts.containerVariableLength`). + /// Uses `elements` as the internal elements vector, wrapping them into an + /// ArrayVector of `size` rows. + /// + /// The number of elements per array row is based on the size of the + /// `elements` vector and `size`, and either fixed or variable (depending on + /// `opts.containerVariableLength`). ArrayVectorPtr fuzzArray(const VectorPtr& elements, vector_size_t size); - // Uses `keys` and `values` as the internal elements vectors, wrapping them - // into a MapVector of `size` rows. - // - // The number of elements per map row is based on the size of the `keys` and - // `values` vectors and `size`, and either fixed or variable (depending on - // `opts.containerVariableLength`). - // - // If opt.normalizeMapKeys is true, keys will be normalized - duplicated key - // values for a particular element will be removed/skipped. In that case, this - // method throws if the keys vector has nulls. + /// Uses `keys` and `values` as the internal elements vectors, wrapping them + /// into a MapVector of `size` rows. + /// + /// The number of elements per map row is based on the size of the `keys` and + /// `values` vectors and `size`, and either fixed or variable (depending on + /// `opts.containerVariableLength`). + /// + /// If opt.normalizeMapKeys is true, keys will be normalized - duplicated key + /// values for a particular element will be removed/skipped. In that case, + /// this method throws if the keys vector has nulls. MapVectorPtr fuzzMap(const VectorPtr& keys, const VectorPtr& values, vector_size_t size); - // Returns a "fuzzed" row vector with randomized data and nulls. + /// Returns a "fuzzed" row vector with randomized data and nulls. RowVectorPtr fuzzRow(const RowTypePtr& rowType); - // If allowTopLevelNulls is false, the top level row wont have nulls. + /// If allowTopLevelNulls is false, the top level row wont have nulls. RowVectorPtr fuzzRow( const RowTypePtr& rowType, vector_size_t size, bool allowTopLevelNulls = true); - // Returns a RowVector based on the provided vectors, fuzzing its top-level - // null buffer. + /// Returns a RowVector based on the provided vectors, fuzzing its top-level + /// null buffer. RowVectorPtr fuzzRow( std::vector&& children, std::vector childrenNames, vector_size_t size); - // Returns a RowVector based on the provided vectors, fuzzing its top-level - // null buffer. + /// Returns a RowVector based on the provided vectors, fuzzing its top-level + /// null buffer. RowVectorPtr fuzzRow(std::vector&& children, vector_size_t size); - // Same as the function above, but never return nulls for the top-level row - // elements. + /// Same as the function above, but never return nulls for the top-level row + /// elements. RowVectorPtr fuzzInputRow(const RowTypePtr& rowType); /// Same as the function above, but all generated vectors are flat, i.e. no /// constant or dictionary-encoded vectors at any level. RowVectorPtr fuzzInputFlatRow(const RowTypePtr& rowType); - // Generates a random type, including maps, vectors, and arrays. maxDepth - // limits the maximum level of nesting for complex types. maxDepth <= 1 means - // no complex types are allowed. - // - // There are no options to control type generation yet; these may be added in - // the future. + /// Generates a random type, including maps, vectors, and arrays. maxDepth + /// limits the maximum level of nesting for complex types. maxDepth <= 1 means + /// no complex types are allowed. + /// + /// There are no options to control type generation yet; these may be added in + /// the future. TypePtr randType(int maxDepth = 5); /// Same as the function above, but only generate orderable types. @@ -266,14 +266,14 @@ class VectorFuzzer { const std::vector& scalarTypes, int maxDepth = 5); - // Generates short decimal TypePtr with random precision and scale. + /// Generates short decimal TypePtr with random precision and scale. inline TypePtr randShortDecimalType() { auto [precision, scale] = randPrecisionScale(ShortDecimalType::kMaxPrecision); return DECIMAL(precision, scale); } - // Generates long decimal TypePtr with random precision and scale. + /// Generates long decimal TypePtr with random precision and scale. inline TypePtr randLongDecimalType() { auto [precision, scale] = randPrecisionScale(LongDecimalType::kMaxPrecision); @@ -289,40 +289,40 @@ class VectorFuzzer { return boost::random::uniform_01()(rng_) < n; } - // Wraps the given vector in a LazyVector. If there are multiple dictionary - // layers then the lazy wrap is applied over the innermost dictionary layer. + /// Wraps the given vector in a LazyVector. If there are multiple dictionary + /// layers then the lazy wrap is applied over the innermost dictionary layer. static VectorPtr wrapInLazyVector(VectorPtr baseVector); - // Randomly applies wrapInLazyVector() to the children of the given input row - // vector. Must only be used for input row vectors where all children are - // non-null and non-lazy. Is useful when the input rowVector needs to be - // re-used between multiple evaluations. + /// Randomly applies wrapInLazyVector() to the children of the given input row + /// vector. Must only be used for input row vectors where all children are + /// non-null and non-lazy. Is useful when the input rowVector needs to be + /// re-used between multiple evaluations. RowVectorPtr fuzzRowChildrenToLazy(RowVectorPtr rowVector); - // Returns a copy of 'rowVector' but with the columns having indices listed in - // 'columnsToWrapInLazy' wrapped in lazy encoding. Must only be used for input - // row vectors where all children are non-null and non-lazy. - // 'columnsToWrapInLazy' can contain negative column indices that represent - // lazy vectors that should be preloaded before being fed to the evaluator. - // This list is sorted on the absolute value of the entries. + /// Returns a copy of 'rowVector' but with the columns having indices listed + /// in 'columnsToWrapInLazy' wrapped in lazy encoding. Must only be used for + /// input row vectors where all children are non-null and non-lazy. + /// 'columnsToWrapInLazy' can contain negative column indices that represent + /// lazy vectors that should be preloaded before being fed to the evaluator. + /// This list is sorted on the absolute value of the entries. static RowVectorPtr fuzzRowChildrenToLazy( RowVectorPtr rowVector, const std::vector& columnsToWrapInLazy); - // Generate a random null buffer. + /// Generate a random null buffer. BufferPtr fuzzNulls(vector_size_t size); - // Generate a random indices buffer of 'size' with maximum possible index - // pointing to (baseVectorSize-1). + /// Generate a random indices buffer of 'size' with maximum possible index + /// pointing to (baseVectorSize-1). BufferPtr fuzzIndices(vector_size_t size, vector_size_t baseVectorSize); private: // Generates a flat vector for primitive types. VectorPtr fuzzFlatPrimitive(const TypePtr& type, vector_size_t size); - /// Generates random precision in range [1, maxPrecision] + // Generates random precision in range [1, maxPrecision] // and scale in range [0, random precision generated]. - /// @param maximum precision. + // @param maximum precision. std::pair randPrecisionScale(int8_t maxPrecision); // Returns a complex vector with randomized data and nulls. The children and