Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' into feature/fetch-vertices-on-multi-tags
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu authored Oct 23, 2020
2 parents a622b32 + 5203f5d commit 490f6b8
Show file tree
Hide file tree
Showing 32 changed files with 1,273 additions and 121 deletions.
1 change: 1 addition & 0 deletions src/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ nebula_add_library(
query/IndexScanExecutor.cpp
algo/ConjunctPathExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/ProduceSemiShortestPathExecutor.cpp
admin/SwitchSpaceExecutor.cpp
admin/CreateUserExecutor.cpp
admin/DropUserExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "executor/admin/SwitchSpaceExecutor.h"
#include "executor/admin/UpdateUserExecutor.h"
#include "executor/algo/BFSShortestPathExecutor.h"
#include "executor/algo/ProduceSemiShortestPathExecutor.h"
#include "executor/algo/ConjunctPathExecutor.h"
#include "executor/logic/LoopExecutor.h"
#include "executor/logic/PassThroughExecutor.h"
Expand Down Expand Up @@ -370,6 +371,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kBFSShortest: {
return pool->add(new BFSShortestPathExecutor(node, qctx));
}
case PlanNode::Kind::kProduceSemiShortestPath: {
return pool->add(new ProduceSemiShortestPathExecutor(node, qctx));
}
case PlanNode::Kind::kConjunctPath: {
return pool->add(new ConjunctPathExecutor(node, qctx));
}
Expand Down
261 changes: 261 additions & 0 deletions src/executor/algo/ProduceSemiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "executor/algo/ProduceSemiShortestPathExecutor.h"

#include "planner/Algo.h"

namespace nebula {
namespace graph {

std::vector<Path> ProduceSemiShortestPathExecutor::createPaths(
const std::vector<const Path*>& paths,
const Edge& edge) {
std::vector<Path> newPaths;
newPaths.reserve(paths.size());
for (auto p : paths) {
Path path = *p;
path.steps.emplace_back(Step(Vertex(edge.dst, {}), edge.type, edge.name, edge.ranking, {}));
newPaths.emplace_back(std::move(path));
}
return newPaths;
}

void ProduceSemiShortestPathExecutor::dstInCurrent(const Edge& edge,
CostPathMapType& currentCostPathMap) {
auto& src = edge.src;
auto& dst = edge.dst;
auto weight = 1; // weight = weight_->getWeight();
auto& srcPaths = historyCostPathMap_[src];

for (auto& srcPath : srcPaths) {
if (currentCostPathMap[dst].find(srcPath.first) != currentCostPathMap[dst].end()) {
// startVid in currentCostPathMap[dst]
auto newCost = srcPath.second.cost_ + weight;
auto oldCost = currentCostPathMap[dst][srcPath.first].cost_;
if (newCost > oldCost) {
continue;
} else if (newCost < oldCost) {
// update (dst, startVid)'s path
std::vector<Path> newPaths = createPaths(srcPath.second.paths_, edge);
currentCostPathMap[dst][srcPath.first].cost_ = newCost;
currentCostPathMap[dst][srcPath.first].paths_.swap(newPaths);
} else {
// add (dst, startVid)'s path
std::vector<Path> newPaths = createPaths(srcPath.second.paths_, edge);
for (auto& p : newPaths) {
currentCostPathMap[dst][srcPath.first].paths_.emplace_back(std::move(p));
}
}
} else {
// startVid not in currentCostPathMap[dst]
auto newCost = srcPath.second.cost_ + weight;
std::vector<Path> newPaths = createPaths(srcPath.second.paths_, edge);
// dst in history
if (historyCostPathMap_.find(dst) != historyCostPathMap_.end()) {
if (historyCostPathMap_[dst].find(srcPath.first) !=
historyCostPathMap_[dst].end()) {
removeSamePath(newPaths, historyCostPathMap_[dst][srcPath.first].paths_);
}
}
if (!newPaths.empty()) {
CostPaths temp(newCost, newPaths);
currentCostPathMap[dst].emplace(srcPath.first, std::move(temp));
}
}
}
}

void ProduceSemiShortestPathExecutor::dstNotInHistory(const Edge& edge,
CostPathMapType& currentCostPathMap) {
auto& src = edge.src;
auto& dst = edge.dst;
auto weight = 1; // weight = weight_->getWeight();
auto& srcPaths = historyCostPathMap_[src];
if (currentCostPathMap.find(dst) == currentCostPathMap.end()) {
// dst not in history and not in current
for (auto& srcPath : srcPaths) {
auto cost = srcPath.second.cost_ + weight;

std::vector<Path> newPaths = createPaths(srcPath.second.paths_, edge);
std::unordered_map<Value, CostPaths> temp = {
{srcPath.first, CostPaths(cost, newPaths)}};
currentCostPathMap.emplace(dst, std::move(temp));
}
} else {
// dst in current
dstInCurrent(edge, currentCostPathMap);
}
}

void ProduceSemiShortestPathExecutor::removeSamePath(std::vector<Path>& paths,
std::vector<const Path*>& historyPathsPtr) {
for (auto ptr : historyPathsPtr) {
auto iter = paths.begin();
while (iter != paths.end()) {
if (*iter == *ptr) {
iter = paths.erase(iter);
} else {
++iter;
}
}
}
}

void ProduceSemiShortestPathExecutor::dstInHistory(const Edge& edge,
CostPathMapType& currentCostPathMap) {
auto& src = edge.src;
auto& dst = edge.dst;
auto weight = 1; // weight = weight_->getWeight();
auto& srcPaths = historyCostPathMap_[src];

if (currentCostPathMap.find(dst) == currentCostPathMap.end()) {
// dst not in current but in history
for (auto& srcPath : srcPaths) {
if (historyCostPathMap_[dst].find(srcPath.first) == historyCostPathMap_[dst].end()) {
// (dst, startVid)'s path not in history
auto newCost = srcPath.second.cost_ + weight;
std::vector<Path> newPaths = createPaths(srcPath.second.paths_, edge);
std::unordered_map<Value, CostPaths> temp = {
{srcPath.first, CostPaths(newCost, newPaths)}};
currentCostPathMap.emplace(dst, std::move(temp));
} else {
// (dst, startVid)'s path in history, compare cost
auto newCost = srcPath.second.cost_ + weight;
auto historyCost = historyCostPathMap_[dst][srcPath.first].cost_;
if (newCost > historyCost) {
continue;
} else if (newCost < historyCost) {
// update (dst, startVid)'s path
std::vector<Path> newPaths = createPaths(srcPath.second.paths_, edge);
std::unordered_map<Value, CostPaths> temp = {
{srcPath.first, CostPaths(newCost, newPaths)}};
currentCostPathMap.emplace(dst, std::move(temp));
} else {
std::vector<Path> newPaths = createPaths(srcPath.second.paths_, edge);
// if same path in history, remove it
removeSamePath(newPaths, historyCostPathMap_[dst][srcPath.first].paths_);
if (newPaths.empty()) {
continue;
}
std::unordered_map<Value, CostPaths> temp = {
{srcPath.first, CostPaths(newCost, newPaths)}};
currentCostPathMap.emplace(dst, std::move(temp));
}
}
}
} else {
// dst in current
dstInCurrent(edge, currentCostPathMap);
}
}

void ProduceSemiShortestPathExecutor::updateHistory(const Value& dst,
const Value& src,
double cost,
Value& paths) {
const List& pathList = paths.getList();
std::vector<const Path*> tempPathsPtr;
tempPathsPtr.reserve(pathList.size());
for (auto& p : pathList.values) {
tempPathsPtr.emplace_back(&p.getPath());
}

if (historyCostPathMap_.find(dst) == historyCostPathMap_.end()) {
// insert path to history
std::unordered_map<Value, CostPathsPtr> temp = {{src, CostPathsPtr(cost, tempPathsPtr)}};
historyCostPathMap_.emplace(dst, std::move(temp));
} else {
if (historyCostPathMap_[dst].find(src) == historyCostPathMap_[dst].end()) {
// startVid not in history ; insert it
historyCostPathMap_[dst].emplace(src, CostPathsPtr(cost, tempPathsPtr));
} else {
// startVid in history; compare cost
auto historyCost = historyCostPathMap_[dst][src].cost_;
if (cost < historyCost) {
historyCostPathMap_[dst][src].cost_ = cost;
historyCostPathMap_[dst][src].paths_.swap(tempPathsPtr);
} else if (cost == historyCost) {
for (auto p : tempPathsPtr) {
historyCostPathMap_[dst][src].paths_.emplace_back(p);
}
} else {
LOG(FATAL) << "current Cost : " << cost << " history Cost : " << historyCost;
}
}
}
}

folly::Future<Status> ProduceSemiShortestPathExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto* pssp = asNode<ProduceSemiShortestPath>(node());
auto iter = ectx_->getResult(pssp->inputVar()).iter();
VLOG(1) << "current: " << node()->outputVar();
VLOG(1) << "input: " << pssp->inputVar();
DCHECK(!!iter);

CostPathMapType currentCostPathMap;

for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (!edgeVal.isEdge()) {
continue;
}
auto& edge = edgeVal.getEdge();
auto& src = edge.src;
auto& dst = edge.dst;
auto weight = 1;

if (historyCostPathMap_.find(src) == historyCostPathMap_.end()) {
// src not in history, now src must be startVid
Path path;
// (todo) can't get dst's vertex
path.src = Vertex(src, {});
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
CostPaths costPaths(weight, {std::move(path)});
if (currentCostPathMap.find(dst) != currentCostPathMap.end()) {
currentCostPathMap[dst].emplace(src, std::move(costPaths));
} else {
std::unordered_map<Value, CostPaths> temp = {{src, std::move(costPaths)}};
currentCostPathMap.emplace(dst, std::move(temp));
}
} else {
if (historyCostPathMap_.find(dst) == historyCostPathMap_.end()) {
dstNotInHistory(edge, currentCostPathMap);
} else {
dstInHistory(edge, currentCostPathMap);
}
}
}

DataSet ds;
ds.colNames = node()->colNames();
for (auto& dstPath : currentCostPathMap) {
auto& dst = dstPath.first;
for (auto& srcPath : dstPath.second) {
auto cost = srcPath.second.cost_;
List paths;
paths.values.reserve(srcPath.second.paths_.size());
for (auto & path : srcPath.second.paths_) {
paths.values.emplace_back(std::move(path));
}
Row row;
row.values.emplace_back(std::move(dst));
row.values.emplace_back(std::move(cost));
row.values.emplace_back(std::move(paths));
ds.rows.emplace_back(std::move(row));

// update (dst, startVid)'s paths to history
updateHistory(dst, srcPath.first, cost, ds.rows.back().values.back());
}
}

return finish(ResultBuilder().value(Value(std::move(ds))).finish());
}


} // namespace graph
} // namespace nebula
67 changes: 67 additions & 0 deletions src/executor/algo/ProduceSemiShortestPathExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef EXECUTOR_QUERY_PRODUCESEMISHORTESTPATHEXECUTOR_H_
#define EXECUTOR_QUERY_PRODUCESEMISHORTESTPATHEXECUTOR_H_

#include "executor/Executor.h"

namespace nebula {
namespace graph {
class ProduceSemiShortestPathExecutor final : public Executor {
public:
ProduceSemiShortestPathExecutor(const PlanNode* node, QueryContext* qctx)
: Executor("ProduceSemiShortestPath", node, qctx) {}

folly::Future<Status> execute() override;

struct CostPaths {
double cost_;
std::vector<Path> paths_;
CostPaths() = default;
CostPaths(double cost, std::vector<Path>& paths) : cost_(cost) {
paths_.swap(paths);
}
CostPaths(double cost, std::vector<Path>&& paths) : cost_(cost) {
paths_.swap(paths);
}
};

struct CostPathsPtr {
CostPathsPtr() = default;
CostPathsPtr(double cost, std::vector<const Path*>& paths) : cost_(cost) {
paths_.swap(paths);
}
double cost_;
std::vector<const Path*> paths_;
};

using CostPathMapType = std::unordered_map<Value, std::unordered_map<Value, CostPaths>>;
using CostPathMapPtr = std::unordered_map<Value, std::unordered_map<Value, CostPathsPtr>>;

private:
void dstNotInHistory(const Edge& edge, CostPathMapType&);

void dstInHistory(const Edge& edge, CostPathMapType&);

void dstInCurrent(const Edge& edge, CostPathMapType&);

void updateHistory(const Value& dst, const Value& src, double cost, Value& paths);

std::vector<Path> createPaths(const std::vector<const Path*>& paths, const Edge& edge);

void removeSamePath(std::vector<Path>& paths, std::vector<const Path*> &historyPaths);

private:
// dst : {src : <cost, {Path*}>}
CostPathMapPtr historyCostPathMap_;

// std::unique_ptr<IWeight> weight_;
};

} // namespace graph
} // namespace nebula
#endif // EXECUTOR_QUERY_PRODUCESEMISHORTESTPATHEXECUTOR_H_
1 change: 1 addition & 0 deletions src/executor/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ nebula_add_test(
DataJoinTest.cpp
BFSShortestTest.cpp
ConjunctPathTest.cpp
ProduceSemiShortestPathTest.cpp
OBJECTS
${EXEC_QUERY_TEST_OBJS}
LIBRARIES
Expand Down
Loading

0 comments on commit 490f6b8

Please sign in to comment.