Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Rebase and fix join.
Browse files Browse the repository at this point in the history
  • Loading branch information
CPWstatic committed Mar 9, 2021
1 parent 4ee859d commit 4835d24
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 271 deletions.
25 changes: 0 additions & 25 deletions src/context/test/IteratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,31 +776,6 @@ TEST(IteratorTest, EdgeProp) {
}
}

// Exercises std::equal_to over `const LogicalRow*`: the assertions expect two
// distinct row objects with the same values to compare equal, and rows with
// different values to compare unequal.
// NOTE(review): this presumably relies on a project-provided specialization of
// std::equal_to for `const LogicalRow*` that is not visible here -- confirm.
TEST(IteratorTest, RowEqualTo) {
    DataSet ds;
    ds.colNames = {"col1", "col2"};

    // Each row is (n, "n"); the third row repeats the values of the first.
    for (int n : {0, 1, 0}) {
        Row entry;
        entry.values.emplace_back(n);
        entry.values.emplace_back(folly::to<std::string>(n));
        ds.rows.emplace_back(std::move(entry));
    }

    // Rows 0 and 1 carry different values.
    SequentialIter::SeqLogicalRow row0(&ds.rows[0]);
    SequentialIter::SeqLogicalRow row1(&ds.rows[1]);
    EXPECT_FALSE(std::equal_to<const nebula::graph::LogicalRow*>()(&row0, &row1));

    // Row 2 is a separate object holding the same values as row 0.
    SequentialIter::SeqLogicalRow row2(&ds.rows[2]);
    EXPECT_TRUE(std::equal_to<const nebula::graph::LogicalRow*>()(&row0, &row2));
    // A row compared with itself.
    EXPECT_TRUE(std::equal_to<const nebula::graph::LogicalRow*>()(&row0, &row0));
}

TEST(IteratorTest, DISABLED_EraseBySwap) {
DataSet ds;
ds.colNames = {"col1", "col2"};
Expand Down
121 changes: 0 additions & 121 deletions src/executor/query/DataJoinExecutor.cpp

This file was deleted.

37 changes: 0 additions & 37 deletions src/executor/query/DataJoinExecutor.h

This file was deleted.

61 changes: 28 additions & 33 deletions src/executor/query/InnerJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,27 @@ Status InnerJoinExecutor::close() {

// Runs the inner join: builds hashTable_ from one input via buildHashTable(),
// probes it with the other input via probe(), and passes the outcome to finish().
//
// NOTE(review): this span appears to be a diff rendered without +/- markers, so
// two variants of the function are interleaved (see the two `bucketSize`
// declarations and the two `return` statements). One variant uses the locals
// `lhsIter`/`rhsIter` and collects output in a JoinIter (`resultIter`); the
// other uses the members `lhsIter_`/`rhsIter_` and collects output in a
// DataSet (`result`). Read them as alternatives, not as consecutive statements.
folly::Future<Status> InnerJoinExecutor::join() {
auto* join = asNode<Join>(node());
auto lhsIter = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter();
auto rhsIter =
ectx_->getVersionedResult(join->rightVar().first, join->rightVar().second).iter();

auto resultIter = std::make_unique<JoinIter>(join->colNames());
resultIter->joinIndex(lhsIter.get(), rhsIter.get());
// bucketSize is the size of the smaller of the two inputs.
auto bucketSize = lhsIter->size() > rhsIter->size() ? rhsIter->size() : lhsIter->size();
auto bucketSize = lhsIter_->size() > rhsIter_->size() ? rhsIter_->size() : lhsIter_->size();
hashTable_.reserve(bucketSize);
// The JoinIter variant reserves room for the size of the larger input.
resultIter->reserve(lhsIter->size() > rhsIter->size() ? lhsIter->size() : rhsIter->size());

// Nothing is built or probed when either input is empty.
if (!(lhsIter->empty() || rhsIter->empty())) {
if (lhsIter->size() < rhsIter->size()) {
buildHashTable(join->hashKeys(), lhsIter.get());
probe(join->probeKeys(), rhsIter.get(), resultIter.get());
DataSet result;
if (!(lhsIter_->empty() || rhsIter_->empty())) {
// Left input is strictly smaller: hash the left side, probe with the right.
if (lhsIter_->size() < rhsIter_->size()) {
buildHashTable(join->hashKeys(), lhsIter_.get());
result = probe(join->probeKeys(), rhsIter_.get());
} else {
// Otherwise the roles are swapped: the right input is hashed (using the
// probe keys) and the left input is probed (using the hash keys).
// exchange_ records the swap; probe() checks it when ordering the two
// halves of each output row.
exchange_ = true;
buildHashTable(join->probeKeys(), rhsIter.get());
probe(join->hashKeys(), lhsIter.get(), resultIter.get());
buildHashTable(join->probeKeys(), rhsIter_.get());
result = probe(join->hashKeys(), lhsIter_.get());
}
}
return finish(ResultBuilder().iter(std::move(resultIter)).finish());
// DataSet variant: column names come from the Join node, and the data set is
// handed to ResultBuilder as a Value.
result.colNames = join->colNames();
VLOG(1) << result;
return finish(ResultBuilder().value(Value(std::move(result))).finish());
}

void InnerJoinExecutor::probe(const std::vector<Expression*>& probeKeys,
Iterator* probeIter,
JoinIter* resultIter) {
DataSet InnerJoinExecutor::probe(const std::vector<Expression*>& probeKeys,
Iterator* probeIter) {
DataSet ds;
QueryExpressionContext ctx(ectx_);
for (; probeIter->valid(); probeIter->next()) {
List list;
Expand All @@ -62,29 +57,29 @@ void InnerJoinExecutor::probe(const std::vector<Expression*>& probeKeys,
list.values.emplace_back(std::move(val));
}

auto range = hashTable_.find(list);
const auto& range = hashTable_.find(list);
if (range == hashTable_.end()) {
continue;
}
for (auto* row : range->second) {
std::vector<const Row*> values;
auto& lSegs = row->segments();
auto& rSegs = probeIter->row()->segments();
values.reserve(lSegs.size() + rSegs.size());
auto& lRow = *row;
auto& rRow = *probeIter->row();
VLOG(1) << lRow << rRow;
Row newRow;
newRow.reserve(lRow.size() + rRow.size());
auto& values = newRow.values;
if (exchange_) {
values.insert(values.end(), rSegs.begin(), rSegs.end());
values.insert(values.end(), lSegs.begin(), lSegs.end());
values.insert(values.end(), rRow.values.begin(), rRow.values.end());
values.insert(values.end(), lRow.values.begin(), lRow.values.end());
} else {
values.insert(values.end(), lSegs.begin(), lSegs.end());
values.insert(values.end(), rSegs.begin(), rSegs.end());
values.insert(values.end(), lRow.values.begin(), lRow.values.end());
values.insert(values.end(), rRow.values.begin(), rRow.values.end());
}
size_t size = row->size() + probeIter->row()->size();
JoinIter::JoinLogicalRow newRow(
std::move(values), size, &resultIter->getColIdxIndices());
VLOG(1) << node()->outputVar() << " : " << newRow;
resultIter->addRow(std::move(newRow));
VLOG(1) << "Row: " << newRow;
ds.rows.emplace_back(std::move(newRow));
}
}
return ds;
}
} // namespace graph
} // namespace nebula
4 changes: 1 addition & 3 deletions src/executor/query/InnerJoinExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ class InnerJoinExecutor final : public JoinExecutor {

private:
folly::Future<Status> join();
void probe(const std::vector<Expression*>& probeKeys,
Iterator* probeiter,
JoinIter* resultIter);
DataSet probe(const std::vector<Expression*>& probeKeys, Iterator* probeiter);

private:
bool exchange_{false};
Expand Down
21 changes: 11 additions & 10 deletions src/executor/query/JoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@ namespace graph {

// Validates the two inputs of the Join plan node.
// Returns Status::Error("Join executor does not support <kind>") when either
// input iterator is a GetNeighbors iterator or a default iterator, and
// Status::OK() otherwise.
//
// NOTE(review): this span appears to be a diff rendered without +/- markers.
// Each statement shows up twice: once with the locals `lhsIter`/`rhsIter` and
// once with the members `lhsIter_`/`rhsIter_`. Read the pairs as alternatives,
// not as consecutive statements.
Status JoinExecutor::checkInputDataSets() {
auto* join = asNode<Join>(node());
// Left input: fetch the iterator for the (name, version) pair in leftVar().
auto lhsIter = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter();
DCHECK(!!lhsIter);
VLOG(1) << "lhs: " << join->leftVar().first << " " << lhsIter->size();
if (lhsIter->isGetNeighborsIter() || lhsIter->isDefaultIter()) {
// Member variant: the iterator is stored in lhsIter_ instead of a local.
lhsIter_ = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter();
DCHECK(!!lhsIter_);
VLOG(1) << "lhs: " << join->leftVar().first << " " << lhsIter_->size();
if (lhsIter_->isGetNeighborsIter() || lhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "Join executor does not support " << lhsIter->kind();
ss << "Join executor does not support " << lhsIter_->kind();
return Status::Error(ss.str());
}
// Right input: same checks, using rightVar().
auto rhsIter =
rhsIter_ =
ectx_->getVersionedResult(join->rightVar().first, join->rightVar().second).iter();
DCHECK(!!rhsIter);
VLOG(1) << "rhs: " << join->rightVar().first << " " << rhsIter->size();
if (rhsIter->isGetNeighborsIter() || rhsIter->isDefaultIter()) {
DCHECK(!!rhsIter_);
VLOG(1) << "rhs: " << join->rightVar().first << " " << rhsIter_->size();
if (rhsIter_->isGetNeighborsIter() || rhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "Join executor does not support " << rhsIter->kind();
ss << "Join executor does not support " << rhsIter_->kind();
return Status::Error(ss.str());
}
// Record the number of output columns declared by the Join node.
colSize_ = join->colNames().size();
return Status::OK();
}

Expand Down
5 changes: 4 additions & 1 deletion src/executor/query/JoinExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ class JoinExecutor : public Executor {
void buildHashTable(const std::vector<Expression*>& hashKeys, Iterator* iter);

protected:
std::unordered_map<List, std::vector<const LogicalRow*>> hashTable_;
std::unique_ptr<Iterator> lhsIter_;
std::unique_ptr<Iterator> rhsIter_;
size_t colSize_{0};
std::unordered_map<List, std::vector<const Row*>> hashTable_;
};
} // namespace graph
} // namespace nebula
Expand Down
Loading

0 comments on commit 4835d24

Please sign in to comment.