Skip to content

Commit

Permalink
fix multi-thread error
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Sep 30, 2022
1 parent d22ce2f commit 54b4043
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 27 deletions.
44 changes: 18 additions & 26 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,24 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
currentStep_ == 1 ? nullptr : subgraph_->tagFilter())
.via(runner())
.thenValue([this, getNbrTime](RpcResponse&& resp) mutable {
// addStats(resp, getNbrTime.elapsedInUSec());
// vids_.clear();
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec()));
auto& hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto& result = resp.responses()[i];
if (result.vertices_ref().has_value()) {
size = (*result.vertices_ref()).size();
}
auto& info = hostLatency[i];
otherStats_.emplace(
folly::sformat("{} exec/total/vertices", std::get<0>(info).toString()),
folly::sformat("{}(us)/{}(us)/{},", std::get<1>(info), std::get<2>(info), size));
auto detail = getStorageDetail(result.result.latency_detail_us_ref());
if (!detail.empty()) {
otherStats_.emplace("storage_detail", detail);
}
}
vids_.clear();
return handleResponse(std::move(resp));
});
}
Expand All @@ -73,21 +89,6 @@ folly::Future<Status> SubgraphExecutor::handleResponse(RpcResponse&& resps) {
list.values.emplace_back(std::move(*dataset));
}

List vids(vids_);
DLOG(ERROR) << " step is " << currentStep_;
if (subgraph_->edgeFilter()) {
DLOG(ERROR) << "subgraph edge filter: " << subgraph_->edgeFilter()->toString();
} else {
DLOG(ERROR) << "subgraph edge filter is nullptr";
}
if (subgraph_->filter()) {
DLOG(ERROR) << "subgraph filter: " << subgraph_->filter()->toString();
} else {
DLOG(ERROR) << " subgraph filter is nullptr";
}
DLOG(ERROR) << "vids is " << vids.toString();
DLOG(ERROR) << "getNeightbor result: " << list.toString();

auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);

Expand All @@ -112,7 +113,6 @@ void SubgraphExecutor::filterEdges(int version) {
if (validVids_.find(dst) == validVids_.end()) {
auto edge = gnIter->getEdge();
gnIter->erase();
DLOG(ERROR) << "erase dst " << dst.toString() << " edge is : " << edge.toString();
} else {
gnIter->next();
}
Expand All @@ -121,10 +121,6 @@ void SubgraphExecutor::filterEdges(int version) {
ResultBuilder builder;
builder.iter(std::move(iter));
ectx_->setVersionedResult(subgraph_->outputVar(), builder.build(), version);

std::vector<Value> vids(validVids_.begin(), validVids_.end());
List list(vids);
DLOG(ERROR) << "pre Dst is : " << list;
}

bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
Expand All @@ -147,10 +143,6 @@ bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
}
validVids_.insert(std::make_move_iterator(startVids.begin()),
std::make_move_iterator(startVids.end()));
for (auto vid : validVids_) {
DLOG(ERROR) << "valid vid : " << vid.toString();
}
vids_.clear();
auto& biDirectEdgeTypes = subgraph_->biDirectEdgeTypes();
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class SubgraphExecutor : public StorageAccessExecutor {
size_t currentStep_{1};
size_t totalSteps_{1};
std::vector<Value> vids_;
// save vids already visited
HashSet validVids_;
};

Expand Down
2 changes: 2 additions & 0 deletions src/storage/exec/GetNeighborsNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class GetNeighborsNode : public QueryNode<VertexID> {
if (context_->resultStat_ == ResultStatus::TAG_FILTER_OUT) {
// if the filter condition of the tag is not satisfied
// do not return the data for this vertex and corresponding edge
// todo (need lock when multi-thread?)
context_->resultStat_ = ResultStatus::NORMAL;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down
1 change: 0 additions & 1 deletion tests/tck/features/subgraph/subgraphWithFilter.feature
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) 2022 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
@jmq
Feature: subgraph with fitler

Background:
Expand Down

0 comments on commit 54b4043

Please sign in to comment.