Skip to content

Commit

Permalink
Add batch version of RowContainer::store API (#10812)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #10812

Reviewed By: xiaoxmeng, DanielHunte

Differential Revision: D61924803

Pulled By: kgpai

fbshipit-source-id: f6fbab99e40d1d423aecc24f91b47059a73ba98d
  • Loading branch information
zhli1142015 authored and facebook-github-bot committed Sep 10, 2024
1 parent d9e0e9b commit de734a0
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 7 deletions.
29 changes: 29 additions & 0 deletions velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,35 @@ void RowContainer::store(
}
}

// Batch store: writes the first 'rows.size()' values of 'decoded' into column
// 'column' of the corresponding entries of 'rows'. 'decoded' must hold at
// least 'rows.size()' values.
void RowContainer::store(
    const DecodedVector& decoded,
    folly::Range<char**> rows,
    int32_t column) {
  VELOX_CHECK_GE(decoded.size(), rows.size());

  const bool isKey = column < keyTypes_.size();
  // Keys of a container without nullable keys are stored through the
  // null-free path regardless of what 'decoded' reports.
  const bool skipNullHandling = isKey && !nullableKeys_;

  if (!skipNullHandling && decoded.mayHaveNulls()) {
    // Null-aware path: needs the column's null byte and mask in addition to
    // its value offset.
    const auto rowColumn = rowColumns_[column];
    VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
        storeWithNullsBatch,
        typeKinds_[column],
        decoded,
        rows,
        isKey,
        rowColumn.offset(),
        rowColumn.nullByte(),
        rowColumn.nullMask(),
        column);
    return;
  }

  // Null-free path: only the value offset of the column is required.
  VELOX_DYNAMIC_TYPE_DISPATCH(
      storeNoNullsBatch,
      typeKinds_[column],
      decoded,
      rows,
      isKey,
      offsets_[column]);
}

std::unique_ptr<ByteInputStream> RowContainer::prepareRead(
const char* row,
int32_t offset) {
Expand Down
33 changes: 33 additions & 0 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ class RowContainer {
char* row,
int32_t columnIndex);

/// Batch variant of store(): writes the first 'rows.size()' values of the
/// 'decoded' vector into the 'columnIndex' column of 'rows', i.e. the value at
/// index i of 'decoded' is stored into 'rows[i]'.
///
/// 'decoded' must contain at least 'rows.size()' values; this is enforced
/// with a check. A null-free store path is used when the column is a key of a
/// container without nullable keys or when 'decoded' reports that it cannot
/// have nulls; otherwise the null-aware path is used.
void store(
const DecodedVector& decoded,
folly::Range<char**> rows,
int32_t columnIndex);

HashStringAllocator& stringAllocator() {
return *stringAllocator_;
}
Expand Down Expand Up @@ -965,6 +972,32 @@ class RowContainer {
}
}

template <TypeKind Kind>
inline void storeWithNullsBatch(
const DecodedVector& decoded,
folly::Range<char**> rows,
bool isKey,
int32_t offset,
int32_t nullByte,
uint8_t nullMask,
int32_t column) {
for (int32_t i = 0; i < rows.size(); ++i) {
storeWithNulls<Kind>(
decoded, i, isKey, rows[i], offset, nullByte, nullMask, column);
}
}

template <TypeKind Kind>
inline void storeNoNullsBatch(
const DecodedVector& decoded,
folly::Range<char**> rows,
bool isKey,
int32_t offset) {
for (int32_t i = 0; i < rows.size(); ++i) {
storeNoNulls<Kind>(decoded, i, isKey, rows[i], offset);
}
}

template <bool useRowNumbers, typename T>
static void extractValuesWithNulls(
const char* const* rows,
Expand Down
7 changes: 4 additions & 3 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ void SortBuffer::addInput(const VectorPtr& input) {
for (const auto& columnProjection : columnMap_) {
DecodedVector decoded(
*inputRow->childAt(columnProjection.outputChannel), allRows);
for (int i = 0; i < input->size(); ++i) {
data_->store(decoded, i, rows[i], columnProjection.inputChannel);
}
data_->store(
decoded,
folly::Range(rows.data(), input->size()),
columnProjection.inputChannel);
}
numInputRows_ += allRows.size();
}
Expand Down
6 changes: 2 additions & 4 deletions velox/exec/benchmarks/PrefixSortBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,8 @@ class TestCase {
}
for (auto column = 0; column < data->childrenSize(); ++column) {
DecodedVector decoded(*data->childAt(column));
for (int i = 0; i < numRows; ++i) {
char* row = rows_[i];
rowContainer()->store(decoded, i, row, column);
}
rowContainer()->store(
decoded, folly::Range(rows_.data(), numRows), column);
}
}

Expand Down
54 changes: 54 additions & 0 deletions velox/exec/tests/RowContainerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2116,3 +2116,57 @@ TEST_F(RowContainerTest, columnHasNulls) {
}
}
}

// Verifies the batch RowContainer::store() overload: every column of an input
// row vector is stored into freshly allocated rows with a single call per
// column, then extracted again and compared with the input. The check runs
// once with inputs whose columns contain nulls and once with null-free
// inputs.
TEST_F(RowContainerTest, store) {
  const uint64_t kNumRows = 1000;

  // Every column has nulls at a different period.
  auto rowVectorWithNulls = makeRowVector({
      makeFlatVector<int64_t>(
          kNumRows, [](auto row) { return row % 5; }, nullEvery(6)),
      makeFlatVector<std::string>(
          kNumRows,
          [](auto row) { return fmt::format("abcdefg123_{}", row); },
          nullEvery(7)),
      makeFlatVector<int64_t>(
          kNumRows, [](auto row) { return row; }, nullEvery(8)),
      // Element type is int64_t so that the data matches the ARRAY(BIGINT())
      // column declared for the container below.
      makeArrayVector<int64_t>(
          kNumRows,
          [](auto i) { return i % 5; },
          [](auto i) { return i % 10; },
          nullEvery(10)),
  });

  // Same schema, but no column contains nulls.
  auto rowVectorNoNulls = makeRowVector({
      makeFlatVector<int64_t>(kNumRows, [](auto row) { return row % 5; }),
      makeFlatVector<std::string>(
          kNumRows, [](auto row) { return fmt::format("abcdefg12_{}", row); }),
      makeFlatVector<int64_t>(kNumRows, [](auto row) { return row; }),
      makeArrayVector<int64_t>(
          kNumRows,
          [](auto i) { return i % 3; },
          [](auto i) { return i % 10; }),
  });

  for (const auto& rowVector : {rowVectorWithNulls, rowVectorNoNulls}) {
    auto rowContainer = makeRowContainer(
        {BIGINT(), VARCHAR()}, {BIGINT(), ARRAY(BIGINT())}, false);
    ASSERT_EQ(rowContainer->numRows(), 0);

    // Allocate one container row per input row.
    std::vector<char*> rows;
    rows.reserve(kNumRows);
    for (size_t i = 0; i < kNumRows; i++) {
      rows.push_back(rowContainer->newRow());
    }

    const auto numColumns =
        static_cast<int32_t>(rowContainer->columnTypes().size());

    // Store each column with a single batch call.
    SelectivityVector allRows(kNumRows);
    for (int32_t i = 0; i < numColumns; ++i) {
      DecodedVector decoded(*rowVector->childAt(i), allRows);
      rowContainer->store(decoded, folly::Range(rows.data(), kNumRows), i);
    }
    ASSERT_EQ(rowContainer->numRows(), kNumRows);

    // Read every column back and compare it with the input.
    for (int32_t i = 0; i < numColumns; ++i) {
      auto vector =
          BaseVector::create(rowVector->childAt(i)->type(), kNumRows, pool());
      rowContainer->extractColumn(rows.data(), kNumRows, i, vector);
      assertEqualVectors(rowVector->childAt(i), vector);
    }
  }
}

0 comments on commit de734a0

Please sign in to comment.