Skip to content

Commit

Permalink
fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Sep 16, 2022
1 parent 49ba8c0 commit beed8aa
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 19 deletions.
14 changes: 14 additions & 0 deletions src/graph/context/ExecutionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ const Result& ExecutionContext::getResult(const std::string& name) const {
}
}

void ExecutionContext::setVersionedResult(const std::string& name,
Result&& result,
int64_t version) {
auto it = valueMap_.find(name);
if (it != valueMap_.end()) {
auto& hist = it->second;
auto size = hist.size();
if (static_cast<size_t>(std::abs(version)) >= size) {
return;
}
hist[(size + version - 1) % size] = std::move(result);
}
}

const Result& ExecutionContext::getVersionedResult(const std::string& name, int64_t version) const {
auto& result = getHistory(name);
auto size = result.size();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/context/ExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class ExecutionContext {

const Result& getVersionedResult(const std::string& name, int64_t version) const;

void setVersionedResult(const std::string& name, Result&& result, int64_t version);

size_t numVersions(const std::string& name) const;

// Return all existing history of the value. The front is the latest value
Expand Down
6 changes: 4 additions & 2 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "graph/context/Iterator.h"

#include <robin_hood.h>

#include "common/datatypes/Edge.h"
#include "common/datatypes/Vertex.h"
#include "common/memory/MemoryUtils.h"
Expand Down Expand Up @@ -478,7 +480,6 @@ List GetNeighborsIter::getVertices() {
for (currentRow_ = currentDs_->ds->rows.begin(); currentRow_ < currentDs_->ds->rows.end();
++currentRow_) {
vertices.values.emplace_back(getVertex());
VLOG(1) << "vertex: " << getVertex() << " size: " << vertices.size();
}
}
reset();
Expand Down Expand Up @@ -549,8 +550,9 @@ List GetNeighborsIter::getEdges() {
auto edge = getEdge();
if (edge.isEdge()) {
const_cast<Edge&>(edge.getEdge()).format();
DLOG(ERROR) << "dc edge: " << edge.toString();
edges.values.emplace_back(std::move(edge));
}
edges.values.emplace_back(std::move(edge));
}
reset();
return edges;
Expand Down
51 changes: 50 additions & 1 deletion src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
.via(runner())
.thenValue([this, getNbrTime](RpcResponse&& resp) mutable {
// addStats(resp, getNbrTime.elapsedInUSec());
vids_.clear();
// vids_.clear();
return handleResponse(std::move(resp));
});
}
Expand All @@ -71,6 +71,22 @@ folly::Future<Status> SubgraphExecutor::handleResponse(RpcResponse&& resps) {
}
list.values.emplace_back(std::move(*dataset));
}

List vids(vids_);
DLOG(ERROR) << " step is " << currentStep_;
if (subgraph_->edgeFilter()) {
DLOG(ERROR) << "subgraph edge filter: " << subgraph_->edgeFilter()->toString();
} else {
DLOG(ERROR) << "subgraph edge filter is nullptr";
}
if (subgraph_->filter()) {
DLOG(ERROR) << "subgraph filter: " << subgraph_->filter()->toString();
} else {
DLOG(ERROR) << " subgraph filter is nullptr";
}
DLOG(ERROR) << "vids is " << vids.toString();
DLOG(ERROR) << "getNeightbor result: " << list.toString();

auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);

Expand All @@ -80,12 +96,36 @@ folly::Future<Status> SubgraphExecutor::handleResponse(RpcResponse&& resps) {
}

if (!process(std::move(iter)) || ++currentStep_ > steps) {
filterEdges(0);
return folly::makeFuture<Status>(Status::OK());
} else {
return getNeighbors();
}
}

void SubgraphExecutor::filterEdges(int version) {
auto iter = ectx_->getVersionedResult(subgraph_->outputVar(), version).iter();
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
while (gnIter->valid()) {
const auto& dst = gnIter->getEdgeProp("*", nebula::kDst);
if (validVids_.find(dst) == validVids_.end()) {
auto edge = gnIter->getEdge();
gnIter->erase();
DLOG(ERROR) << "erase dst " << dst.toString() << " edge is : " << edge.toString();
} else {
gnIter->next();
}
}
gnIter->reset();
ResultBuilder builder;
builder.iter(std::move(iter));
ectx_->setVersionedResult(subgraph_->outputVar(), builder.build(), version);

std::vector<Value> vids(validVids_.begin(), validVids_.end());
List list(vids);
DLOG(ERROR) << "pre Dst is : " << list;
}

bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
auto gnSize = iter->size();
if (gnSize == 0) {
Expand All @@ -105,9 +145,15 @@ bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
}
iter->reset();
}
vids_.clear();
auto& biDirectEdgeTypes = subgraph_->biDirectEdgeTypes();
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
validVids_.emplace(iter->getColumn(0));
// if (currentStep_ != 1 && preDsts.find(dst) == preDsts.end()) {
// iter->erase();
// continue;
// }
auto findIter = historyVids_.find(dst);
if (findIter != historyVids_.end()) {
if (biDirectEdgeTypes.empty()) {
Expand Down Expand Up @@ -147,6 +193,9 @@ bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
if (currentStep_ != 1) {
filterEdges(-1);
}
if (vids_.empty()) {
return false;
}
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetNeighborsRespo
class SubgraphExecutor : public StorageAccessExecutor {
public:
using HashMap = robin_hood::unordered_flat_map<Value, size_t, std::hash<Value>>;
using HashSet = robin_hood::unordered_flat_set<Value, std::hash<Value>>;

SubgraphExecutor(const PlanNode* node, QueryContext* qctx)
: StorageAccessExecutor("SubgraphExecutor", node, qctx) {
Expand All @@ -57,6 +58,8 @@ class SubgraphExecutor : public StorageAccessExecutor {

bool process(std::unique_ptr<GetNeighborsIter> iter);

void filterEdges(int version);

folly::Future<Status> handleResponse(RpcResponse&& resps);

private:
Expand All @@ -65,6 +68,7 @@ class SubgraphExecutor : public StorageAccessExecutor {
size_t currentStep_{1};
size_t totalSteps_{1};
std::vector<Value> vids_;
HashSet validVids_;
};

} // namespace graph
Expand Down
18 changes: 2 additions & 16 deletions src/graph/executor/query/DataCollectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,27 +72,13 @@ Status DataCollectExecutor::collectSubgraph(const std::vector<std::string>& vars
bool notEmpty = false;
for (const auto& type : colType) {
if (type == Value::Type::VERTEX) {
auto originVertices = gnIter->getVertices();
vertices.reserve(originVertices.size());
for (auto& v : originVertices.values) {
if (UNLIKELY(!v.isVertex())) {
continue;
}
vertices.emplace_back(std::move(v));
}
vertices = gnIter->getVertices();
if (!vertices.empty()) {
notEmpty = true;
row.emplace_back(std::move(vertices));
}
} else {
auto originEdges = gnIter->getEdges();
edges.reserve(originEdges.size());
for (auto& edge : originEdges.values) {
if (UNLIKELY(!edge.isEdge())) {
continue;
}
edges.emplace_back(std::move(edge));
}
edges = gnIter->getEdges();
if (!edges.empty()) {
notEmpty = true;
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ Status GetNeighborsExecutor::handleResponse(RpcResponse& resps) {

list.values.emplace_back(std::move(*dataset));
}
DLOG(ERROR) << "getNeightbor result : " << list.toString();
builder.value(Value(std::move(list))).iter(Iterator::Kind::kGetNeighbors);
return finish(builder.build());
}
Expand Down

0 comments on commit beed8aa

Please sign in to comment.