Skip to content

Commit

Permalink
skipped duplicated self-reflective edges in GetNeighbors.
Browse files Browse the repository at this point in the history
  • Loading branch information
xtcyclist committed Nov 29, 2022
1 parent b81aadb commit 7059e95
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
35 changes: 34 additions & 1 deletion src/storage/exec/HashJoinNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ class HashJoinNode : public IterateNode<VertexID> {

std::vector<SingleEdgeIterator*> iters;
for (auto* edgeNode : edgeNodes_) {
iters.emplace_back(edgeNode->iter());
if (!isDuplicatedSelfReflectiveEdge(edgeNode)) {
iters.emplace_back(edgeNode->iter());
}
}
iter_.reset(new MultiEdgeIterator(std::move(iters)));
if (iter_->valid()) {
Expand Down Expand Up @@ -151,6 +153,37 @@ class HashJoinNode : public IterateNode<VertexID> {
}
}

/* This section is implemented to filter out duplicated self-reflective edges. */
private:
bool isDuplicatedSelfReflectiveEdge(SingleEdgeNode* edgeNode) {
// get destination id
if (!edgeNode->valid()) {
return false;
}
auto rawKey = edgeNode->key();
std::string srcID = NebulaKeyUtils::getSrcId(context_->vIdLen(), rawKey).str();
std::string dstID = NebulaKeyUtils::getDstId(context_->vIdLen(), rawKey).str();
VLOG(1) << "Tried to detect SR Edge with: " << srcID << "\t" << dstID;
if (srcID.compare(dstID) == 0) {
// self-reflective edge
std::string rank = std::to_string(NebulaKeyUtils::getRank(context_->vIdLen(), rawKey));
auto edgeType = NebulaKeyUtils::getEdgeType(context_->vIdLen(), rawKey);
edgeType = edgeType > 0 ? edgeType : -edgeType;
std::string type = std::to_string(edgeType);
// std::string key = srcID + std::to_string(edgeType) + std::to_string(rank);
std::string key;
key.reserve(srcID.size() + type.size() + rank.size());
key = type + rank + srcID;
if (!visitedSelfReflectiveEdges_.insert(key).second) {
return true;
}
for (auto& elem : visitedSelfReflectiveEdges_) {
}
}
return false;
}
std::unordered_set<std::string> visitedSelfReflectiveEdges_;

private:
RuntimeContext* context_;
std::vector<TagNode*> tagNodes_;
Expand Down
24 changes: 0 additions & 24 deletions src/storage/query/GetNeighborsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,30 +182,6 @@ folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> GetNeighborsProce
});
}

bool GetNeighborsProcessor::isDuplicatedSelfReflectiveEdge(SingleEdgeNode* edgeNode) {
// get destination id
auto rawKey = edgeNode->key();
auto srcID = NebulaKeyUtils::getSrcId(spaceVidLen_, rawKey).str();
auto dstID = NebulaKeyUtils::getDstId(spaceVidLen_, rawKey).str();
if (srcID.compare(0, srcID.size(), dstID.c_str()) == 0) {
// self-reflective edge
auto rank = NebulaKeyUtils::getRank(spaceVidLen_, rawKey);
auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, rawKey);
edgeType = edgeType > 0 ? edgeType : -edgeType;
std::string key = srcID + std::to_string(edgeType) + std::to_string(rank);
if (!visitedSelfReflectiveEdges_.insert(key).second) {
VLOG(1) << "Skipped SR Edge with vid: " << srcID;
return true;
}
for (auto& elem : visitedSelfReflectiveEdges_) {
VLOG(1) << "element:" << elem;
}
VLOG(1) << "Added SR Edge with vid: " << srcID;
}
VLOG(1) << "Tried to detect SR Edge with: " << srcID << "\t" << dstID;
return false;
}

StoragePlan<VertexID> GetNeighborsProcessor::buildPlan(RuntimeContext* context,
StorageExpressionContext* expCtx,
nebula::DataSet* result,
Expand Down
2 changes: 0 additions & 2 deletions src/storage/query/GetNeighborsProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ class GetNeighborsProcessor
bool random);

private:
bool isDuplicatedSelfReflectiveEdge(SingleEdgeNode* edgeNode);
std::vector<RuntimeContext> contexts_;
std::vector<StorageExpressionContext> expCtxs_;
std::vector<nebula::DataSet> results_;
std::unordered_set<std::string> visitedSelfReflectiveEdges_;
};

} // namespace storage
Expand Down

0 comments on commit 7059e95

Please sign in to comment.