Temp HashProb PR
tanjialiang committed Aug 12, 2024
1 parent 91420f4 commit 4ea4855
Showing 9 changed files with 371 additions and 52 deletions.
15 changes: 12 additions & 3 deletions velox/exec/HashProbe.cpp
@@ -181,6 +181,13 @@ void HashProbe::initialize() {
isIdentityProjection_ = true;
}

std::vector<vector_size_t> listColumns;
listColumns.reserve(tableOutputProjections_.size());
for (const auto& projection : tableOutputProjections_) {
listColumns.push_back(projection.inputChannel);
}
results_.listColumns = listColumns;

if (nullAware_) {
filterTableResult_.resize(1);
}
@@ -660,6 +667,7 @@ void HashProbe::addInput(RowVectorPtr input) {
lookup_->hits.resize(lookup_->rows.back() + 1);
table_->joinProbe(*lookup_);
}

results_.reset(*lookup_);
}

@@ -998,7 +1006,8 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
results_,
joinIncludesMissesFromLeft(joinType_),
mapping,
folly::Range(outputTableRows_.data(), outputTableRows_.size()));
folly::Range(outputTableRows_.data(), outputTableRows_.size()),
operatorCtx_->driverCtx()->queryConfig().preferredOutputBatchBytes());
}

// We are done processing the input batch if there are no more joined rows
@@ -1698,7 +1707,7 @@ void HashProbe::spillOutput(const std::vector<HashProbe*>& operators) {
}
}

auto syncGuard = folly::makeGuard([&]() {
SCOPE_EXIT {
for (auto& spillTask : spillTasks) {
// We consume the result for the pending tasks. This is a cleanup in the
// guard and must not throw. The first error is already captured before
@@ -1708,7 +1717,7 @@ void HashProbe::spillOutput(const std::vector<HashProbe*>& operators) {
} catch (const std::exception&) {
}
}
});
};

for (auto& spillTask : spillTasks) {
const auto result = spillTask->move();
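The HashProbe changes above wire a per-batch byte budget into the table lookup: initialize() records which build-side columns will be projected (results_.listColumns), and getOutputInternal() passes the query config's preferredOutputBatchBytes() down to listJoinResults(). As a rough illustration of the cut-off rule this enables, the standalone sketch below appends hits until either a row cap or a byte budget is reached; Hit, listUpTo, and the per-hit byte estimate are invented for the example and are not Velox APIs.

```cpp
#include <cstdint>
#include <vector>

// Illustrative only: a listed hit plus the estimated size of the build-side
// columns that would be projected for it.
struct Hit {
  int32_t inputRow;
  uint64_t projectedBytes;
};

// Appends hits to 'outputRows' until either 'maxRows' or 'maxBytes' is
// reached, mirroring the dual limit now applied in listJoinResults().
size_t listUpTo(
    const std::vector<Hit>& hits,
    size_t maxRows,
    uint64_t maxBytes,
    std::vector<int32_t>& outputRows) {
  uint64_t totalBytes = 0;
  size_t numOut = 0;
  for (const auto& hit : hits) {
    if (numOut >= maxRows) {
      break;
    }
    outputRows.push_back(hit.inputRow);
    ++numOut;
    totalBytes += hit.projectedBytes;
    // The byte budget is checked after appending, so a single oversized row
    // still produces output instead of stalling the batch.
    if (totalBytes >= maxBytes) {
      break;
    }
  }
  return numOut;
}
```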
68 changes: 59 additions & 9 deletions velox/exec/HashTable.cpp
@@ -1750,18 +1750,55 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
}
}

template <bool ignoreNullKeys>
inline uint64_t HashTable<ignoreNullKeys>::joinProjectedVarColumnsSize(
const std::vector<vector_size_t>& columns,
const char* row) const {
uint64_t totalBytes{0};
for (const auto& column : columns) {
if (!rows_->columnTypes()[column]->isFixedWidth()) {
totalBytes += rows_->variableSizeAt(row, column);
}
}
return totalBytes;
}

template <bool ignoreNullKeys>
int32_t HashTable<ignoreNullKeys>::listJoinResults(
JoinResultIterator& iter,
bool includeMisses,
folly::Range<vector_size_t*> inputRows,
folly::Range<char**> hits) {
folly::Range<char**> hits,
uint64_t maxBytes) {
VELOX_CHECK_LE(inputRows.size(), hits.size());
if (!hasDuplicates_) {
return listJoinResultsNoDuplicates(iter, includeMisses, inputRows, hits);
if (!iter.listColumnSizeInfoSet) {
// Once the list column size information is set, it remains valid throughout
// the lifetime of the 'JoinResultIterator'.
iter.listColumnSizeInfoSet = true;
for (const auto column : iter.listColumns) {
if (rows_->columnTypes()[column]->isFixedWidth()) {
iter.fixedSizeListColumnsSizeSum += rows_->fixedSizeAt(column);
} else {
iter.varSizeListColumns.push_back(column);
}
}
}

if (iter.varSizeListColumns.empty() && !hasDuplicates_) {
// When there are no duplicates and no variable-length columns are selected
// for projection, the total size of the fixed-length columns can be computed
// directly, so we take the fast path.
return listJoinResultsFastPath(
iter,
includeMisses,
inputRows,
hits,
maxBytes);
}

size_t numOut = 0;
auto maxOut = inputRows.size();
uint64_t totalBytes{0};
while (iter.lastRowIndex < iter.rows->size()) {
auto row = (*iter.rows)[iter.lastRowIndex];
auto hit = (*iter.hits)[row]; // NOLINT
@@ -1784,6 +1821,9 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
hits[numOut] = hit;
numOut++;
iter.lastRowIndex++;
totalBytes +=
(joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
} else {
auto numRows = rows->size();
auto num =
@@ -1795,36 +1835,46 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
num * sizeof(char*));
iter.lastDuplicateRowIndex += num;
numOut += num;
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow);
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
if (iter.lastDuplicateRowIndex >= numRows) {
iter.lastDuplicateRowIndex = 0;
iter.lastRowIndex++;
}
}
if (numOut >= maxOut) {
if (numOut >= maxOut || totalBytes >= maxBytes) {
return numOut;
}
}
return numOut;
}

template <bool ignoreNullKeys>
int32_t HashTable<ignoreNullKeys>::listJoinResultsNoDuplicates(
int32_t HashTable<ignoreNullKeys>::listJoinResultsFastPath(
JoinResultIterator& iter,
bool includeMisses,
folly::Range<vector_size_t*> inputRows,
folly::Range<char**> hits) {
folly::Range<char**> hits,
uint64_t maxBytes) {
int32_t numOut = 0;
auto maxOut = inputRows.size();
const auto maxOut = std::min(
static_cast<uint64_t>(inputRows.size()),
(iter.fixedSizeListColumnsSizeSum != 0
? maxBytes / iter.fixedSizeListColumnsSizeSum
: std::numeric_limits<uint64_t>::max()));
int32_t i = iter.lastRowIndex;
auto numRows = iter.rows->size();
const auto numRows = iter.rows->size();

constexpr int32_t kWidth = xsimd::batch<int64_t>::size;
auto sourceHits = reinterpret_cast<int64_t*>(iter.hits->data());
auto sourceRows = iter.rows->data();
// We pass the pointers as int64_t's in 'hitWords'.
auto resultHits = reinterpret_cast<int64_t*>(hits.data());
auto resultRows = inputRows.data();
int32_t outLimit = maxOut - kWidth;
const auto outLimit = maxOut - kWidth;
for (; i + kWidth <= numRows && numOut < outLimit; i += kWidth) {
auto indices = simd::loadGatherIndices<int64_t, int32_t>(sourceRows + i);
auto hitWords = simd::gather(sourceHits, indices);
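The loop in listJoinResults() above accumulates per-hit output size from two parts: the constant fixedSizeListColumnsSizeSum and the per-row joinProjectedVarColumnsSize() of the variable-width columns. listJoinResultsFastPath() avoids the per-row accounting entirely: when every projected column is fixed width and the table has no duplicates, each hit costs the same number of bytes, so the byte budget collapses into a row cap computed once. Below is a minimal sketch of that arithmetic with an invented function name, not a Velox API.

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>

// Converts a byte budget into a per-batch row cap when every projected
// column is fixed width. 'fixedSizePerRow' is the sum of the fixed-width
// column sizes for one row; a zero sum means the byte budget never binds.
uint64_t fastPathRowCap(
    uint64_t inputRowCount,
    uint64_t fixedSizePerRow,
    uint64_t maxBytes) {
  const uint64_t byteCap = fixedSizePerRow != 0
      ? maxBytes / fixedSizePerRow
      : std::numeric_limits<uint64_t>::max();
  return std::min(inputRowCount, byteCap);
}
```

For example, 4096 probe rows, 24 bytes of fixed-width output per row, and a 32 KB budget yield min(4096, 32768 / 24) = 1365 rows per batch.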
62 changes: 45 additions & 17 deletions velox/exec/HashTable.h
@@ -136,10 +136,9 @@ class BaseHashTable {
/// Returns the string of the given 'mode'.
static std::string modeString(HashMode mode);

// Keeps track of results returned from a join table. One batch of
// keys can produce multiple batches of results. This is initialized
// from HashLookup, which is expected to stay constant while 'this'
// is being used.
/// Keeps track of results returned from a join table. One batch of keys can
/// produce multiple batches of results. This is initialized from HashLookup,
/// which is expected to stay constant while 'this' is being used.
struct JoinResultIterator {
void reset(const HashLookup& lookup) {
rows = &lookup.rows;
@@ -154,8 +153,22 @@ class BaseHashTable {

const raw_vector<vector_size_t>* rows{nullptr};
const raw_vector<char*>* hits{nullptr};

vector_size_t lastRowIndex{0};
vector_size_t lastDuplicateRowIndex{0};

/// The indexes of the build side columns that are going to be projected as
/// output.
std::vector<vector_size_t> listColumns;
/// The indexes of the build side variable size columns that are going to be
/// projected as output.
std::vector<vector_size_t> varSizeListColumns;
/// The total per-row size, in bytes, of the fixed-size columns that are
/// projected as output.
uint64_t fixedSizeListColumnsSizeSum{0};
/// Indicates if 'varSizeListColumns' and 'fixedSizeListColumnsSizeSum' are
/// already set, as they only need to be set once.
bool listColumnSizeInfoSet{false};
};

struct RowsIterator {
@@ -231,11 +244,14 @@ class BaseHashTable {
/// set to nullptr if 'includeMisses' is true. Otherwise, skips input rows
/// without a match. 'includeMisses' is set to true when listing results for
/// the LEFT join.
/// Listing stops once the total size of the currently listed rows exceeds
/// 'maxBytes'.
virtual int32_t listJoinResults(
JoinResultIterator& iter,
bool includeMisses,
folly::Range<vector_size_t*> inputRows,
folly::Range<char**> hits) = 0;
folly::Range<char**> hits,
uint64_t maxBytes) = 0;

/// Returns rows with 'probed' flag unset. Used by the right/full join.
virtual int32_t listNotProbedRows(
@@ -491,7 +507,8 @@ class HashTable : public BaseHashTable {
JoinResultIterator& iter,
bool includeMisses,
folly::Range<vector_size_t*> inputRows,
folly::Range<char**> hits) override;
folly::Range<char**> hits,
uint64_t maxBytes) override;

int32_t listNotProbedRows(
RowsIterator* iter,
@@ -716,12 +733,14 @@ class HashTable : public BaseHashTable {
int32_t numNew,
int8_t spillInputStartPartitionBit) override;

// Fast path for join results when there are no duplicates in the table.
int32_t listJoinResultsNoDuplicates(
// Fast path for join results when there are no duplicates in the table and
// only fixed-size columns are to be extracted.
int32_t listJoinResultsFastPath(
JoinResultIterator& iter,
bool includeMisses,
folly::Range<vector_size_t*> inputRows,
folly::Range<char**> hits);
folly::Range<char**> hits,
uint64_t maxBytes);

// Tries to use as many range hashers as can in a normalized key situation.
void enableRangeWhereCan(
@@ -773,14 +792,14 @@ class HashTable : public BaseHashTable {
raw_vector<uint64_t>& hashes,
bool initNormalizedKeys);

/// Inserts 'numGroups' entries into 'this'. 'groups' point to contents in a
/// RowContainer owned by 'this'. 'hashes' are the hash numbers or array
/// indices (if kArray mode) for each group. Duplicate key rows are chained
/// via their next link. If not null, 'partitionInfo' provides the table
/// partition info for parallel join table build. It specifies the first and
/// (exclusive) last indexes of the insert entries in the table. If a row
/// can't be inserted within this range, it is not inserted but rather added
/// to the end of 'overflows' in 'partitionInfo'.
// Inserts 'numGroups' entries into 'this'. 'groups' point to contents in a
// RowContainer owned by 'this'. 'hashes' are the hash numbers or array
// indices (if kArray mode) for each group. Duplicate key rows are chained
// via their next link. If not null, 'partitionInfo' provides the table
// partition info for parallel join table build. It specifies the first and
// (exclusive) last indexes of the insert entries in the table. If a row
// can't be inserted within this range, it is not inserted but rather added
// to the end of 'overflows' in 'partitionInfo'.
void insertForJoin(
RowContainer* rows,
char** groups,
@@ -857,6 +876,14 @@ class HashTable : public BaseHashTable {
// Shortcut for probe with normalized keys.
void joinNormalizedKeyProbe(HashLookup& lookup);

// Returns the total size of the variable-size 'columns' in 'row'.
// NOTE: No checks are done in this method for performance reasons. The
// caller must ensure that 'columns' contains only variable-size columns.
inline uint64_t joinProjectedVarColumnsSize(
const std::vector<vector_size_t>& columns,
const char* row) const;

// Adds a row to a hash join table in kArray hash mode. Returns true
// if a new entry was made and false if the row was added to an
// existing set of rows with the same key.
@@ -892,6 +919,7 @@ class HashTable : public BaseHashTable {
// content. Returns true if all hashers offer a mapping to value ids
// for array or normalized key.
bool analyze();

// Erases the entries of rows from the hash table and its RowContainer.
// 'hashes' must be computed according to 'hashMode_'.
void eraseWithHashes(folly::Range<char**> rows, uint64_t* hashes);
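JoinResultIterator above caches the size bookkeeping for the projected build-side columns: on first use, listColumns is split into a constant per-row fixed-size sum and a list of variable-width columns that must be measured per hit, guarded by listColumnSizeInfoSet. The sketch below mirrors that one-time split using simplified, assumed inputs; the ColumnSizeInfo struct and the per-column vectors are illustrative, not Velox's RowContainer interface.

```cpp
#include <cstdint>
#include <vector>

struct ColumnSizeInfo {
  std::vector<int32_t> varSizeColumns; // Sizes measured per row.
  uint64_t fixedSizeSum{0};            // Constant contribution per row.
  bool initialized{false};
};

// One-time split of the projected columns into fixed and variable width,
// analogous to the block guarded by 'listColumnSizeInfoSet'.
void ensureColumnSizeInfo(
    ColumnSizeInfo& info,
    const std::vector<int32_t>& listColumns,
    const std::vector<bool>& isFixedWidth,    // Assumed per-column flag.
    const std::vector<int32_t>& fixedSizes) { // Assumed per-column byte width.
  if (info.initialized) {
    return; // Valid for the lifetime of the iterator once set.
  }
  info.initialized = true;
  for (auto column : listColumns) {
    if (isFixedWidth[column]) {
      info.fixedSizeSum += fixedSizes[column];
    } else {
      info.varSizeColumns.push_back(column);
    }
  }
}
```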
4 changes: 4 additions & 0 deletions velox/exec/RowContainer.cpp
@@ -562,6 +562,10 @@ int32_t RowContainer::variableSizeAt(const char* row, column_index_t column) {
}
}

int32_t RowContainer::fixedSizeAt(column_index_t column) {
return typeKindSize(typeKinds_[column]);
}

int32_t RowContainer::extractVariableSizeAt(
const char* row,
column_index_t column,
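RowContainer::fixedSizeAt() above derives a fixed-width column's per-row byte count from its type kind. The sketch below shows the general shape of such a kind-to-width lookup; the Kind enum and the widths are assumptions made for illustration and are not taken from Velox's typeKindSize().

```cpp
#include <cstdint>

// Hypothetical type kinds and widths, for illustration only.
enum class Kind { kBoolean, kInteger, kReal, kBigint, kDouble, kVarchar };

// Returns the assumed in-row width of a fixed-width kind, or -1 when the
// kind is variable width and a per-row measurement is needed instead.
int32_t fixedWidthOf(Kind kind) {
  switch (kind) {
    case Kind::kBoolean:
      return 1;
    case Kind::kInteger:
    case Kind::kReal:
      return 4;
    case Kind::kBigint:
    case Kind::kDouble:
      return 8;
    case Kind::kVarchar:
      return -1;
  }
  return -1;
}
```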
33 changes: 18 additions & 15 deletions velox/exec/RowContainer.h
@@ -620,6 +620,13 @@ class RowContainer {
return rowColumns_[index];
}

/// Returns the size of a string or complex type value stored in the
/// specified row and column.
int32_t variableSizeAt(const char* row, column_index_t column);

/// Returns the per row size of a fixed size column.
int32_t fixedSizeAt(column_index_t column);

/// Bit offset of the probed flag for a full or right outer join payload.
/// 0 if not applicable.
int32_t probedFlagOffset() const {
@@ -784,24 +791,20 @@ class RowContainer {
return *reinterpret_cast<T*>(group + offset);
}

/// Returns the size of a string or complex types value stored in the
/// specified row and column.
int32_t variableSizeAt(const char* row, column_index_t column);

/// Copies a string or complex type value from the specified row and column
/// into provided buffer. Stored the size of the data in the first 4 bytes of
/// the buffer. If the value is null, writes zero into the first 4 bytes of
/// destination and returns.
/// @return The number of bytes written to 'destination' including the 4 bytes
/// of the size.
// Copies a string or complex type value from the specified row and column
// into the provided buffer. Stores the size of the data in the first 4 bytes
// of the buffer. If the value is null, writes zero into the first 4 bytes of
// destination and returns.
// @return The number of bytes written to 'destination' including the 4 bytes
// of the size.
int32_t
extractVariableSizeAt(const char* row, column_index_t column, char* output);

/// Copies a string or complex type value from 'data' into the specified row
/// and column. Expects first 4 bytes in 'data' to contain the size of the
/// string or complex value.
/// @return The number of bytes read from 'data': 4 bytes for size + that many
/// bytes.
// Copies a string or complex type value from 'data' into the specified row
// and column. Expects first 4 bytes in 'data' to contain the size of the
// string or complex value.
// @return The number of bytes read from 'data': 4 bytes for size + that many
// bytes.
int32_t
storeVariableSizeAt(const char* data, char* row, column_index_t column);

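The extractVariableSizeAt()/storeVariableSizeAt() comments above describe a 4-byte length-prefix convention: the value's size is written into the first 4 bytes, the payload follows, and the return value counts both. Below is a minimal sketch of that convention with invented helper names; it does not reproduce Velox's actual row layout and assumes the buffers are large enough.

```cpp
#include <cstdint>
#include <cstring>

// Writes 'size' into the first 4 bytes of 'destination', then the payload.
// Returns the number of bytes written, including the size prefix.
int32_t copyWithSizePrefix(const char* data, int32_t size, char* destination) {
  std::memcpy(destination, &size, sizeof(int32_t));
  std::memcpy(destination + sizeof(int32_t), data, size);
  return static_cast<int32_t>(sizeof(int32_t)) + size;
}

// Reads the 4-byte size prefix from 'source', copies that many payload bytes
// into 'destination', and returns the number of bytes consumed.
int32_t readWithSizePrefix(const char* source, char* destination) {
  int32_t size;
  std::memcpy(&size, source, sizeof(int32_t));
  std::memcpy(destination, source + sizeof(int32_t), size);
  return static_cast<int32_t>(sizeof(int32_t)) + size;
}
```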