Skip to content

Commit

Permalink
Add RowVector::pushDictionaryToRowVectorLeaves utility
Browse files Browse the repository at this point in the history
Summary:
In the Nimble writer, we need to merge multiple dictionary layers at
different levels of a `RowVector` into one dictionary layer at the lowest
possible vectors, so that `ArrayWithOffsets` can be encoded efficiently. Add a
utility to achieve this.

Differential Revision: D61988576
  • Loading branch information
Yuhta authored and facebook-github-bot committed Aug 29, 2024
1 parent 205cbdf commit 70097f1
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 0 deletions.
119 changes: 119 additions & 0 deletions velox/vector/ComplexVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,125 @@ void RowVector::resize(vector_size_t newSize, bool setNotNull) {
}
}

namespace {

// One dictionary layer found on the path from the input vector down to the
// vector currently being processed. The layers are kept as a stack (outermost
// first) by pushDictionaryToRowVectorLeavesImpl.
struct Wrapper {
// The dictionary vector of this layer. Held by reference, so the referenced
// VectorPtr must outlive this Wrapper.
const VectorPtr& dictionary;
// Cached nulls of the merged dictionary formed by all layers up to and
// including this one. Filled lazily by wrapInDictionary; stays null when no
// layer contributes nulls.
BufferPtr nulls;
// Cached indices of the merged dictionary formed by all layers up to and
// including this one. Filled lazily by wrapInDictionary.
BufferPtr indices;
};

/// Wraps `values` in a single dictionary layer that is equivalent to applying
/// every layer in `wrappers` (outermost first).
///
/// @param wrappers Stack of dictionary layers, outermost first. When there are
/// two or more layers, the merged nulls/indices are computed once and cached
/// in `wrappers.back()` so that sibling leaves can share the same buffers.
/// @param size Number of rows in the result.
/// @param values The vector to wrap.
/// @param pool Memory pool used to allocate the merged nulls and indices.
/// @return `values` itself when there are no layers; the existing dictionary
/// when the single layer already wraps `values`; otherwise a new dictionary
/// vector over `values`.
VectorPtr wrapInDictionary(
    std::vector<Wrapper>& wrappers,
    vector_size_t size,
    const VectorPtr& values,
    memory::MemoryPool* pool) {
  if (wrappers.empty()) {
    VELOX_CHECK_LE(size, values->size());
    return values;
  }
  VELOX_CHECK_LE(size, wrappers[0].dictionary->size());
  if (wrappers.size() == 1) {
    const auto& dictionary = wrappers[0].dictionary;
    if (dictionary->valueVector() == values) {
      // Nothing was peeled between the dictionary and its values; reuse it.
      return dictionary;
    }
    return BaseVector::wrapInDictionary(
        dictionary->nulls(), dictionary->wrapInfo(), size, values);
  }
  auto& merged = wrappers.back();
  if (!merged.indices) {
    VELOX_CHECK_NULL(merged.nulls);
    const size_t numWrappers = wrappers.size();
    // Keeps the indices buffers referenced while their raw pointers are used.
    std::vector<BufferPtr> wrapInfos(numWrappers);
    std::vector<const vector_size_t*> sourceIndices(numWrappers);
    // Null buffer owned by each layer itself; nullptr if the layer adds none.
    std::vector<const uint64_t*> sourceNulls(numWrappers, nullptr);
    uint64_t* rawNulls = nullptr;
    for (size_t i = 0; i < numWrappers; ++i) {
      const auto& dictionary = wrappers[i].dictionary;
      wrapInfos[i] = dictionary->wrapInfo();
      VELOX_CHECK_NOT_NULL(wrapInfos[i]);
      sourceIndices[i] = wrapInfos[i]->as<vector_size_t>();
      if (dictionary->nulls()) {
        sourceNulls[i] = dictionary->nulls()->as<uint64_t>();
        if (!rawNulls) {
          merged.nulls = allocateNulls(size, pool);
          rawNulls = merged.nulls->asMutable<uint64_t>();
        }
      }
    }
    merged.indices = allocateIndices(size, pool);
    auto* rawIndices = merged.indices->asMutable<vector_size_t>();
    for (vector_size_t row = 0; row < size; ++row) {
      auto index = row;
      bool isNull = false;
      for (size_t i = 0; i < numWrappers; ++i) {
        // Only consult the null buffer owned by the layer. The virtual
        // isNullAt() of a dictionary may also report nulls that come from the
        // wrapped values (NOTE(review): confirm against DictionaryVector); in
        // that situation no output null buffer exists and setting a null bit
        // would dereference a null pointer. Nulls of the values stay visible
        // through the translated index.
        if (sourceNulls[i] && bits::isBitNull(sourceNulls[i], index)) {
          isNull = true;
          break;
        }
        index = sourceIndices[i][index];
      }
      if (isNull) {
        // isNull implies some layer has a null buffer, so rawNulls is set.
        // The index slot keeps whatever allocateIndices initialized it to.
        bits::setNull(rawNulls, row);
      } else {
        rawIndices[row] = index;
      }
    }
  }
  return BaseVector::wrapInDictionary(
      merged.nulls, merged.indices, size, values);
}

/// Recursively rebuilds `values` so that the dictionary layers collected in
/// `wrappers` (outermost first) end up directly above the non-row leaves.
///
/// @param wrappers Stack of dictionary layers seen so far on the path from the
/// input vector to `values`. Restored to its incoming content on return.
/// @param size Number of rows of the top-level input; every rebuilt vector
/// represents this many rows.
/// @param values The vector currently being processed.
/// @param pool Memory pool used for new vectors and merged buffers.
/// @return A vector representing `values` as seen through `wrappers`.
VectorPtr pushDictionaryToRowVectorLeavesImpl(
    std::vector<Wrapper>& wrappers,
    vector_size_t size,
    const VectorPtr& values,
    memory::MemoryPool* pool) {
  switch (values->encoding()) {
    case VectorEncoding::Simple::LAZY: {
      // Unloaded lazy vectors are not supported; look through loaded ones.
      auto* lazy = values->asUnchecked<LazyVector>();
      VELOX_CHECK(lazy->isLoaded());
      return pushDictionaryToRowVectorLeavesImpl(
          wrappers, size, lazy->loadedVectorShared(), pool);
    }
    case VectorEncoding::Simple::ROW: {
      VELOX_CHECK_EQ(values->typeKind(), TypeKind::ROW);
      if (wrappers.empty()) {
        // Rows are addressed positionally, so the row must cover `size` rows
        // and its null buffer can be reused as is.
        VELOX_CHECK_LE(size, values->size());
      } else {
        // The row's own nulls are expressed in the row's index space, which
        // differs from the output index space once dictionaries are pending.
        // They cannot be carried over unchanged, so keep the row wrapped.
        if (values->nulls()) {
          return wrapInDictionary(wrappers, size, values, pool);
        }
        // A layer that adds nulls makes whole rows null; that cannot be
        // represented by wrapping the children, so keep the row wrapped.
        for (const auto& wrapper : wrappers) {
          if (wrapper.dictionary->nulls()) {
            return wrapInDictionary(wrappers, size, values, pool);
          }
        }
      }
      auto children = values->asUnchecked<RowVector>()->children();
      for (auto& child : children) {
        if (child) {
          child =
              pushDictionaryToRowVectorLeavesImpl(wrappers, size, child, pool);
        }
      }
      // The rebuilt row lives in the output index space, hence `size` rather
      // than the size of the (possibly dictionary-wrapped) source row. The
      // null buffer is non-null only on the positional (no wrappers) path.
      return std::make_shared<RowVector>(
          pool, values->type(), values->nulls(), size, std::move(children));
    }
    case VectorEncoding::Simple::DICTIONARY: {
      // The Wrapper refers to `values` by reference; `values` outlives the
      // entry because it is popped before this call returns.
      wrappers.push_back(Wrapper{values, nullptr, nullptr});
      auto result = pushDictionaryToRowVectorLeavesImpl(
          wrappers, size, values->valueVector(), pool);
      wrappers.pop_back();
      return result;
    }
    default:
      // Leaf (non-row, non-wrapper) vector: apply the collected layers here.
      return wrapInDictionary(wrappers, size, values, pool);
  }
}

} // namespace

VectorPtr RowVector::pushDictionaryToRowVectorLeaves(const VectorPtr& input) {
  // Start with no pending dictionary layers; the recursive helper pushes and
  // pops layers as it walks down the vector tree.
  std::vector<Wrapper> dictionaryStack;
  const auto numRows = input->size();
  auto* inputPool = input->pool();
  return pushDictionaryToRowVectorLeavesImpl(
      dictionaryStack, numRows, input, inputPool);
}

void ArrayVectorBase::checkRanges() const {
std::unordered_map<vector_size_t, vector_size_t> seenElements;
seenElements.reserve(size());
Expand Down
9 changes: 9 additions & 0 deletions velox/vector/ComplexVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ class RowVector : public BaseVector {
/// Note : If the child is null, then it will stay null after the resize.
void resize(vector_size_t newSize, bool setNotNull = true) override;

/// Pushes all dictionary encodings down to the leaf vectors of a RowVector
/// tree. If a wrapper introduces nulls on a RowVector, the dictionary is not
/// pushed into that RowVector. The input vector should not contain any
/// unloaded lazy vectors.
///
/// This is used, for example, when writing Nimble ArrayWithOffsets and
/// SlidingWindowMap.
static VectorPtr pushDictionaryToRowVectorLeaves(const VectorPtr& input);

/// Returns a mutable reference to the rawVectorForBatchReader_ member, so
/// callers can both read and replace the stored vector.
/// NOTE(review): presumably owned and used by a batch reader - confirm.
VectorPtr& rawVectorForBatchReader() {
return rawVectorForBatchReader_;
}
Expand Down
80 changes: 80 additions & 0 deletions velox/vector/tests/VectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3797,6 +3797,86 @@ TEST_F(VectorTest, mapUpdateMultipleUpdates) {
}
}

TEST_F(VectorTest, pushDictionaryToRowVectorLeaves) {
  constexpr vector_size_t kNumRows = 10;

  // A flat vector has no dictionary to push; the same object comes back.
  auto flat = makeFlatVector<int64_t>(kNumRows, folly::identity);
  auto result = RowVector::pushDictionaryToRowVectorLeaves(flat);
  ASSERT_EQ(result, flat);

  // A single dictionary directly over a leaf is already in its final place.
  auto source = wrapInDictionary(makeIndicesInReverse(kNumRows), flat);
  result = RowVector::pushDictionaryToRowVectorLeaves(source);
  ASSERT_EQ(result, source);

  {
    SCOPED_TRACE("General");
    source = wrapInDictionary(
        makeIndicesInReverse(kNumRows),
        makeRowVector({
            // Child 0: plain leaf.
            flat,
            // Child 1: nested row with a plain leaf.
            makeRowVector({flat}),
            // Child 2: leaf under a dictionary that adds nulls.
            BaseVector::wrapInDictionary(
                makeNulls(kNumRows, nullEvery(3)),
                makeIndicesInReverse(kNumRows),
                kNumRows,
                flat),
            // Child 3: plain leaf.
            flat,
            // Child 4: row under a dictionary without nulls.
            wrapInDictionary(
                makeIndicesInReverse(kNumRows),
                makeRowVector({
                    flat,
                    wrapInDictionary(makeIndicesInReverse(kNumRows), flat),
                    flat,
                })),
            // Child 5: row under a dictionary that adds nulls.
            BaseVector::wrapInDictionary(
                makeNulls(kNumRows, nullEvery(7)),
                makeIndicesInReverse(kNumRows),
                kNumRows,
                makeRowVector({flat, flat})),
        }));
    result = RowVector::pushDictionaryToRowVectorLeaves(source);
    test::assertEqualVectors(source, result);

    // The top-level dictionary is gone; the row itself is now the root.
    ASSERT_EQ(result->encoding(), VectorEncoding::Simple::ROW);
    auto* resultRow = result->asUnchecked<RowVector>();

    // Child 0 reuses the indices buffer of the top-level dictionary.
    const auto& leaf0 = resultRow->childAt(0);
    ASSERT_EQ(leaf0->encoding(), VectorEncoding::Simple::DICTIONARY);
    ASSERT_EQ(leaf0->wrapInfo().get(), source->wrapInfo().get());

    // Child 1 stays a row; its leaf shares the same indices buffer.
    const auto& nested1 = resultRow->childAt(1);
    ASSERT_EQ(nested1->encoding(), VectorEncoding::Simple::ROW);
    auto* nested1Row = nested1->asUnchecked<RowVector>();
    const auto& nested1Leaf = nested1Row->childAt(0);
    ASSERT_EQ(nested1Leaf->encoding(), VectorEncoding::Simple::DICTIONARY);
    ASSERT_EQ(nested1Leaf->wrapInfo().get(), leaf0->wrapInfo().get());

    // Child 2 is a dictionary-encoded leaf.
    const auto& leaf2 = resultRow->childAt(2);
    ASSERT_EQ(leaf2->encoding(), VectorEncoding::Simple::DICTIONARY);

    // Child 3 shares the indices buffer with child 0.
    const auto& leaf3 = resultRow->childAt(3);
    ASSERT_EQ(leaf3->encoding(), VectorEncoding::Simple::DICTIONARY);
    ASSERT_EQ(leaf3->wrapInfo().get(), leaf0->wrapInfo().get());

    // Child 4: the dictionary moved below the row; the two plain leaves share
    // one merged indices buffer.
    const auto& nested4 = resultRow->childAt(4);
    ASSERT_EQ(nested4->encoding(), VectorEncoding::Simple::ROW);
    auto* nested4Row = nested4->asUnchecked<RowVector>();
    const auto& nested4Leaf0 = nested4Row->childAt(0);
    ASSERT_EQ(nested4Leaf0->encoding(), VectorEncoding::Simple::DICTIONARY);
    const auto& nested4Leaf1 = nested4Row->childAt(1);
    ASSERT_EQ(nested4Leaf1->encoding(), VectorEncoding::Simple::DICTIONARY);
    const auto& nested4Leaf2 = nested4Row->childAt(2);
    ASSERT_EQ(nested4Leaf2->encoding(), VectorEncoding::Simple::DICTIONARY);
    ASSERT_EQ(
        nested4Leaf0->wrapInfo().get(), nested4Leaf2->wrapInfo().get());

    // Child 5: the wrapper adds nulls, so the row stays wrapped and its
    // leaves remain flat.
    const auto& wrapped5 = resultRow->childAt(5);
    ASSERT_EQ(wrapped5->encoding(), VectorEncoding::Simple::DICTIONARY);
    ASSERT_EQ(
        wrapped5->valueVector()->encoding(), VectorEncoding::Simple::ROW);
    auto* wrapped5Row = wrapped5->valueVector()->asUnchecked<RowVector>();
    const auto& wrapped5Leaf0 = wrapped5Row->childAt(0);
    ASSERT_EQ(wrapped5Leaf0->encoding(), VectorEncoding::Simple::FLAT);
    const auto& wrapped5Leaf1 = wrapped5Row->childAt(1);
    ASSERT_EQ(wrapped5Leaf1->encoding(), VectorEncoding::Simple::FLAT);
  }
}

TEST_F(VectorTest, arrayCopyTargetNullOffsets) {
auto target = BaseVector::create(ARRAY(BIGINT()), 11, pool());
auto offsetsRef = target->asUnchecked<ArrayVector>()->offsets();
Expand Down

0 comments on commit 70097f1

Please sign in to comment.