Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lazy Distinct operation #1558

Merged
merged 15 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 101 additions & 58 deletions src/engine/Distinct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ size_t Distinct::getResultWidth() const { return subtree_->getResultWidth(); }
Distinct::Distinct(QueryExecutionContext* qec,
std::shared_ptr<QueryExecutionTree> subtree,
const std::vector<ColumnIndex>& keepIndices)
: Operation(qec), subtree_(std::move(subtree)), _keepIndices(keepIndices) {}
: Operation{qec}, subtree_{std::move(subtree)}, keepIndices_{keepIndices} {}

// _____________________________________________________________________________
string Distinct::getCacheKeyImpl() const {
return absl::StrCat("DISTINCT (", subtree_->getCacheKey(), ") (",
absl::StrJoin(_keepIndices, ","), ")");
absl::StrJoin(keepIndices_, ","), ")");
}

// _____________________________________________________________________________
Expand All @@ -33,27 +33,28 @@ VariableToColumnMap Distinct::computeVariableToColumnMap() const {
return subtree_->getVariableColumns();
}

// _____________________________________________________________________________
template <size_t WIDTH>
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
cppcoro::generator<IdTable> Distinct::lazyDistinct(
cppcoro::generator<IdTable> originalGenerator,
std::vector<ColumnIndex> keepIndices,
std::optional<IdTable> aggregateTable) {
Result::Generator Distinct::lazyDistinct(Result::Generator input,
bool yieldOnce) const {
IdTable aggregateTable{subtree_->getResultWidth(), allocator()};
LocalVocab aggregateVocab{};
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow =
std::nullopt;
for (IdTable& idTable : originalGenerator) {
IdTable result =
distinct<WIDTH>(std::move(idTable), keepIndices, previousRow);
for (auto& [idTable, localVocab] : input) {
IdTable result = distinct<WIDTH>(std::move(idTable), previousRow);
if (!result.empty()) {
previousRow.emplace(result.asStaticView<WIDTH>().back());
if (aggregateTable.has_value()) {
aggregateTable.value().insertAtEnd(result);
if (yieldOnce) {
aggregateVocab.mergeWith(std::array{std::move(localVocab)});
aggregateTable.insertAtEnd(result);
} else {
co_yield result;
co_yield {std::move(result), std::move(localVocab)};
}
}
}
if (aggregateTable.has_value()) {
co_yield aggregateTable.value();
if (yieldOnce) {
co_yield {std::move(aggregateTable), std::move(aggregateVocab)};
}
}

Expand All @@ -65,75 +66,117 @@ ProtoResult Distinct::computeResult(bool requestLaziness) {
LOG(DEBUG) << "Distinct result computation..." << endl;
size_t width = subtree_->getResultWidth();
if (subRes->isFullyMaterialized()) {
IdTable idTable =
CALL_FIXED_SIZE(width, &Distinct::distinct, subRes->idTable().clone(),
_keepIndices, std::nullopt);
IdTable idTable = CALL_FIXED_SIZE(width, &Distinct::outOfPlaceDistinct,
this, subRes->idTable());
LOG(DEBUG) << "Distinct result computation done." << endl;
return {std::move(idTable), resultSortedOn(),
subRes->getSharedLocalVocab()};
}

auto generator = CALL_FIXED_SIZE(
width, &Distinct::lazyDistinct, std::move(subRes->idTables()),
_keepIndices,
requestLaziness ? std::nullopt
: std::optional{IdTable{width, allocator()}});
if (!requestLaziness) {
IdTable result = cppcoro::getSingleElement(std::move(generator));
return {std::move(result), resultSortedOn(), subRes->getSharedLocalVocab()};
}
return {std::move(generator), resultSortedOn(),
subRes->getSharedLocalVocab()};
auto generator =
CALL_FIXED_SIZE(width, &Distinct::lazyDistinct, this,
std::move(subRes->idTables()), !requestLaziness);
return requestLaziness
? ProtoResult{std::move(generator), resultSortedOn()}
: ProtoResult{cppcoro::getSingleElement(std::move(generator)),
resultSortedOn()};
}

// _____________________________________________________________________________
bool Distinct::matchesRow(const auto& a, const auto& b) const {
return std::ranges::all_of(keepIndices_,
[&a, &b](ColumnIndex i) { return a[i] == b[i]; });
}

// _____________________________________________________________________________
template <size_t WIDTH>
IdTable Distinct::distinct(
IdTable dynInput, const std::vector<ColumnIndex>& keepIndices,
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow) {
AD_CONTRACT_CHECK(keepIndices.size() <= dynInput.numColumns());
IdTable dynInput,
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow) const {
AD_CONTRACT_CHECK(keepIndices_.size() <= dynInput.numColumns());
LOG(DEBUG) << "Distinct on " << dynInput.size() << " elements.\n";
IdTableStatic<WIDTH> result = std::move(dynInput).toStatic<WIDTH>();

auto matchesRow = [&keepIndices](const auto& a, const auto& b) {
for (ColumnIndex i : keepIndices) {
if (a[i] != b[i]) {
return false;
}
}
return true;
};

// Variant of `std::ranges::unique` that allows to skip the first rows of
// Variant of `std::ranges::unique` that allows to skip the begin rows of
// elements found in the previous table.
auto first = std::ranges::find_if(result, [&matchesRow,
&previousRow](const auto& row) {
return !previousRow.has_value() || !matchesRow(row, previousRow.value());
});
auto last = result.end();
auto begin =
std::ranges::find_if(result, [this, &previousRow](const auto& row) {
// Without explicit this clang seems to
// think the this capture is redundant.
return !previousRow.has_value() ||
!this->matchesRow(row, previousRow.value());
});
auto end = result.end();

auto dest = result.begin();
if (first == dest) {
if (begin == dest) {
// Optimization to avoid redundant move operations.
first = std::ranges::adjacent_find(first, last, matchesRow);
dest = first;
if (first != last) {
++first;
begin = std::ranges::adjacent_find(begin, end,
[this](const auto& a, const auto& b) {
// Without explicit this clang seems to
// think the this capture is redundant.
return this->matchesRow(a, b);
});
dest = begin;
if (begin != end) {
++begin;
}
} else if (first != last) {
*dest = std::move(*first);
} else if (begin != end) {
*dest = std::move(*begin);
}

if (first != last) {
while (++first != last) {
if (!matchesRow(*dest, *first)) {
*++dest = std::move(*first);
if (begin != end) {
while (++begin != end) {
if (!matchesRow(*dest, *begin)) {
*++dest = std::move(*begin);
checkCancellation();
}
}
++dest;
}
result.erase(dest, last);
checkCancellation();
result.erase(dest, end);
checkCancellation();

LOG(DEBUG) << "Distinct done.\n";
return std::move(result).toDynamic();
}

// _____________________________________________________________________________
template <size_t WIDTH>
IdTable Distinct::outOfPlaceDistinct(const IdTable& dynInput) const {
AD_CONTRACT_CHECK(keepIndices_.size() <= dynInput.numColumns());
LOG(DEBUG) << "Distinct on " << dynInput.size() << " elements.\n";
auto inputView = dynInput.asStaticView<WIDTH>();
IdTableStatic<WIDTH> output{dynInput.numColumns(), allocator()};

auto begin = inputView.begin();
auto end = inputView.end();
while (begin < end) {
int64_t allowedOffset = std::min(end - begin, CHUNK_SIZE);
begin = std::ranges::unique_copy(begin, begin + allowedOffset,
std::back_inserter(output),
[this](const auto& a, const auto& b) {
// Without explicit this clang seems to
// think the this capture is redundant.
return this->matchesRow(a, b);
})
.in;
checkCancellation();
// Skip to next unique value
do {
allowedOffset = std::min(end - begin, CHUNK_SIZE);
auto lastRow = begin[-1];
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
begin = std::ranges::find_if(begin, begin + allowedOffset,
[this, &lastRow](const auto& row) {
// Without explicit this clang seems to
// think the this capture is redundant.
return !this->matchesRow(row, lastRow);
});
checkCancellation();
} while (begin != end && matchesRow(*begin, begin[-1]));
}

LOG(DEBUG) << "Distinct done.\n";
return std::move(output).toDynamic();
}
33 changes: 24 additions & 9 deletions src/engine/Distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
class Distinct : public Operation {
private:
std::shared_ptr<QueryExecutionTree> subtree_;
std::vector<ColumnIndex> _keepIndices;
std::vector<ColumnIndex> keepIndices_;

static constexpr int64_t CHUNK_SIZE = 100'000;

public:
Distinct(QueryExecutionContext* qec,
Expand Down Expand Up @@ -54,20 +56,33 @@ class Distinct : public Operation {

VariableToColumnMap computeVariableToColumnMap() const override;

// Helper function that only compares rows on the columns in `keepIndices_`.
bool matchesRow(const auto& a, const auto& b) const;

// Return a generator that applies an in-place unique algorithm to the
// `IdTables`s yielded by the input generator. The `yieldOnce` flag controls
// if every `IdTable` from `input` should yield it's own `IdTable` or if all
// of them should get aggregated into a single big `IdTable`.
template <size_t WIDTH>
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
static cppcoro::generator<IdTable> lazyDistinct(
cppcoro::generator<IdTable> originalGenerator,
std::vector<ColumnIndex> keepIndices,
std::optional<IdTable> aggregateTable);
Result::Generator lazyDistinct(Result::Generator input, bool yieldOnce) const;

// Removes all duplicates from input with regards to the columns
// in keepIndices. The input needs to be sorted on the keep indices,
// otherwise the result of this function is undefined.
// otherwise the result of this function is undefined. The argument
// `previousRow` might hold a row representing the last row of the previous
// `IdTable`, so that the `IdTable` that will be returned doesn't return
// values that were already returned in the previous `IdTable`.
template <size_t WIDTH>
IdTable distinct(
IdTable dynInput,
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow) const;

// Out-of-place implementation of the unique algorithm. Does only copy values
// if they're actually unique.
template <size_t WIDTH>
static IdTable distinct(
IdTable dynInput, const std::vector<ColumnIndex>& keepIndices,
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow);
IdTable outOfPlaceDistinct(const IdTable& dynInput) const;

FRIEND_TEST(Distinct, distinct);
FRIEND_TEST(Distinct, distinctWithEmptyInput);
FRIEND_TEST(Distinct, testChunkEdgeCases);
};
70 changes: 62 additions & 8 deletions test/engine/DistinctTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,24 @@ using V = Variable;

namespace {
// Convert a generator to a vector for easier comparison in assertions
std::vector<IdTable> toVector(cppcoro::generator<IdTable> generator) {
std::vector<IdTable> toVector(Result::Generator generator) {
std::vector<IdTable> result;
for (auto& table : generator) {
for (auto& [table, vocab] : generator) {
// IMPORTANT: The `vocab` will go out of scope here, but the tests don't use
// any vocabulary so this is fine.
result.push_back(std::move(table));
}
return result;
}

Distinct makeDistinct(const std::vector<ColumnIndex>& keepIndices) {
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
auto* qec = ad_utility::testing::getQec();
return {qec,
ad_utility::makeExecutionTree<ValuesForTesting>(
qec, std::vector<IdTable>{},
std::vector<std::optional<Variable>>{Variable{"?x"}}),
keepIndices};
}
} // namespace

TEST(Distinct, CacheKey) {
Expand All @@ -45,21 +56,64 @@ TEST(Distinct, distinct) {
IdTable input{makeIdTableFromVector(
{{1, 1, 3, 7}, {6, 1, 3, 6}, {2, 2, 3, 5}, {3, 6, 5, 4}, {1, 6, 5, 1}})};

std::vector<ColumnIndex> keepIndices{{1, 2}};
IdTable result = CALL_FIXED_SIZE(4, Distinct::distinct, std::move(input),
keepIndices, std::nullopt);
Distinct distinct = makeDistinct({1, 2});
IdTable result = distinct.outOfPlaceDistinct<4>(input);

// For easier checking.
IdTable expectedResult{
makeIdTableFromVector({{1, 1, 3, 7}, {2, 2, 3, 5}, {3, 6, 5, 4}})};
ASSERT_EQ(expectedResult, result);
}

// _____________________________________________________________________________
TEST(Distinct, testChunkEdgeCases) {
Distinct distinct = makeDistinct({0});
IdTable input{1, ad_utility::testing::makeAllocator()};
IdTable::row_type row{1};

{
input.resize(Distinct::CHUNK_SIZE + 1);
row[0] = Id::makeFromInt(0);
std::ranges::fill(input, row);
IdTable result = distinct.outOfPlaceDistinct<1>(input);

ASSERT_EQ(makeIdTableFromVector({{0}}, &Id::makeFromInt), result);
}

{
input.resize(Distinct::CHUNK_SIZE + 1);
row[0] = Id::makeFromInt(0);
std::ranges::fill(input, row);
input.at(Distinct::CHUNK_SIZE, 0) = Id::makeFromInt(1);
IdTable result = distinct.outOfPlaceDistinct<1>(input);

ASSERT_EQ(makeIdTableFromVector({{0}, {1}}, &Id::makeFromInt), result);
}

{
input.resize(2 * Distinct::CHUNK_SIZE);
row[0] = Id::makeFromInt(0);
std::ranges::fill(input, row);
IdTable result = distinct.outOfPlaceDistinct<1>(input);

ASSERT_EQ(makeIdTableFromVector({{0}}, &Id::makeFromInt), result);
}

{
input.resize(2 * Distinct::CHUNK_SIZE + 2);
row[0] = Id::makeFromInt(0);
std::ranges::fill(input, row);
input.at(2 * Distinct::CHUNK_SIZE + 1, 0) = Id::makeFromInt(1);
IdTable result = distinct.outOfPlaceDistinct<1>(input);

ASSERT_EQ(makeIdTableFromVector({{0}, {1}}, &Id::makeFromInt), result);
}
}

// _____________________________________________________________________________
TEST(Distinct, distinctWithEmptyInput) {
IdTable input{1, makeAllocator()};
IdTable result = CALL_FIXED_SIZE(1, Distinct::distinct, input.clone(),
std::vector<ColumnIndex>{}, std::nullopt);
Distinct distinct = makeDistinct({});
IdTable result = distinct.outOfPlaceDistinct<1>(input);
ASSERT_EQ(input, result);
}

Expand Down
Loading