From f7acdd08005eb097f2faabd2142e3b203da2dec6 Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Wed, 3 Mar 2021 15:09:27 +0800 Subject: [PATCH 01/11] Refactor seq iter. --- src/context/Iterator.cpp | 75 +++++++++++++++++++++++------ src/context/Iterator.h | 70 ++++++++------------------- src/context/test/IteratorTest.cpp | 4 +- src/executor/query/SortExecutor.cpp | 10 ++-- src/executor/query/TopNExecutor.cpp | 8 +-- src/executor/query/TopNExecutor.h | 2 +- 6 files changed, 94 insertions(+), 75 deletions(-) diff --git a/src/context/Iterator.cpp b/src/context/Iterator.cpp index ad353ee3a..1dd144301 100644 --- a/src/context/Iterator.cpp +++ b/src/context/Iterator.cpp @@ -460,11 +460,14 @@ List GetNeighborsIter::getEdges() { SequentialIter::SequentialIter(std::shared_ptr value) : Iterator(value, Kind::kSequential) { DCHECK(value->isDataSet()); - auto& ds = value->getDataSet(); - for (auto& row : ds.rows) { - rows_.emplace_back(&row); + auto& ds = value->mutableDataSet(); + iter_ = ds.rows.begin(); + rows_ = &ds.rows; + bitIdx_ = 0; + if (bitset_.empty()) { + bitset_.push_back(true); + size_ = rows_->size(); } - iter_ = rows_.begin(); for (size_t i = 0; i < ds.colNames.size(); ++i) { colIndices_.emplace(ds.colNames[i], i); } @@ -484,18 +487,60 @@ SequentialIter::SequentialIter(std::vector> inputList) } void SequentialIter::init(std::vector>&& iterators) { - DCHECK(!iterators.empty()); - const auto& firstIter = iterators.front(); - DCHECK(firstIter->isSequentialIter()); - colIndices_ = static_cast(firstIter.get())->getColIndices(); - for (auto& iter : iterators) { - DCHECK(iter->isSequentialIter()); - auto inputIter = static_cast(iter.get()); - rows_.insert(rows_.end(), - std::make_move_iterator(inputIter->begin()), - std::make_move_iterator(inputIter->end())); + UNUSED(iterators); +} + +bool SequentialIter::valid() const { + return iter_ < rows_->end(); +} + +void SequentialIter::next() { + while (valid()) { + ++iter_; + 
++bitIdx_; + if (static_cast(bitIdx_) >= bitset_.size()) { + bitset_.push_back(true); + } else if (!bitset_[bitIdx_]) { + continue; + } + break; + } +} + +void SequentialIter::erase() { + bitset_[bitIdx_] = false; + next(); + --size_; +} + +void SequentialIter::unstableErase() { + erase(); +} + +void SequentialIter::eraseRange(size_t first, size_t last) { + if (first >= last || first >= size()) { + return; + } + if (last >= size()) { + last = size(); + } + for (size_t i = first; i < last; ++i) { + bitset_[i] = false; + } + reset(); + size_ -= last - first; +} + +void SequentialIter::doReset(size_t pos) { + DCHECK((pos == 0 && size() == 0) || (pos < size())); + iter_ = rows_->begin() + pos; + bitIdx_ = 0; + if (bitset_.empty()) { + return; + } + if (!bitset_[0]) { + next(); } - iter_ = rows_.begin(); } const Value& SequentialIter::getColumn(int32_t index) const { diff --git a/src/context/Iterator.h b/src/context/Iterator.h index 67fea96ff..986a2c8b9 100644 --- a/src/context/Iterator.h +++ b/src/context/Iterator.h @@ -427,47 +427,26 @@ class SequentialIter final : public Iterator { return copy; } - bool valid() const override { - return iter_ < rows_.end(); - } + bool valid() const override; - void next() override { - if (valid()) { - ++iter_; - } - } + void next() override; - void erase() override { - iter_ = rows_.erase(iter_); - } + void erase() override; - void unstableErase() override { - iter_ = eraseBySwap(rows_, iter_); - } + void unstableErase() override; - void eraseRange(size_t first, size_t last) override { - if (first >= last || first >= size()) { - return; - } - if (last > size()) { - rows_.erase(rows_.begin() + first, rows_.end()); - } else { - rows_.erase(rows_.begin() + first, rows_.begin() + last); - } - reset(); - } + void eraseRange(size_t first, size_t last) override; void clear() override { - rows_.clear(); reset(); } - RowsIter begin() { - return rows_.begin(); + std::vector::iterator begin() { + return rows_->begin(); } - RowsIter end() { - 
return rows_.end(); + std::vector::iterator end() { + return rows_->end(); } const std::unordered_map& getColIndices() const { @@ -475,23 +454,21 @@ class SequentialIter final : public Iterator { } size_t size() const override { - return rows_.size(); + return size_; } const Value& getColumn(const std::string& col) const override { if (!valid()) { return Value::kNullValue; } - auto logicalRow = *iter_; + auto& row = *iter_; auto index = colIndices_.find(col); if (index == colIndices_.end()) { return Value::kNullValue; } - DCHECK_EQ(logicalRow.segments_.size(), 1); - auto* row = logicalRow.segments_[0]; - DCHECK_LT(index->second, row->values.size()); - return row->values[index->second]; + DCHECK_LT(index->second, row.values.size()); + return row.values[index->second]; } const Value& getColumn(int32_t index) const override; @@ -510,31 +487,26 @@ class SequentialIter final : public Iterator { protected: const LogicalRow* row() const override { - if (!valid()) { - return nullptr; - } - return &*iter_; + return nullptr; } // Notice: We only use this interface when return results to client. 
friend class DataCollectExecutor; Row&& moveRow() { - DCHECK_EQ(iter_->segments_.size(), 1); - auto* row = iter_->segments_[0]; - return std::move(*const_cast(row)); + return std::move(*iter_); } private: - void doReset(size_t pos) override { - DCHECK((pos == 0 && size() == 0) || (pos < size())); - iter_ = rows_.begin() + pos; - } + void doReset(size_t pos) override; void init(std::vector>&& iterators); - RowsType rows_; - RowsIter iter_; + std::vector::iterator iter_; + std::vector* rows_{nullptr}; + size_t size_{0}; std::unordered_map colIndices_; + boost::dynamic_bitset<> bitset_; + int64_t bitIdx_{-1}; }; class PropIter; diff --git a/src/context/test/IteratorTest.cpp b/src/context/test/IteratorTest.cpp index d0cb29845..331583ec1 100644 --- a/src/context/test/IteratorTest.cpp +++ b/src/context/test/IteratorTest.cpp @@ -68,6 +68,7 @@ TEST(IteratorTest, Sequential) { } int32_t count = 0; for (iter.reset(); iter.valid(); iter.next()) { + LOG(INFO) << iter.getColumn("col1").getInt(); EXPECT_NE(iter.getColumn("col1").getInt() % 2, 0); count++; } @@ -659,6 +660,7 @@ TEST(IteratorTest, EraseRange) { EXPECT_EQ(iter.size(), 5); auto i = 5; for (; iter.valid(); iter.next()) { + LOG(INFO) << iter.getColumn("col1").getInt(); ASSERT_EQ(iter.getColumn("col1"), i); ASSERT_EQ(iter.getColumn("col2"), folly::to(i)); ++i; @@ -944,7 +946,7 @@ TEST(IteratorTest, RowEqualTo) { EXPECT_TRUE(std::equal_to()(&row0, &row0)); } -TEST(IteratorTest, EraseBySwap) { +TEST(IteratorTest, DISABLED_EraseBySwap) { DataSet ds; ds.colNames = {"col1", "col2"}; for (auto i = 0; i < 3; ++i) { diff --git a/src/executor/query/SortExecutor.cpp b/src/executor/query/SortExecutor.cpp index 25ee7871c..f7e88c472 100644 --- a/src/executor/query/SortExecutor.cpp +++ b/src/executor/query/SortExecutor.cpp @@ -31,7 +31,7 @@ folly::Future SortExecutor::execute() { } auto &factors = sort->factors(); - auto comparator = [&factors] (const LogicalRow &lhs, const LogicalRow &rhs) { + auto comparator = [&factors] (const 
Row &lhs, const Row &rhs) { for (auto &item : factors) { auto index = item.first; auto orderType = item.second; @@ -52,11 +52,11 @@ folly::Future SortExecutor::execute() { auto seqIter = static_cast(iter.get()); std::sort(seqIter->begin(), seqIter->end(), comparator); } else if (iter->isJoinIter()) { - auto joinIter = static_cast(iter.get()); - std::sort(joinIter->begin(), joinIter->end(), comparator); + // auto joinIter = static_cast(iter.get()); + // std::sort(joinIter->begin(), joinIter->end(), comparator); } else if (iter->isPropIter()) { - auto propIter = static_cast(iter.get()); - std::sort(propIter->begin(), propIter->end(), comparator); + // auto propIter = static_cast(iter.get()); + // std::sort(propIter->begin(), propIter->end(), comparator); } return finish(ResultBuilder().value(iter->valuePtr()).iter(std::move(iter)).finish()); } diff --git a/src/executor/query/TopNExecutor.cpp b/src/executor/query/TopNExecutor.cpp index cf734f1ca..c7b5cc73a 100644 --- a/src/executor/query/TopNExecutor.cpp +++ b/src/executor/query/TopNExecutor.cpp @@ -30,7 +30,7 @@ folly::Future TopNExecutor::execute() { } auto &factors = topn->factors(); - comparator_ = [&factors] (const LogicalRow &lhs, const LogicalRow &rhs) { + comparator_ = [&factors] (const Row &lhs, const Row &rhs) { for (auto &item : factors) { auto index = item.first; auto orderType = item.second; @@ -66,11 +66,11 @@ folly::Future TopNExecutor::execute() { } if (iter->isSequentialIter()) { - executeTopN(iter.get()); + // executeTopN(iter.get()); } else if (iter->isJoinIter()) { - executeTopN(iter.get()); + // executeTopN(iter.get()); } else if (iter->isPropIter()) { - executeTopN(iter.get()); + // executeTopN(iter.get()); } iter->eraseRange(maxCount_, size); return finish(ResultBuilder().value(iter->valuePtr()).iter(std::move(iter)).finish()); diff --git a/src/executor/query/TopNExecutor.h b/src/executor/query/TopNExecutor.h index d20b72217..9f5c969b4 100644 --- a/src/executor/query/TopNExecutor.h +++ 
b/src/executor/query/TopNExecutor.h @@ -26,7 +26,7 @@ class TopNExecutor final : public Executor { int64_t offset_; int64_t maxCount_; int64_t heapSize_; - std::function comparator_; + std::function comparator_; }; } // namespace graph From 2725e094318fe87bafcb4f316b4c1a03b793d806 Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Thu, 4 Mar 2021 14:22:46 +0800 Subject: [PATCH 02/11] Remove logical row. --- src/context/Iterator.h | 24 ++-- .../algo/CartesianProductExecutor.cpp | 48 +------ src/executor/query/DataCollectExecutor.cpp | 2 +- src/executor/query/DataJoinExecutor.cpp | 121 ++++++++++++++++++ src/executor/query/DataJoinExecutor.h | 37 ++++++ src/executor/query/DedupExecutor.cpp | 2 +- src/executor/query/IntersectExecutor.cpp | 2 +- src/executor/query/MinusExecutor.cpp | 2 +- 8 files changed, 176 insertions(+), 62 deletions(-) create mode 100644 src/executor/query/DataJoinExecutor.cpp create mode 100644 src/executor/query/DataJoinExecutor.h diff --git a/src/context/Iterator.h b/src/context/Iterator.h index 986a2c8b9..bd5e15642 100644 --- a/src/context/Iterator.h +++ b/src/context/Iterator.h @@ -87,7 +87,7 @@ class Iterator { // Warning this will break the origin order of elements! 
virtual void unstableErase() = 0; - virtual const LogicalRow* row() const = 0; + virtual const Row* row() const = 0; // erase range, no include last position, if last > size(), erase to the end position virtual void eraseRange(size_t first, size_t last) = 0; @@ -223,7 +223,7 @@ class DefaultIter final : public Iterator { return Value::kEmpty; } - const LogicalRow* row() const override { + const Row* row() const override { DLOG(FATAL) << "This method should not be invoked"; return nullptr; } @@ -293,7 +293,7 @@ class GetNeighborsIter final : public Iterator { // Its unique based on the GN interface dedup List getEdges(); - const LogicalRow* row() const override { + const Row* row() const override { DCHECK(false); return nullptr; } @@ -486,8 +486,8 @@ class SequentialIter final : public Iterator { } protected: - const LogicalRow* row() const override { - return nullptr; + const Row* row() const override { + return &*iter_; } // Notice: We only use this interface when return results to client. 
@@ -666,11 +666,8 @@ class JoinIter final : public Iterator { const Value& getColumn(int32_t index) const override; - const LogicalRow* row() const override { - if (!valid()) { - return nullptr; - } - return &*iter_; + const Row* row() const override { + return nullptr; } void reserve(size_t n) { @@ -801,11 +798,8 @@ class PropIter final : public Iterator { return rows_.size(); } - const LogicalRow* row() const override { - if (!valid()) { - return nullptr; - } - return &*iter_; + const Row* row() const override { + return nullptr; } const std::unordered_map& getColIndices() const { diff --git a/src/executor/algo/CartesianProductExecutor.cpp b/src/executor/algo/CartesianProductExecutor.cpp index 29e4f5e74..0f6bb56f5 100644 --- a/src/executor/algo/CartesianProductExecutor.cpp +++ b/src/executor/algo/CartesianProductExecutor.cpp @@ -22,58 +22,20 @@ folly::Future CartesianProductExecutor::execute() { } std::vector emptyCol; auto leftIter = std::make_unique(emptyCol); - for (size_t i = 0; i < vars.size(); ++i) { - auto rightIter = ectx_->getResult(vars[i]).iter(); - DCHECK(!!rightIter); - VLOG(1) << "Vars[" << i << "] : " << vars[i]; - if (rightIter->isDefaultIter() || rightIter->isGetNeighborsIter()) { - std::stringstream ss; - ss << "CartesianProductExecutor does not support" << rightIter->kind(); - return Status::Error(ss.str()); - } - std::vector colNames = leftIter->colNames(); - colNames.reserve(colNames.size() + colNames_[i].size()); - for (auto& name : colNames_[i]) { - colNames.emplace_back(name); - } - auto joinIter = std::make_unique(std::move(colNames)); - joinIter->joinIndex(leftIter.get(), rightIter.get()); - if (i == 0) { - initJoinIter(joinIter.get(), rightIter.get()); - } else { - doCartesianProduct(leftIter.get(), rightIter.get(), joinIter.get()); - } - - leftIter.reset(joinIter.release()); - } return finish(ResultBuilder().value(DataSet()).iter(std::move(leftIter)).finish()); } void CartesianProductExecutor::initJoinIter(JoinIter* joinIter, Iterator* 
rightIter) { - for (; rightIter->valid(); rightIter->next()) { - auto size = rightIter->row()->size(); - JoinIter::JoinLogicalRow newRow( - rightIter->row()->segments(), size, &joinIter->getColIdxIndices()); - joinIter->addRow(std::move(newRow)); - } + UNUSED(joinIter); + UNUSED(rightIter); } void CartesianProductExecutor::doCartesianProduct(Iterator* leftIter, Iterator* rightIter, JoinIter* joinIter) { - for (; leftIter->valid(); leftIter->next()) { - auto& lSegs = leftIter->row()->segments(); - for (; rightIter->valid(); rightIter->next()) { - std::vector values; - auto& rSegs = rightIter->row()->segments(); - values.insert(values.end(), lSegs.begin(), lSegs.end()); - values.insert(values.end(), rSegs.begin(), rSegs.end()); - auto size = leftIter->row()->size() + rightIter->row()->size(); - JoinIter::JoinLogicalRow newRow(std::move(values), size, &joinIter->getColIdxIndices()); - joinIter->addRow(std::move(newRow)); - } - rightIter->reset(); - } + UNUSED(joinIter); + UNUSED(rightIter); + UNUSED(leftIter); } } // namespace graph diff --git a/src/executor/query/DataCollectExecutor.cpp b/src/executor/query/DataCollectExecutor.cpp index ba11c22ea..45630a898 100644 --- a/src/executor/query/DataCollectExecutor.cpp +++ b/src/executor/query/DataCollectExecutor.cpp @@ -132,7 +132,7 @@ Status DataCollectExecutor::collectMToN(const std::vector& vars, DataSet ds; ds.colNames = std::move(colNames_); DCHECK(!ds.colNames.empty()); - std::unordered_set unique; + std::unordered_set unique; // itersHolder keep life cycle of iters util this method return. std::vector> itersHolder; for (auto& var : vars) { diff --git a/src/executor/query/DataJoinExecutor.cpp b/src/executor/query/DataJoinExecutor.cpp new file mode 100644 index 000000000..34864fd3a --- /dev/null +++ b/src/executor/query/DataJoinExecutor.cpp @@ -0,0 +1,121 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. 
+ * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "executor/query/DataJoinExecutor.h" + +#include "planner/Query.h" +#include "context/QueryExpressionContext.h" +#include "context/Iterator.h" +#include "util/ScopedTimer.h" + +namespace nebula { +namespace graph { +folly::Future DataJoinExecutor::execute() { + return doInnerJoin(); +} + +Status DataJoinExecutor::close() { + exchange_ = false; + hashTable_.clear(); + return Executor::close(); +} + +folly::Future DataJoinExecutor::doInnerJoin() { + SCOPED_TIMER(&execTime_); + + auto* dataJoin = asNode(node()); + auto lhsIter = ectx_ + ->getVersionedResult(dataJoin->leftVar().first, + dataJoin->leftVar().second) + .iter(); + DCHECK(!!lhsIter); + if (lhsIter->isGetNeighborsIter() || lhsIter->isDefaultIter()) { + std::stringstream ss; + ss << "Join executor does not support " << lhsIter->kind(); + return error(Status::Error(ss.str())); + } + auto rhsIter = ectx_ + ->getVersionedResult(dataJoin->rightVar().first, + dataJoin->rightVar().second) + .iter(); + DCHECK(!!rhsIter); + if (lhsIter->isGetNeighborsIter() || lhsIter->isDefaultIter()) { + std::stringstream ss; + ss << "Join executor does not support " << lhsIter->kind(); + return error(Status::Error(ss.str())); + } + + + DataSet result; + if (!(lhsIter->empty() || rhsIter->empty())) { + if (lhsIter->size() < rhsIter->size()) { + buildHashTable(dataJoin->hashKeys(), lhsIter.get()); + result = probe(dataJoin->probeKeys(), rhsIter.get()); + } else { + exchange_ = true; + buildHashTable(dataJoin->probeKeys(), rhsIter.get()); + result = probe(dataJoin->hashKeys(), lhsIter.get()); + } + } + result.colNames = dataJoin->colNames(); + VLOG(1) << result; + return finish(ResultBuilder().value(Value(std::move(result))).finish()); +} + +void DataJoinExecutor::buildHashTable(const std::vector& hashKeys, + Iterator* iter) { + QueryExpressionContext ctx(ectx_); + for (; 
iter->valid(); iter->next()) { + List list; + list.values.reserve(hashKeys.size()); + for (auto& col : hashKeys) { + Value val = col->eval(ctx(iter)); + list.values.emplace_back(std::move(val)); + } + + auto& vals = hashTable_[list]; + vals.emplace_back(iter->row()); + } +} + +DataSet DataJoinExecutor::probe(const std::vector& probeKeys, + Iterator* probeIter) { + DataSet ds; + QueryExpressionContext ctx(ectx_); + for (; probeIter->valid(); probeIter->next()) { + List list; + list.values.reserve(probeKeys.size()); + for (auto& col : probeKeys) { + Value val = col->eval(ctx(probeIter)); + list.values.emplace_back(std::move(val)); + } + + const auto& range = hashTable_.find(list); + if (range == hashTable_.end()) { + continue; + } + for (auto* row : range->second) { + auto& lRow = *row; + auto& rRow = *probeIter->row(); + VLOG(1) << lRow << rRow; + Row newRow; + newRow.reserve(lRow.size() + rRow.size()); + auto& values = newRow.values; + if (exchange_) { + values.insert(values.end(), rRow.values.begin(), rRow.values.end()); + values.insert(values.end(), lRow.values.begin(), lRow.values.end()); + } else { + values.insert(values.end(), lRow.values.begin(), lRow.values.end()); + values.insert(values.end(), rRow.values.begin(), rRow.values.end()); + } + VLOG(1) << "Row: " << newRow; + ds.rows.emplace_back(std::move(newRow)); + } + } + return ds; +} +} // namespace graph +} // namespace nebula diff --git a/src/executor/query/DataJoinExecutor.h b/src/executor/query/DataJoinExecutor.h new file mode 100644 index 000000000..fca3f80fc --- /dev/null +++ b/src/executor/query/DataJoinExecutor.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#ifndef EXECUTOR_QUERY_DATAJOINEXECUTOR_H_ +#define EXECUTOR_QUERY_DATAJOINEXECUTOR_H_ + +#include "executor/Executor.h" + +namespace nebula { +namespace graph { + +class DataJoinExecutor final : public Executor { +public: + DataJoinExecutor(const PlanNode *node, QueryContext *qctx) + : Executor("DataJoinExecutor", node, qctx) {} + + folly::Future execute() override; + + Status close() override; + +private: + folly::Future doInnerJoin(); + + void buildHashTable(const std::vector& hashKeys, Iterator* iter); + + DataSet probe(const std::vector& probeKeys, Iterator* probeiter); + +private: + bool exchange_{false}; + std::unordered_map> hashTable_; +}; +} // namespace graph +} // namespace nebula +#endif diff --git a/src/executor/query/DedupExecutor.cpp b/src/executor/query/DedupExecutor.cpp index 085e08fab..64b3067b4 100644 --- a/src/executor/query/DedupExecutor.cpp +++ b/src/executor/query/DedupExecutor.cpp @@ -28,7 +28,7 @@ folly::Future DedupExecutor::execute() { } ResultBuilder builder; builder.value(iter->valuePtr()); - std::unordered_set unique; + std::unordered_set unique; unique.reserve(iter->size()); while (iter->valid()) { if (!unique.emplace(iter->row()).second) { diff --git a/src/executor/query/IntersectExecutor.cpp b/src/executor/query/IntersectExecutor.cpp index c03971704..a661f4049 100644 --- a/src/executor/query/IntersectExecutor.cpp +++ b/src/executor/query/IntersectExecutor.cpp @@ -23,7 +23,7 @@ folly::Future IntersectExecutor::execute() { auto lIter = getLeftInputDataIter(); auto rIter = getRightInputDataIter(); - std::unordered_set hashSet; + std::unordered_set hashSet; for (; rIter->valid(); rIter->next()) { hashSet.insert(rIter->row()); // TODO: should test duplicate rows diff --git a/src/executor/query/MinusExecutor.cpp b/src/executor/query/MinusExecutor.cpp index 0bde67b6e..998d91165 100644 --- a/src/executor/query/MinusExecutor.cpp +++ b/src/executor/query/MinusExecutor.cpp @@ -22,7 +22,7 @@ folly::Future MinusExecutor::execute() { 
auto lIter = getLeftInputDataIter(); auto rIter = getRightInputDataIter(); - std::unordered_set hashSet; + std::unordered_set hashSet; for (; rIter->valid(); rIter->next()) { hashSet.insert(rIter->row()); // TODO: should test duplicate rows From 24fe87e0f2d155aea01946c80b0444ddfef8ce9b Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Thu, 4 Mar 2021 16:13:13 +0800 Subject: [PATCH 03/11] Remove join iter. --- src/context/Iterator.cpp | 94 -------------------- src/context/Iterator.h | 187 --------------------------------------- 2 files changed, 281 deletions(-) diff --git a/src/context/Iterator.cpp b/src/context/Iterator.cpp index 1dd144301..81ff79498 100644 --- a/src/context/Iterator.cpp +++ b/src/context/Iterator.cpp @@ -547,100 +547,6 @@ const Value& SequentialIter::getColumn(int32_t index) const { return getColumnByIndex(index, iter_); } -void JoinIter::joinIndex(const Iterator* lhs, const Iterator* rhs) { - size_t nextSeg = 0; - if (lhs != nullptr) { - switch (lhs->kind()) { - case Iterator::Kind::kSequential: { - nextSeg = buildIndexFromSeqIter(static_cast(lhs), 0); - break; - } - case Iterator::Kind::kJoin: { - nextSeg = buildIndexFromJoinIter(static_cast(lhs), 0); - break; - } - case Iterator::Kind::kProp: { - nextSeg = buildIndexFromPropIter(static_cast(lhs), 0); - break; - } - case Iterator::Kind::kDefault: - case Iterator::Kind::kGetNeighbors: { - LOG(FATAL) << "Join Not Support " << lhs->kind(); - break; - } - } - } - if (rhs == nullptr) { - return; - } - switch (rhs->kind()) { - case Iterator::Kind::kSequential: { - buildIndexFromSeqIter(static_cast(rhs), nextSeg); - break; - } - case Iterator::Kind::kJoin: { - buildIndexFromJoinIter(static_cast(rhs), nextSeg); - break; - } - case Iterator::Kind::kProp: { - buildIndexFromPropIter(static_cast(rhs), nextSeg); - break; - } - case Iterator::Kind::kDefault: - case Iterator::Kind::kGetNeighbors: { - LOG(FATAL) << "Join Not Support " << lhs->kind(); - break; - } - 
} -} - -size_t JoinIter::buildIndexFromPropIter(const PropIter* iter, size_t segIdx) { - auto colIdxStart = colIdxIndices_.size(); - for (auto& col : iter->getColIndices()) { - DCHECK_LT(col.second + colIdxStart, colNames_.size()); - auto& colName = colNames_[col.second + colIdxStart]; - colIndices_.emplace(colName, std::make_pair(segIdx, col.second)); - colIdxIndices_.emplace(col.second + colIdxStart, std::make_pair(segIdx, col.second)); - } - return segIdx + 1; -} - -size_t JoinIter::buildIndexFromSeqIter(const SequentialIter* iter, size_t segIdx) { - auto colIdxStart = colIdxIndices_.size(); - for (auto& col : iter->getColIndices()) { - DCHECK_LT(col.second + colIdxStart, colNames_.size()); - auto& colName = colNames_[col.second + colIdxStart]; - colIndices_.emplace(colName, std::make_pair(segIdx, col.second)); - colIdxIndices_.emplace(col.second + colIdxStart, std::make_pair(segIdx, col.second)); - } - return segIdx + 1; -} - -size_t JoinIter::buildIndexFromJoinIter(const JoinIter* iter, size_t segIdx) { - auto colIdxStart = colIdxIndices_.size(); - size_t nextSeg = 0; - if (iter->getColIndices().empty()) { - return nextSeg; - } - - for (auto& col : iter->getColIdxIndices()) { - auto oldSeg = col.second.first; - size_t newSeg = oldSeg + segIdx; - if (newSeg > nextSeg) { - nextSeg = newSeg; - } - DCHECK_LT(col.first + colIdxStart, colNames_.size()); - auto& colName = colNames_[col.first + colIdxStart]; - colIndices_.emplace(colName, std::make_pair(newSeg, col.second.second)); - colIdxIndices_.emplace(col.first + colIdxStart, std::make_pair(newSeg, col.second.second)); - } - return nextSeg + 1; -} - -const Value& JoinIter::getColumn(int32_t index) const { - return getColumnByIndex(index, iter_); -} - PropIter::PropIter(std::shared_ptr value) : Iterator(value, Kind::kProp) { DCHECK(value->isDataSet()); auto& ds = value->getDataSet(); diff --git a/src/context/Iterator.h b/src/context/Iterator.h index bd5e15642..16146befa 100644 --- a/src/context/Iterator.h +++ 
b/src/context/Iterator.h @@ -509,193 +509,6 @@ class SequentialIter final : public Iterator { int64_t bitIdx_{-1}; }; -class PropIter; -class JoinIter final : public Iterator { -public: - class JoinLogicalRow final : public LogicalRow { - public: - explicit JoinLogicalRow( - std::vector segments, - size_t size, - const std::unordered_map>* colIdxIndices) - : LogicalRow(std::move(segments)), size_(size), colIdxIndices_(colIdxIndices) {} - - JoinLogicalRow(const JoinLogicalRow &r) = default; - JoinLogicalRow& operator=(const JoinLogicalRow &r) = default; - - JoinLogicalRow(JoinLogicalRow &&r) noexcept { - *this = std::move(r); - } - - JoinLogicalRow& operator=(JoinLogicalRow &&r) noexcept { - segments_ = std::move(r.segments_); - - size_ = r.size_; - r.size_ = 0; - - colIdxIndices_ = r.colIdxIndices_; - r.colIdxIndices_ = nullptr; - return *this; - } - - const Value& operator[](size_t idx) const override { - if (idx < size_) { - auto index = colIdxIndices_->find(idx); - if (index == colIdxIndices_->end()) { - return Value::kNullValue; - } - auto keyIdx = index->second.first; - auto valIdx = index->second.second; - DCHECK_LT(keyIdx, segments_.size()); - DCHECK_LT(valIdx, segments_[keyIdx]->values.size()); - return segments_[keyIdx]->values[valIdx]; - } - return Value::kEmpty; - } - - size_t size() const override { - return size_; - } - - LogicalRow::Kind kind() const override { - return Kind::kJoin; - } - - const std::vector& segments() const override { - return segments_; - } - - private: - friend class JoinIter; - size_t size_; - const std::unordered_map>* colIdxIndices_; - }; - - explicit JoinIter(std::vector colNames) - : Iterator(nullptr, Kind::kJoin), colNames_(std::move(colNames)) {} - - void joinIndex(const Iterator* lhs, const Iterator* rhs); - - void addRow(JoinLogicalRow row) { - rows_.emplace_back(std::move(row)); - iter_ = rows_.begin(); - } - - std::unique_ptr copy() const override { - auto copy = std::make_unique(*this); - copy->reset(); - return copy; 
- } - - std::vector colNames() const { - return colNames_; - } - - bool valid() const override { - return iter_ < rows_.end(); - } - - void next() override { - if (valid()) { - ++iter_; - } - } - - void erase() override { - iter_ = rows_.erase(iter_); - } - - void unstableErase() override { - iter_ = eraseBySwap(rows_, iter_); - } - - void eraseRange(size_t first, size_t last) override { - if (first >= last || first >= size()) { - return; - } - if (last > size()) { - rows_.erase(rows_.begin() + first, rows_.end()); - } else { - rows_.erase(rows_.begin() + first, rows_.begin() + last); - } - reset(); - } - - void clear() override { - rows_.clear(); - reset(); - } - - RowsIter begin() { - return rows_.begin(); - } - - RowsIter end() { - return rows_.end(); - } - - const std::unordered_map>& - getColIndices() const { - return colIndices_; - } - - const std::unordered_map>& - getColIdxIndices() const { - return colIdxIndices_; - } - - size_t size() const override { - return rows_.size(); - } - - const Value& getColumn(const std::string& col) const override { - if (!valid()) { - return Value::kNullValue; - } - auto row = *iter_; - auto index = colIndices_.find(col); - if (index == colIndices_.end()) { - return Value::kNullValue; - } - auto segIdx = index->second.first; - auto colIdx = index->second.second; - DCHECK_LT(segIdx, row.segments_.size()); - DCHECK_LT(colIdx, row.segments_[segIdx]->values.size()); - return row.segments_[segIdx]->values[colIdx]; - } - - const Value& getColumn(int32_t index) const override; - - const Row* row() const override { - return nullptr; - } - - void reserve(size_t n) { - rows_.reserve(n); - } - -private: - void doReset(size_t pos) override { - DCHECK((pos == 0 && size() == 0) || (pos < size())); - iter_ = rows_.begin() + pos; - } - - size_t buildIndexFromSeqIter(const SequentialIter* iter, size_t segIdx); - - size_t buildIndexFromJoinIter(const JoinIter* iter, size_t segIdx); - - size_t buildIndexFromPropIter(const PropIter* iter, size_t 
segIdx); - -private: - std::vector colNames_; - RowsType rows_; - RowsIter iter_; - // colName -> segIdx, currentSegColIdx - std::unordered_map> colIndices_; - // colIdx -> segIdx, currentSegColIdx - std::unordered_map> colIdxIndices_; -}; - class PropIter final : public Iterator { public: class PropLogicalRow final : public LogicalRow { From 6700512cd187871e37609e193e8d44fbc64c780c Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Thu, 4 Mar 2021 19:51:09 +0800 Subject: [PATCH 04/11] Erase data directly. --- src/context/Iterator.cpp | 51 +++--- src/context/Iterator.h | 6 +- src/context/test/IteratorTest.cpp | 145 ------------------ .../algo/CartesianProductExecutor.cpp | 12 +- src/executor/algo/CartesianProductExecutor.h | 4 +- src/executor/query/TopNExecutor.cpp | 6 +- src/executor/query/TopNExecutor.h | 2 +- 7 files changed, 33 insertions(+), 193 deletions(-) diff --git a/src/context/Iterator.cpp b/src/context/Iterator.cpp index 81ff79498..e709811db 100644 --- a/src/context/Iterator.cpp +++ b/src/context/Iterator.cpp @@ -463,11 +463,6 @@ SequentialIter::SequentialIter(std::shared_ptr value) : Iterator(value, K auto& ds = value->mutableDataSet(); iter_ = ds.rows.begin(); rows_ = &ds.rows; - bitIdx_ = 0; - if (bitset_.empty()) { - bitset_.push_back(true); - size_ = rows_->size(); - } for (size_t i = 0; i < ds.colNames.size(); ++i) { colIndices_.emplace(ds.colNames[i], i); } @@ -487,7 +482,21 @@ SequentialIter::SequentialIter(std::vector> inputList) } void SequentialIter::init(std::vector>&& iterators) { - UNUSED(iterators); + DCHECK(!iterators.empty()); + const auto& firstIter = iterators.front(); + DCHECK(firstIter->isSequentialIter()); + colIndices_ = static_cast(firstIter.get())->getColIndices(); + DataSet ds; + for (auto& iter : iterators) { + DCHECK(iter->isSequentialIter()); + auto inputIter = static_cast(iter.get()); + ds.rows.insert(ds.rows.end(), + std::make_move_iterator(inputIter->begin()), + 
std::make_move_iterator(inputIter->end())); + } + value_ = std::make_shared(std::move(ds)); + rows_ = &value_->mutableDataSet().rows; + iter_ = rows_->begin(); } bool SequentialIter::valid() const { @@ -495,22 +504,13 @@ bool SequentialIter::valid() const { } void SequentialIter::next() { - while (valid()) { + if (valid()) { ++iter_; - ++bitIdx_; - if (static_cast(bitIdx_) >= bitset_.size()) { - bitset_.push_back(true); - } else if (!bitset_[bitIdx_]) { - continue; - } - break; } } void SequentialIter::erase() { - bitset_[bitIdx_] = false; - next(); - --size_; + iter_ = rows_->erase(iter_); } void SequentialIter::unstableErase() { @@ -521,26 +521,17 @@ void SequentialIter::eraseRange(size_t first, size_t last) { if (first >= last || first >= size()) { return; } - if (last >= size()) { - last = size(); - } - for (size_t i = first; i < last; ++i) { - bitset_[i] = false; + if (last > size()) { + rows_->erase(rows_->begin() + first, rows_->end()); + } else { + rows_->erase(rows_->begin() + first, rows_->begin() + last); } reset(); - size_ -= last - first; } void SequentialIter::doReset(size_t pos) { DCHECK((pos == 0 && size() == 0) || (pos < size())); iter_ = rows_->begin() + pos; - bitIdx_ = 0; - if (bitset_.empty()) { - return; - } - if (!bitset_[0]) { - next(); - } } const Value& SequentialIter::getColumn(int32_t index) const { diff --git a/src/context/Iterator.h b/src/context/Iterator.h index 16146befa..197796356 100644 --- a/src/context/Iterator.h +++ b/src/context/Iterator.h @@ -438,6 +438,7 @@ class SequentialIter final : public Iterator { void eraseRange(size_t first, size_t last) override; void clear() override { + rows_->clear(); reset(); } @@ -454,7 +455,7 @@ class SequentialIter final : public Iterator { } size_t size() const override { - return size_; + return rows_->size(); } const Value& getColumn(const std::string& col) const override { @@ -503,10 +504,7 @@ class SequentialIter final : public Iterator { std::vector::iterator iter_; std::vector* 
rows_{nullptr}; - size_t size_{0}; std::unordered_map colIndices_; - boost::dynamic_bitset<> bitset_; - int64_t bitIdx_{-1}; }; class PropIter final : public Iterator { diff --git a/src/context/test/IteratorTest.cpp b/src/context/test/IteratorTest.cpp index 331583ec1..f2011753a 100644 --- a/src/context/test/IteratorTest.cpp +++ b/src/context/test/IteratorTest.cpp @@ -68,7 +68,6 @@ TEST(IteratorTest, Sequential) { } int32_t count = 0; for (iter.reset(); iter.valid(); iter.next()) { - LOG(INFO) << iter.getColumn("col1").getInt(); EXPECT_NE(iter.getColumn("col1").getInt() % 2, 0); count++; } @@ -660,7 +659,6 @@ TEST(IteratorTest, EraseRange) { EXPECT_EQ(iter.size(), 5); auto i = 5; for (; iter.valid(); iter.next()) { - LOG(INFO) << iter.getColumn("col1").getInt(); ASSERT_EQ(iter.getColumn("col1"), i); ASSERT_EQ(iter.getColumn("col2"), folly::to(i)); ++i; @@ -669,149 +667,6 @@ TEST(IteratorTest, EraseRange) { } } -TEST(IteratorTest, Join) { - DataSet ds1; - ds1.colNames = {kVid, "tag_prop", "edge_prop", kDst}; - auto val1 = std::make_shared(ds1); - SequentialIter iter1(val1); - - DataSet ds2; - ds2.colNames = {"src", "dst"}; - auto val2 = std::make_shared(ds2); - SequentialIter iter2(val2); - - Row row1; - row1.values = {"1", 1, 2, "2"}; - Row row2; - row2.values = {"3", "4"}; - JoinIter joinIter({kVid, "tag_prop", "edge_prop", kDst, "src", "dst"}); - joinIter.joinIndex(&iter1, &iter2); - EXPECT_EQ(joinIter.getColIdxIndices().size(), 6); - EXPECT_EQ(joinIter.getColIdxIndices().size(), 6); - joinIter.addRow(JoinIter::JoinLogicalRow({ &row1, &row2 }, 6, &joinIter.getColIdxIndices())); - joinIter.addRow(JoinIter::JoinLogicalRow({ &row1, &row2 }, 6, &joinIter.getColIdxIndices())); - - for (; joinIter.valid(); joinIter.next()) { - const auto& row = *joinIter.row(); - EXPECT_EQ(row.size(), 6); - std::vector result; - for (size_t i = 0; i < 6; ++i) { - result.emplace_back(row[i]); - } - EXPECT_EQ(result, std::vector({"1", 1, 2, "2", "3", "4"})); - } - - for (joinIter.reset(); 
joinIter.valid(); joinIter.next()) { - const auto& row = *joinIter.row(); - EXPECT_EQ(row.size(), 6); - std::vector result; - result.emplace_back(joinIter.getColumn(kVid)); - result.emplace_back(joinIter.getColumn("tag_prop")); - result.emplace_back(joinIter.getColumn("edge_prop")); - result.emplace_back(joinIter.getColumn(kDst)); - result.emplace_back(joinIter.getColumn("src")); - result.emplace_back(joinIter.getColumn("dst")); - EXPECT_EQ(result, std::vector({"1", 1, 2, "2", "3", "4"})); - } - - { - // The iterator and executors will not handle the duplicate columns, - // so the duplicate column will be covered by later one. - JoinIter joinIter1({"src", "dst", kVid, "tag_prop", "edge_prop", kDst, "src", "dst"}); - joinIter1.joinIndex(&iter2, &joinIter); - EXPECT_EQ(joinIter.getColIndices().size(), 6); - } - - { - DataSet ds3; - ds3.colNames = {"tag_prop1", "edge_prop1"}; - auto val3 = std::make_shared(ds3); - SequentialIter iter3(val3); - - Row row3; - row3.values = {"5", "6"}; - - JoinIter joinIter2( - {"tag_prop1", "edge_prop1", kVid, "tag_prop", "edge_prop", kDst, "src", "dst"}); - joinIter2.joinIndex(&iter3, &joinIter); - EXPECT_EQ(joinIter2.getColIndices().size(), 8); - EXPECT_EQ(joinIter2.getColIdxIndices().size(), 8); - joinIter2.addRow(JoinIter::JoinLogicalRow({ &row3, &row1, &row2}, 8, - &joinIter2.getColIdxIndices())); - joinIter2.addRow(JoinIter::JoinLogicalRow({ &row3, &row1, &row2}, 8, - &joinIter2.getColIdxIndices())); - - for (; joinIter2.valid(); joinIter2.next()) { - const auto& row = *joinIter2.row(); - EXPECT_EQ(row.size(), 8); - std::vector result; - for (size_t i = 0; i < 8; ++i) { - result.emplace_back(row[i]); - } - EXPECT_EQ(result, std::vector({"5", "6", "1", 1, 2, "2", "3", "4"})); - } - - for (joinIter2.reset(); joinIter2.valid(); joinIter2.next()) { - const auto& row = *joinIter2.row(); - EXPECT_EQ(row.size(), 8); - std::vector result; - result.emplace_back(joinIter2.getColumn(kVid)); - 
result.emplace_back(joinIter2.getColumn("tag_prop")); - result.emplace_back(joinIter2.getColumn("edge_prop")); - result.emplace_back(joinIter2.getColumn(kDst)); - result.emplace_back(joinIter2.getColumn("src")); - result.emplace_back(joinIter2.getColumn("dst")); - result.emplace_back(joinIter2.getColumn("tag_prop1")); - result.emplace_back(joinIter2.getColumn("edge_prop1")); - EXPECT_EQ(result, std::vector({"1", 1, 2, "2", "3", "4", "5", "6"})); - } - } - { - DataSet ds3; - ds3.colNames = {"tag_prop1", "edge_prop1"}; - auto val3 = std::make_shared(ds3); - SequentialIter iter3(val3); - - Row row3; - row3.values = {"5", "6"}; - - JoinIter joinIter2( - {kVid, "tag_prop", "edge_prop", kDst, "src", "dst", "tag_prop1", "edge_prop1"}); - joinIter2.joinIndex(&joinIter, &iter3); - EXPECT_EQ(joinIter2.getColIndices().size(), 8); - EXPECT_EQ(joinIter2.getColIdxIndices().size(), 8); - joinIter2.addRow(JoinIter::JoinLogicalRow({ &row1, &row2, &row3 }, 8, - &joinIter2.getColIdxIndices())); - joinIter2.addRow(JoinIter::JoinLogicalRow({ &row1, &row2, &row3 }, 8, - &joinIter2.getColIdxIndices())); - - for (; joinIter2.valid(); joinIter2.next()) { - const auto& row = *joinIter2.row(); - EXPECT_EQ(row.size(), 8); - std::vector result; - for (size_t i = 0; i < 8; ++i) { - result.emplace_back(row[i]); - } - EXPECT_EQ(result, std::vector({"1", 1, 2, "2", "3", "4", "5", "6"})); - } - - for (joinIter2.reset(); joinIter2.valid(); joinIter2.next()) { - const auto& row = *joinIter2.row(); - EXPECT_EQ(row.size(), 8); - std::vector result; - result.emplace_back(joinIter2.getColumn(kVid)); - result.emplace_back(joinIter2.getColumn("tag_prop")); - result.emplace_back(joinIter2.getColumn("edge_prop")); - result.emplace_back(joinIter2.getColumn(kDst)); - result.emplace_back(joinIter2.getColumn("src")); - result.emplace_back(joinIter2.getColumn("dst")); - result.emplace_back(joinIter2.getColumn("tag_prop1")); - result.emplace_back(joinIter2.getColumn("edge_prop1")); - EXPECT_EQ(result, 
std::vector({"1", 1, 2, "2", "3", "4", "5", "6"})); - } - } -} - TEST(IteratorTest, VertexProp) { DataSet ds; ds.colNames = {kVid, "tag1.prop1", "tag2.prop1", "tag2.prop2", "tag3.prop1", "tag3.prop2"}; diff --git a/src/executor/algo/CartesianProductExecutor.cpp b/src/executor/algo/CartesianProductExecutor.cpp index 0f6bb56f5..3e874c529 100644 --- a/src/executor/algo/CartesianProductExecutor.cpp +++ b/src/executor/algo/CartesianProductExecutor.cpp @@ -21,21 +21,17 @@ folly::Future CartesianProductExecutor::execute() { return Status::Error("vars's size : %zu, must be greater than 2", vars.size()); } std::vector emptyCol; - auto leftIter = std::make_unique(emptyCol); - return finish(ResultBuilder().value(DataSet()).iter(std::move(leftIter)).finish()); + return finish(ResultBuilder().value(DataSet()).finish()); } -void CartesianProductExecutor::initJoinIter(JoinIter* joinIter, Iterator* rightIter) { - UNUSED(joinIter); +void CartesianProductExecutor::initJoinIter(Iterator* rightIter) { UNUSED(rightIter); } void CartesianProductExecutor::doCartesianProduct(Iterator* leftIter, - Iterator* rightIter, - JoinIter* joinIter) { - UNUSED(joinIter); - UNUSED(rightIter); + Iterator* rightIter) { UNUSED(leftIter); + UNUSED(rightIter); } } // namespace graph diff --git a/src/executor/algo/CartesianProductExecutor.h b/src/executor/algo/CartesianProductExecutor.h index fdb156d6a..6ba043361 100644 --- a/src/executor/algo/CartesianProductExecutor.h +++ b/src/executor/algo/CartesianProductExecutor.h @@ -19,9 +19,9 @@ class CartesianProductExecutor : public Executor { folly::Future execute() override; private: - void initJoinIter(JoinIter* joinIter, Iterator* rightIter); + void initJoinIter(Iterator* rightIter); - void doCartesianProduct(Iterator* leftIter, Iterator* rightIter, JoinIter* joinIter); + void doCartesianProduct(Iterator* leftIter, Iterator* rightIter); std::vector> colNames_; }; diff --git a/src/executor/query/TopNExecutor.cpp b/src/executor/query/TopNExecutor.cpp index 
c7b5cc73a..2d2dbcd8c 100644 --- a/src/executor/query/TopNExecutor.cpp +++ b/src/executor/query/TopNExecutor.cpp @@ -66,7 +66,7 @@ folly::Future TopNExecutor::execute() { } if (iter->isSequentialIter()) { - // executeTopN(iter.get()); + executeTopN(iter.get()); } else if (iter->isJoinIter()) { // executeTopN(iter.get()); } else if (iter->isPropIter()) { @@ -76,10 +76,10 @@ folly::Future TopNExecutor::execute() { return finish(ResultBuilder().value(iter->valuePtr()).iter(std::move(iter)).finish()); } -template +template void TopNExecutor::executeTopN(Iterator *iter) { auto uIter = static_cast(iter); - std::vector heap(uIter->begin(), uIter->begin()+heapSize_); + std::vector heap(uIter->begin(), uIter->begin()+heapSize_); std::make_heap(heap.begin(), heap.end(), comparator_); auto it = uIter->begin() + heapSize_; while (it != uIter->end()) { diff --git a/src/executor/query/TopNExecutor.h b/src/executor/query/TopNExecutor.h index 9f5c969b4..d72d144a7 100644 --- a/src/executor/query/TopNExecutor.h +++ b/src/executor/query/TopNExecutor.h @@ -20,7 +20,7 @@ class TopNExecutor final : public Executor { folly::Future execute() override; private: - template + template void executeTopN(Iterator *iter); int64_t offset_; From 3ec4008330fff99501d67bda45fada8048282518 Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Fri, 5 Mar 2021 11:23:38 +0800 Subject: [PATCH 05/11] Fix union. 
--- src/executor/query/SetExecutor.cpp | 1 + src/executor/query/SetExecutor.h | 2 ++ src/executor/query/UnionExecutor.cpp | 21 ++++++++++++++++++--- src/executor/test/SetExecutorTest.cpp | 11 +---------- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/executor/query/SetExecutor.cpp b/src/executor/query/SetExecutor.cpp index 6e6539395..f01050de3 100644 --- a/src/executor/query/SetExecutor.cpp +++ b/src/executor/query/SetExecutor.cpp @@ -47,6 +47,7 @@ Status SetExecutor::checkInputDataSets() { auto& rds = rightData->getDataSet(); if (LIKELY(lds.colNames == rds.colNames)) { + colNames_ = lds.colNames; return Status::OK(); } diff --git a/src/executor/query/SetExecutor.h b/src/executor/query/SetExecutor.h index 1b7b2bff5..c2a35cb1d 100644 --- a/src/executor/query/SetExecutor.h +++ b/src/executor/query/SetExecutor.h @@ -26,6 +26,8 @@ class SetExecutor : public Executor { protected: SetExecutor(const std::string &name, const PlanNode *node, QueryContext *qctx) : Executor(name, node, qctx) {} + + std::vector colNames_; }; } // namespace graph diff --git a/src/executor/query/UnionExecutor.cpp b/src/executor/query/UnionExecutor.cpp index 9bb710d4d..16d7d326a 100644 --- a/src/executor/query/UnionExecutor.cpp +++ b/src/executor/query/UnionExecutor.cpp @@ -8,6 +8,7 @@ #include "context/ExecutionContext.h" #include "util/ScopedTimer.h" +#include "planner/Query.h" namespace nebula { namespace graph { @@ -18,9 +19,23 @@ folly::Future UnionExecutor::execute() { NG_RETURN_IF_ERROR(checkInputDataSets()); auto left = getLeftInputDataIter(); auto right = getRightInputDataIter(); - auto value = left->valuePtr(); - auto iter = std::make_unique(std::move(left), std::move(right)); - return finish(ResultBuilder().value(value).iter(std::move(iter)).finish()); + + DataSet ds; + ds.colNames = std::move(colNames_); + + DCHECK(left->isSequentialIter()); + auto leftIter = static_cast(left.get()); + ds.rows.insert(ds.rows.end(), + std::make_move_iterator(leftIter->begin()), + 
std::make_move_iterator(leftIter->end())); + + DCHECK(right->isSequentialIter()); + auto rightIter = static_cast(right.get()); + ds.rows.insert(ds.rows.end(), + std::make_move_iterator(rightIter->begin()), + std::make_move_iterator(rightIter->end())); + + return finish(ResultBuilder().value(Value(std::move(ds))).finish()); } } // namespace graph diff --git a/src/executor/test/SetExecutorTest.cpp b/src/executor/test/SetExecutorTest.cpp index d1c0695c4..543c2c621 100644 --- a/src/executor/test/SetExecutorTest.cpp +++ b/src/executor/test/SetExecutorTest.cpp @@ -71,16 +71,7 @@ TEST_F(SetExecutorTest, TestUnionAll) { auto& result = qctx_->ectx()->getResult(unionNode->outputVar()); EXPECT_TRUE(result.value().isDataSet()); - DataSet resultDS; - resultDS.colNames = result.value().getDataSet().colNames; - for (auto iter = result.iter(); iter->valid(); iter->next()) { - Row row; - for (auto& col : resultDS.colNames) { - row.values.emplace_back(iter->getColumn(col)); - } - resultDS.emplace_back(std::move(row)); - } - + DataSet resultDS = result.value().getDataSet(); EXPECT_TRUE(diffDataSet(resultDS, expected)) << "\nResult dataset: \n" << resultDS << "Expected dataset: \n" << expected; From 456ca7538868fc25529f0146385e4f0e22199ab5 Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Fri, 5 Mar 2021 15:17:48 +0800 Subject: [PATCH 06/11] Refactor cartesian product. 
--- .../algo/CartesianProductExecutor.cpp | 40 ++++++++++++++----- src/executor/algo/CartesianProductExecutor.h | 4 +- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/executor/algo/CartesianProductExecutor.cpp b/src/executor/algo/CartesianProductExecutor.cpp index 3e874c529..2da1300ad 100644 --- a/src/executor/algo/CartesianProductExecutor.cpp +++ b/src/executor/algo/CartesianProductExecutor.cpp @@ -15,23 +15,41 @@ folly::Future CartesianProductExecutor::execute() { SCOPED_TIMER(&execTime_); auto* cartesianProduct = asNode(node()); - colNames_ = cartesianProduct->allColNames(); auto vars = cartesianProduct->inputVars(); if (vars.size() < 2) { return Status::Error("vars's size : %zu, must be greater than 2", vars.size()); } - std::vector emptyCol; - return finish(ResultBuilder().value(DataSet()).finish()); -} - -void CartesianProductExecutor::initJoinIter(Iterator* rightIter) { - UNUSED(rightIter); + DataSet result; + auto& lds = const_cast(ectx_->getResult(vars[0]).value().getDataSet()); + for (size_t i = 1; i < vars.size(); ++i) { + const auto& rds = ectx_->getResult(vars[i]).value().getDataSet(); + DataSet ds; + doCartesianProduct(lds, rds, ds); + result = std::move(ds); + lds = result; + } + for (auto& cols : cartesianProduct->allColNames()) { + result.colNames.reserve(result.colNames.size() + cols.size()); + result.colNames.insert(result.colNames.end(), + std::make_move_iterator(cols.begin()), + std::make_move_iterator(cols.end())); + } + return finish(ResultBuilder().value(Value(std::move(result))).finish()); } -void CartesianProductExecutor::doCartesianProduct(Iterator* leftIter, - Iterator* rightIter) { - UNUSED(leftIter); - UNUSED(rightIter); +void CartesianProductExecutor::doCartesianProduct(const DataSet& lds, + const DataSet& rds, + DataSet& result) { + result.rows.reserve(lds.size() * rds.size()); + for (auto i = lds.begin(); i < lds.end(); ++i) { + for (auto j = rds.begin(); j < rds.end(); ++j) { + Row row; + row.reserve(i->size() + 
j->size()); + row.values.insert(row.values.end(), i->values.begin(), i->values.end()); + row.values.insert(row.values.end(), j->values.begin(), j->values.end()); + result.rows.emplace_back(std::move(row)); + } + } } } // namespace graph diff --git a/src/executor/algo/CartesianProductExecutor.h b/src/executor/algo/CartesianProductExecutor.h index 6ba043361..d1b069bff 100644 --- a/src/executor/algo/CartesianProductExecutor.h +++ b/src/executor/algo/CartesianProductExecutor.h @@ -19,9 +19,7 @@ class CartesianProductExecutor : public Executor { folly::Future execute() override; private: - void initJoinIter(Iterator* rightIter); - - void doCartesianProduct(Iterator* leftIter, Iterator* rightIter); + void doCartesianProduct(const DataSet& lds, const DataSet& rds, DataSet& result); std::vector> colNames_; }; From b7238dd171ae2b52f6f50069a939b19ac5091765 Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Fri, 5 Mar 2021 17:41:32 +0800 Subject: [PATCH 07/11] Fix cartesian product and shortest path. 
--- src/executor/algo/CartesianProductExecutor.cpp | 6 +++--- src/executor/algo/ConjunctPathExecutor.cpp | 10 ---------- src/validator/FindPathValidator.cpp | 2 +- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/executor/algo/CartesianProductExecutor.cpp b/src/executor/algo/CartesianProductExecutor.cpp index 2da1300ad..a5ecd9361 100644 --- a/src/executor/algo/CartesianProductExecutor.cpp +++ b/src/executor/algo/CartesianProductExecutor.cpp @@ -20,13 +20,13 @@ folly::Future CartesianProductExecutor::execute() { return Status::Error("vars's size : %zu, must be greater than 2", vars.size()); } DataSet result; - auto& lds = const_cast(ectx_->getResult(vars[0]).value().getDataSet()); + auto* lds = const_cast(&ectx_->getResult(vars[0]).value().getDataSet()); for (size_t i = 1; i < vars.size(); ++i) { const auto& rds = ectx_->getResult(vars[i]).value().getDataSet(); DataSet ds; - doCartesianProduct(lds, rds, ds); + doCartesianProduct(*lds, rds, ds); result = std::move(ds); - lds = result; + lds = &result; } for (auto& cols : cartesianProduct->allColNames()) { result.colNames.reserve(result.colNames.size() + cols.size()); diff --git a/src/executor/algo/ConjunctPathExecutor.cpp b/src/executor/algo/ConjunctPathExecutor.cpp index 1ece1bf4c..c12879fdf 100644 --- a/src/executor/algo/ConjunctPathExecutor.cpp +++ b/src/executor/algo/ConjunctPathExecutor.cpp @@ -295,16 +295,6 @@ void ConjunctPathExecutor::delPathFromConditionalVar(const Value& start, const V iter->next(); } } - - DataSet ds; - if (iter->size() == 0) { - Row row; - row.values.emplace_back("all path are found"); - ds.rows.emplace_back(std::move(row)); - } - qctx_->ectx()->setResult( - conditionalVar_, - ResultBuilder().value(Value(std::move(ds))).iter(std::move(iter)).finish()); } folly::Future ConjunctPathExecutor::allPaths() { diff --git a/src/validator/FindPathValidator.cpp b/src/validator/FindPathValidator.cpp index 03b9453bf..e7e9cc5c1 100644 --- a/src/validator/FindPathValidator.cpp +++ 
b/src/validator/FindPathValidator.cpp @@ -456,7 +456,7 @@ Expression* FindPathValidator::buildMultiPairLoopCondition(uint32_t steps, auto* args = new ArgumentList(); args->addArgument(std::make_unique(new std::string(conditionalVar))); auto* notAllPathFind = - new RelationalExpression(Expression::Kind::kRelEQ, + new RelationalExpression(Expression::Kind::kRelGT, new FunctionCallExpression(new std::string("size"), args), new ConstantExpression(0)); From 4ee859d1526158c50e3b0fffe4ff0f3b2e997b3d Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Mon, 8 Mar 2021 11:55:36 +0800 Subject: [PATCH 08/11] Fix unstable erase. --- src/context/Iterator.cpp | 3 ++- src/context/Iterator.h | 42 ---------------------------------------- 2 files changed, 2 insertions(+), 43 deletions(-) diff --git a/src/context/Iterator.cpp b/src/context/Iterator.cpp index e709811db..d34541f78 100644 --- a/src/context/Iterator.cpp +++ b/src/context/Iterator.cpp @@ -514,7 +514,8 @@ void SequentialIter::erase() { } void SequentialIter::unstableErase() { - erase(); + std::swap(rows_->back(), *iter_); + rows_->pop_back(); } void SequentialIter::eraseRange(size_t first, size_t last) { diff --git a/src/context/Iterator.h b/src/context/Iterator.h index 197796356..dba3d912a 100644 --- a/src/context/Iterator.h +++ b/src/context/Iterator.h @@ -371,48 +371,6 @@ class GetNeighborsIter final : public Iterator { class SequentialIter final : public Iterator { public: - class SeqLogicalRow final : public LogicalRow { - public: - explicit SeqLogicalRow(const Row* row) : LogicalRow({row}) {} - - SeqLogicalRow(const SeqLogicalRow &r) = default; - SeqLogicalRow& operator=(const SeqLogicalRow &r) = default; - - SeqLogicalRow(SeqLogicalRow &&r) noexcept { - *this = std::move(r); - } - SeqLogicalRow& operator=(SeqLogicalRow &&r) noexcept { - segments_ = std::move(r.segments_); - return *this; - } - - const Value& operator[](size_t idx) const override { - 
DCHECK_EQ(segments_.size(), 1); - auto* row = segments_[0]; - if (idx < row->size()) { - return row->values[idx]; - } - return Value::kEmpty; - } - - size_t size() const override { - DCHECK_EQ(segments_.size(), 1); - auto* row = segments_[0]; - return row->size(); - } - - LogicalRow::Kind kind() const override { - return Kind::kSequential; - } - - const std::vector& segments() const override { - return segments_; - } - - private: - friend class SequentialIter; - }; - explicit SequentialIter(std::shared_ptr value); // Union multiple sequential iterators From 4835d244eb2844c1f19a215a1332880027a6b28c Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Tue, 9 Mar 2021 11:51:38 +0800 Subject: [PATCH 09/11] Rebase and fix join. --- src/context/test/IteratorTest.cpp | 25 ----- src/executor/query/DataJoinExecutor.cpp | 121 ----------------------- src/executor/query/DataJoinExecutor.h | 37 ------- src/executor/query/InnerJoinExecutor.cpp | 61 ++++++------ src/executor/query/InnerJoinExecutor.h | 4 +- src/executor/query/JoinExecutor.cpp | 21 ++-- src/executor/query/JoinExecutor.h | 5 +- src/executor/query/LeftJoinExecutor.cpp | 62 +++++------- src/executor/query/LeftJoinExecutor.h | 3 +- src/executor/test/JoinTest.cpp | 8 +- 10 files changed, 76 insertions(+), 271 deletions(-) delete mode 100644 src/executor/query/DataJoinExecutor.cpp delete mode 100644 src/executor/query/DataJoinExecutor.h diff --git a/src/context/test/IteratorTest.cpp b/src/context/test/IteratorTest.cpp index f2011753a..a2ae67414 100644 --- a/src/context/test/IteratorTest.cpp +++ b/src/context/test/IteratorTest.cpp @@ -776,31 +776,6 @@ TEST(IteratorTest, EdgeProp) { } } -TEST(IteratorTest, RowEqualTo) { - DataSet ds; - ds.colNames = {"col1", "col2"}; - for (auto i = 0; i < 2; ++i) { - Row row; - row.values.emplace_back(i); - row.values.emplace_back(folly::to(i)); - ds.rows.emplace_back(std::move(row)); - } - - Row row; - row.values.emplace_back(0); - 
row.values.emplace_back(folly::to(0)); - ds.rows.emplace_back(std::move(row)); - - SequentialIter::SeqLogicalRow row0(&ds.rows[0]); - SequentialIter::SeqLogicalRow row1(&ds.rows[1]); - - EXPECT_FALSE(std::equal_to()(&row0, &row1)); - - SequentialIter::SeqLogicalRow row2(&ds.rows[2]); - EXPECT_TRUE(std::equal_to()(&row0, &row2)); - EXPECT_TRUE(std::equal_to()(&row0, &row0)); -} - TEST(IteratorTest, DISABLED_EraseBySwap) { DataSet ds; ds.colNames = {"col1", "col2"}; diff --git a/src/executor/query/DataJoinExecutor.cpp b/src/executor/query/DataJoinExecutor.cpp deleted file mode 100644 index 34864fd3a..000000000 --- a/src/executor/query/DataJoinExecutor.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ - -#include "executor/query/DataJoinExecutor.h" - -#include "planner/Query.h" -#include "context/QueryExpressionContext.h" -#include "context/Iterator.h" -#include "util/ScopedTimer.h" - -namespace nebula { -namespace graph { -folly::Future DataJoinExecutor::execute() { - return doInnerJoin(); -} - -Status DataJoinExecutor::close() { - exchange_ = false; - hashTable_.clear(); - return Executor::close(); -} - -folly::Future DataJoinExecutor::doInnerJoin() { - SCOPED_TIMER(&execTime_); - - auto* dataJoin = asNode(node()); - auto lhsIter = ectx_ - ->getVersionedResult(dataJoin->leftVar().first, - dataJoin->leftVar().second) - .iter(); - DCHECK(!!lhsIter); - if (lhsIter->isGetNeighborsIter() || lhsIter->isDefaultIter()) { - std::stringstream ss; - ss << "Join executor does not support " << lhsIter->kind(); - return error(Status::Error(ss.str())); - } - auto rhsIter = ectx_ - ->getVersionedResult(dataJoin->rightVar().first, - dataJoin->rightVar().second) - .iter(); - DCHECK(!!rhsIter); - if (lhsIter->isGetNeighborsIter() || lhsIter->isDefaultIter()) { - std::stringstream ss; - ss << "Join 
executor does not support " << lhsIter->kind(); - return error(Status::Error(ss.str())); - } - - - DataSet result; - if (!(lhsIter->empty() || rhsIter->empty())) { - if (lhsIter->size() < rhsIter->size()) { - buildHashTable(dataJoin->hashKeys(), lhsIter.get()); - result = probe(dataJoin->probeKeys(), rhsIter.get()); - } else { - exchange_ = true; - buildHashTable(dataJoin->probeKeys(), rhsIter.get()); - result = probe(dataJoin->hashKeys(), lhsIter.get()); - } - } - result.colNames = dataJoin->colNames(); - VLOG(1) << result; - return finish(ResultBuilder().value(Value(std::move(result))).finish()); -} - -void DataJoinExecutor::buildHashTable(const std::vector& hashKeys, - Iterator* iter) { - QueryExpressionContext ctx(ectx_); - for (; iter->valid(); iter->next()) { - List list; - list.values.reserve(hashKeys.size()); - for (auto& col : hashKeys) { - Value val = col->eval(ctx(iter)); - list.values.emplace_back(std::move(val)); - } - - auto& vals = hashTable_[list]; - vals.emplace_back(iter->row()); - } -} - -DataSet DataJoinExecutor::probe(const std::vector& probeKeys, - Iterator* probeIter) { - DataSet ds; - QueryExpressionContext ctx(ectx_); - for (; probeIter->valid(); probeIter->next()) { - List list; - list.values.reserve(probeKeys.size()); - for (auto& col : probeKeys) { - Value val = col->eval(ctx(probeIter)); - list.values.emplace_back(std::move(val)); - } - - const auto& range = hashTable_.find(list); - if (range == hashTable_.end()) { - continue; - } - for (auto* row : range->second) { - auto& lRow = *row; - auto& rRow = *probeIter->row(); - VLOG(1) << lRow << rRow; - Row newRow; - newRow.reserve(lRow.size() + rRow.size()); - auto& values = newRow.values; - if (exchange_) { - values.insert(values.end(), rRow.values.begin(), rRow.values.end()); - values.insert(values.end(), lRow.values.begin(), lRow.values.end()); - } else { - values.insert(values.end(), lRow.values.begin(), lRow.values.end()); - values.insert(values.end(), rRow.values.begin(), 
rRow.values.end()); - } - VLOG(1) << "Row: " << newRow; - ds.rows.emplace_back(std::move(newRow)); - } - } - return ds; -} -} // namespace graph -} // namespace nebula diff --git a/src/executor/query/DataJoinExecutor.h b/src/executor/query/DataJoinExecutor.h deleted file mode 100644 index fca3f80fc..000000000 --- a/src/executor/query/DataJoinExecutor.h +++ /dev/null @@ -1,37 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ - -#ifndef EXECUTOR_QUERY_DATAJOINEXECUTOR_H_ -#define EXECUTOR_QUERY_DATAJOINEXECUTOR_H_ - -#include "executor/Executor.h" - -namespace nebula { -namespace graph { - -class DataJoinExecutor final : public Executor { -public: - DataJoinExecutor(const PlanNode *node, QueryContext *qctx) - : Executor("DataJoinExecutor", node, qctx) {} - - folly::Future execute() override; - - Status close() override; - -private: - folly::Future doInnerJoin(); - - void buildHashTable(const std::vector& hashKeys, Iterator* iter); - - DataSet probe(const std::vector& probeKeys, Iterator* probeiter); - -private: - bool exchange_{false}; - std::unordered_map> hashTable_; -}; -} // namespace graph -} // namespace nebula -#endif diff --git a/src/executor/query/InnerJoinExecutor.cpp b/src/executor/query/InnerJoinExecutor.cpp index 429afce32..4934cc2bf 100644 --- a/src/executor/query/InnerJoinExecutor.cpp +++ b/src/executor/query/InnerJoinExecutor.cpp @@ -27,32 +27,27 @@ Status InnerJoinExecutor::close() { folly::Future InnerJoinExecutor::join() { auto* join = asNode(node()); - auto lhsIter = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter(); - auto rhsIter = - ectx_->getVersionedResult(join->rightVar().first, join->rightVar().second).iter(); - - auto resultIter = std::make_unique(join->colNames()); - resultIter->joinIndex(lhsIter.get(), rhsIter.get()); - auto bucketSize = 
lhsIter->size() > rhsIter->size() ? rhsIter->size() : lhsIter->size(); + auto bucketSize = lhsIter_->size() > rhsIter_->size() ? rhsIter_->size() : lhsIter_->size(); hashTable_.reserve(bucketSize); - resultIter->reserve(lhsIter->size() > rhsIter->size() ? lhsIter->size() : rhsIter->size()); - - if (!(lhsIter->empty() || rhsIter->empty())) { - if (lhsIter->size() < rhsIter->size()) { - buildHashTable(join->hashKeys(), lhsIter.get()); - probe(join->probeKeys(), rhsIter.get(), resultIter.get()); + DataSet result; + if (!(lhsIter_->empty() || rhsIter_->empty())) { + if (lhsIter_->size() < rhsIter_->size()) { + buildHashTable(join->hashKeys(), lhsIter_.get()); + result = probe(join->probeKeys(), rhsIter_.get()); } else { exchange_ = true; - buildHashTable(join->probeKeys(), rhsIter.get()); - probe(join->hashKeys(), lhsIter.get(), resultIter.get()); + buildHashTable(join->probeKeys(), rhsIter_.get()); + result = probe(join->hashKeys(), lhsIter_.get()); } } - return finish(ResultBuilder().iter(std::move(resultIter)).finish()); + result.colNames = join->colNames(); + VLOG(1) << result; + return finish(ResultBuilder().value(Value(std::move(result))).finish()); } -void InnerJoinExecutor::probe(const std::vector& probeKeys, - Iterator* probeIter, - JoinIter* resultIter) { +DataSet InnerJoinExecutor::probe(const std::vector& probeKeys, + Iterator* probeIter) { + DataSet ds; QueryExpressionContext ctx(ectx_); for (; probeIter->valid(); probeIter->next()) { List list; @@ -62,29 +57,29 @@ void InnerJoinExecutor::probe(const std::vector& probeKeys, list.values.emplace_back(std::move(val)); } - auto range = hashTable_.find(list); + const auto& range = hashTable_.find(list); if (range == hashTable_.end()) { continue; } for (auto* row : range->second) { - std::vector values; - auto& lSegs = row->segments(); - auto& rSegs = probeIter->row()->segments(); - values.reserve(lSegs.size() + rSegs.size()); + auto& lRow = *row; + auto& rRow = *probeIter->row(); + VLOG(1) << lRow << rRow; + 
Row newRow; + newRow.reserve(lRow.size() + rRow.size()); + auto& values = newRow.values; if (exchange_) { - values.insert(values.end(), rSegs.begin(), rSegs.end()); - values.insert(values.end(), lSegs.begin(), lSegs.end()); + values.insert(values.end(), rRow.values.begin(), rRow.values.end()); + values.insert(values.end(), lRow.values.begin(), lRow.values.end()); } else { - values.insert(values.end(), lSegs.begin(), lSegs.end()); - values.insert(values.end(), rSegs.begin(), rSegs.end()); + values.insert(values.end(), lRow.values.begin(), lRow.values.end()); + values.insert(values.end(), rRow.values.begin(), rRow.values.end()); } - size_t size = row->size() + probeIter->row()->size(); - JoinIter::JoinLogicalRow newRow( - std::move(values), size, &resultIter->getColIdxIndices()); - VLOG(1) << node()->outputVar() << " : " << newRow; - resultIter->addRow(std::move(newRow)); + VLOG(1) << "Row: " << newRow; + ds.rows.emplace_back(std::move(newRow)); } } + return ds; } } // namespace graph } // namespace nebula diff --git a/src/executor/query/InnerJoinExecutor.h b/src/executor/query/InnerJoinExecutor.h index 8651ca34a..0ba8b4bc2 100644 --- a/src/executor/query/InnerJoinExecutor.h +++ b/src/executor/query/InnerJoinExecutor.h @@ -23,9 +23,7 @@ class InnerJoinExecutor final : public JoinExecutor { private: folly::Future join(); - void probe(const std::vector& probeKeys, - Iterator* probeiter, - JoinIter* resultIter); + DataSet probe(const std::vector& probeKeys, Iterator* probeiter); private: bool exchange_{false}; diff --git a/src/executor/query/JoinExecutor.cpp b/src/executor/query/JoinExecutor.cpp index 4395f4a94..1a700a7a0 100644 --- a/src/executor/query/JoinExecutor.cpp +++ b/src/executor/query/JoinExecutor.cpp @@ -15,23 +15,24 @@ namespace graph { Status JoinExecutor::checkInputDataSets() { auto* join = asNode(node()); - auto lhsIter = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter(); - DCHECK(!!lhsIter); - VLOG(1) << "lhs: " << 
join->leftVar().first << " " << lhsIter->size(); - if (lhsIter->isGetNeighborsIter() || lhsIter->isDefaultIter()) { + lhsIter_ = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter(); + DCHECK(!!lhsIter_); + VLOG(1) << "lhs: " << join->leftVar().first << " " << lhsIter_->size(); + if (lhsIter_->isGetNeighborsIter() || lhsIter_->isDefaultIter()) { std::stringstream ss; - ss << "Join executor does not support " << lhsIter->kind(); + ss << "Join executor does not support " << lhsIter_->kind(); return Status::Error(ss.str()); } - auto rhsIter = + rhsIter_ = ectx_->getVersionedResult(join->rightVar().first, join->rightVar().second).iter(); - DCHECK(!!rhsIter); - VLOG(1) << "rhs: " << join->rightVar().first << " " << rhsIter->size(); - if (rhsIter->isGetNeighborsIter() || rhsIter->isDefaultIter()) { + DCHECK(!!rhsIter_); + VLOG(1) << "rhs: " << join->rightVar().first << " " << rhsIter_->size(); + if (rhsIter_->isGetNeighborsIter() || rhsIter_->isDefaultIter()) { std::stringstream ss; - ss << "Join executor does not support " << rhsIter->kind(); + ss << "Join executor does not support " << rhsIter_->kind(); return Status::Error(ss.str()); } + colSize_ = join->colNames().size(); return Status::OK(); } diff --git a/src/executor/query/JoinExecutor.h b/src/executor/query/JoinExecutor.h index 33e0eb9fd..e53082331 100644 --- a/src/executor/query/JoinExecutor.h +++ b/src/executor/query/JoinExecutor.h @@ -22,7 +22,10 @@ class JoinExecutor : public Executor { void buildHashTable(const std::vector& hashKeys, Iterator* iter); protected: - std::unordered_map> hashTable_; + std::unique_ptr lhsIter_; + std::unique_ptr rhsIter_; + size_t colSize_{0}; + std::unordered_map> hashTable_; }; } // namespace graph } // namespace nebula diff --git a/src/executor/query/LeftJoinExecutor.cpp b/src/executor/query/LeftJoinExecutor.cpp index ed521a934..a21f5a1cf 100644 --- a/src/executor/query/LeftJoinExecutor.cpp +++ b/src/executor/query/LeftJoinExecutor.cpp @@ -26,25 
+26,22 @@ Status LeftJoinExecutor::close() { folly::Future LeftJoinExecutor::join() { auto* join = asNode(node()); - auto lhsIter = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter(); auto& rhsResult = ectx_->getVersionedResult(join->rightVar().first, join->rightVar().second); rightColSize_ = rhsResult.valuePtr()->getDataSet().colNames.size(); - auto rhsIter = rhsResult.iter(); - auto resultIter = std::make_unique(join->colNames()); - resultIter->joinIndex(lhsIter.get(), rhsIter.get()); - hashTable_.reserve(rhsIter->size() == 0 ? 1 : rhsIter->size()); - resultIter->reserve(lhsIter->size()); - if (!lhsIter->empty()) { - buildHashTable(join->probeKeys(), rhsIter.get()); - probe(join->hashKeys(), lhsIter.get(), resultIter.get()); + hashTable_.reserve(rhsIter_->size() == 0 ? 1 : rhsIter_->size()); + DataSet result; + if (!lhsIter_->empty()) { + buildHashTable(join->probeKeys(), rhsIter_.get()); + result = probe(join->hashKeys(), lhsIter_.get()); } - return finish(ResultBuilder().iter(std::move(resultIter)).finish()); + result.colNames = join->colNames(); + return finish(ResultBuilder().value(Value(std::move(result))).finish()); } -void LeftJoinExecutor::probe(const std::vector& probeKeys, - Iterator* probeIter, - JoinIter* resultIter) { +DataSet LeftJoinExecutor::probe(const std::vector& probeKeys, + Iterator* probeIter) { + DataSet ds; QueryExpressionContext ctx(ectx_); for (; probeIter->valid(); probeIter->next()) { List list; @@ -56,33 +53,28 @@ void LeftJoinExecutor::probe(const std::vector& probeKeys, auto range = hashTable_.find(list); if (range == hashTable_.end()) { - std::vector values; - auto& lSegs = probeIter->row()->segments(); - values.reserve(lSegs.size() + 1); - values.insert(values.end(), lSegs.begin(), lSegs.end()); - Row* emptyRow = qctx_->objPool()->add(new List(std::vector(rightColSize_))); - values.insert(values.end(), emptyRow); - size_t size = probeIter->row()->size() + rightColSize_; - JoinIter::JoinLogicalRow 
newRow( - std::move(values), size, &resultIter->getColIdxIndices()); - VLOG(1) << node()->outputVar() << " : " << newRow; - resultIter->addRow(std::move(newRow)); + auto& lRow = *probeIter->row(); + auto lRowSize = lRow.size(); + Row newRow; + newRow.reserve(colSize_); + auto& values = newRow.values; + values.insert(values.end(), lRow.values.begin(), lRow.values.end()); + values.insert(values.end(), colSize_ - lRowSize, Value::kNullValue); + ds.rows.emplace_back(std::move(newRow)); } else { for (auto* row : range->second) { - std::vector values; - auto& lSegs = probeIter->row()->segments(); - auto& rSegs = row->segments(); - values.reserve(lSegs.size() + rSegs.size()); - values.insert(values.end(), lSegs.begin(), lSegs.end()); - values.insert(values.end(), rSegs.begin(), rSegs.end()); - size_t size = row->size() + probeIter->row()->size(); - JoinIter::JoinLogicalRow newRow( - std::move(values), size, &resultIter->getColIdxIndices()); - VLOG(1) << node()->outputVar() << " : " << newRow; - resultIter->addRow(std::move(newRow)); + auto& lRow = *probeIter->row(); + auto& rRow = *row; + Row newRow; + auto& values = newRow.values; + values.reserve(lRow.size() + rRow.size()); + values.insert(values.end(), lRow.values.begin(), lRow.values.end()); + values.insert(values.end(), rRow.values.begin(), rRow.values.end()); + ds.rows.emplace_back(std::move(newRow)); } } } + return ds; } } // namespace graph } // namespace nebula diff --git a/src/executor/query/LeftJoinExecutor.h b/src/executor/query/LeftJoinExecutor.h index c4245e348..04d2d13de 100644 --- a/src/executor/query/LeftJoinExecutor.h +++ b/src/executor/query/LeftJoinExecutor.h @@ -24,8 +24,7 @@ class LeftJoinExecutor final : public JoinExecutor { private: folly::Future join(); - void probe(const std::vector& probeKeys, Iterator* probeiter, - JoinIter* resultIter); + DataSet probe(const std::vector& probeKeys, Iterator* probeIter); private: size_t rightColSize_{0}; diff --git a/src/executor/test/JoinTest.cpp 
b/src/executor/test/JoinTest.cpp index af8743533..a70f60922 100644 --- a/src/executor/test/JoinTest.cpp +++ b/src/executor/test/JoinTest.cpp @@ -383,7 +383,7 @@ TEST_F(JoinTest, LeftJoinTwice) { if (i < 2) { row.values.emplace_back(folly::to(11)); } else { - row.values.emplace_back(Value::kEmpty); + row.values.emplace_back(Value::kNullValue); } expected.rows.emplace_back(std::move(row)); } @@ -401,8 +401,8 @@ TEST_F(JoinTest, LeftJoinEmpty) { row.values.emplace_back(i); row.values.emplace_back(i + 1); row.values.emplace_back(folly::to(i / 2 + 5 + i % 2)); - row.values.emplace_back(Value::kEmpty); - row.values.emplace_back(Value::kEmpty); + row.values.emplace_back(Value::kNullValue); + row.values.emplace_back(Value::kNullValue); expected.rows.emplace_back(std::move(row)); } testLeftJoin("var1", "empty_var2", expected, __LINE__); @@ -555,7 +555,7 @@ TEST_F(JoinTest, InnerJoinAndLeftjoin) { if (i < 2) { row.values.emplace_back(folly::to(11)); } else { - row.values.emplace_back(Value::kEmpty); + row.values.emplace_back(Value::kNullValue); } expected.rows.emplace_back(std::move(row)); } From 3ca5770ebfb19aa5dbaeba7e3e0e5caa87499b43 Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Tue, 9 Mar 2021 12:03:43 +0800 Subject: [PATCH 10/11] Enable erase by swap test. 
--- src/context/test/IteratorTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/context/test/IteratorTest.cpp b/src/context/test/IteratorTest.cpp index a2ae67414..786a7b9a1 100644 --- a/src/context/test/IteratorTest.cpp +++ b/src/context/test/IteratorTest.cpp @@ -776,7 +776,7 @@ TEST(IteratorTest, EdgeProp) { } } -TEST(IteratorTest, DISABLED_EraseBySwap) { +TEST(IteratorTest, EraseBySwap) { DataSet ds; ds.colNames = {"col1", "col2"}; for (auto i = 0; i < 3; ++i) { From a3aee7942e4929247f15c167a6be368f41366a0c Mon Sep 17 00:00:00 2001 From: William Chen <13495049+CPWstatic@users.noreply.github.com> Date: Tue, 9 Mar 2021 13:15:55 +0800 Subject: [PATCH 11/11] Remove unnecessary code in sort. --- src/executor/query/SortExecutor.cpp | 26 +++++++------------------- src/executor/query/TopNExecutor.cpp | 22 ++++++---------------- 2 files changed, 13 insertions(+), 35 deletions(-) diff --git a/src/executor/query/SortExecutor.cpp b/src/executor/query/SortExecutor.cpp index f7e88c472..3ce4900ba 100644 --- a/src/executor/query/SortExecutor.cpp +++ b/src/executor/query/SortExecutor.cpp @@ -19,15 +19,11 @@ folly::Future SortExecutor::execute() { if (UNLIKELY(iter == nullptr)) { return Status::Error("Internal error: nullptr iterator in sort executor"); } - if (UNLIKELY(iter->isDefaultIter())) { - std::string errMsg = "Internal error: Sort executor does not supported DefaultIter"; - LOG(ERROR) << errMsg; - return Status::Error(errMsg); - } - if (UNLIKELY(iter->isGetNeighborsIter())) { - std::string errMsg = "Internal error: Sort executor does not supported GetNeighborsIter"; - LOG(ERROR) << errMsg; - return Status::Error(errMsg); + if (UNLIKELY(!iter->isSequentialIter())) { + std::stringstream ss; + ss << "Internal error: Sort executor does not supported " << iter->kind(); + LOG(ERROR) << ss.str(); + return Status::Error(ss.str()); } auto &factors = sort->factors(); @@ -48,16 +44,8 @@ folly::Future SortExecutor::execute() { return false; }; - if
(iter->isSequentialIter()) { - auto seqIter = static_cast(iter.get()); - std::sort(seqIter->begin(), seqIter->end(), comparator); - } else if (iter->isJoinIter()) { - // auto joinIter = static_cast(iter.get()); - // std::sort(joinIter->begin(), joinIter->end(), comparator); - } else if (iter->isPropIter()) { - // auto propIter = static_cast(iter.get()); - // std::sort(propIter->begin(), propIter->end(), comparator); - } + auto seqIter = static_cast(iter.get()); + std::sort(seqIter->begin(), seqIter->end(), comparator); return finish(ResultBuilder().value(iter->valuePtr()).iter(std::move(iter)).finish()); } diff --git a/src/executor/query/TopNExecutor.cpp b/src/executor/query/TopNExecutor.cpp index 2d2dbcd8c..1b545102e 100644 --- a/src/executor/query/TopNExecutor.cpp +++ b/src/executor/query/TopNExecutor.cpp @@ -18,15 +18,11 @@ folly::Future TopNExecutor::execute() { if (UNLIKELY(iter == nullptr)) { return Status::Error("Internal error: nullptr iterator in topn executor"); } - if (UNLIKELY(iter->isDefaultIter())) { - std::string errMsg = "Internal error: Sort executor does not supported DefaultIter"; - LOG(ERROR) << errMsg; - return Status::Error(errMsg); - } - if (UNLIKELY(iter->isGetNeighborsIter())) { - std::string errMsg = "Internal error: TopN executor does not supported GetNeighborsIter"; - LOG(ERROR) << errMsg; - return Status::Error(errMsg); + if (UNLIKELY(!iter->isSequentialIter())) { + std::stringstream ss; + ss << "Internal error: Sort executor does not supported " << iter->kind(); + LOG(ERROR) << ss.str(); + return Status::Error(ss.str()); } auto &factors = topn->factors(); @@ -65,13 +61,7 @@ folly::Future TopNExecutor::execute() { return finish(ResultBuilder().value(iter->valuePtr()).iter(std::move(iter)).finish()); } - if (iter->isSequentialIter()) { - executeTopN(iter.get()); - } else if (iter->isJoinIter()) { - // executeTopN(iter.get()); - } else if (iter->isPropIter()) { - // executeTopN(iter.get()); - } + executeTopN(iter.get()); 
iter->eraseRange(maxCount_, size); return finish(ResultBuilder().value(iter->valuePtr()).iter(std::move(iter)).finish()); }