Skip to content

Commit

Permalink
Refine the stats usage in Executor (#5554)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
yixinglu and Sophie-Xie authored May 18, 2023
1 parent 979f602 commit 483a31a
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 42 deletions.
22 changes: 18 additions & 4 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <folly/futures/Future.h>

#include <boost/core/noncopyable.hpp>
#include <mutex>

#include "common/cpp/helpers.h"
#include "common/memory/MemoryTracker.h"
Expand Down Expand Up @@ -123,11 +124,21 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
class GatherFunc>
auto runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator *iter);

void addState(const std::string &name, size_t durationInUs) {
otherStats_.emplace(name, folly::sformat("{}(us)", durationInUs));
void addState(const std::string &name, const time::Duration &dur) {
auto str = folly::sformat("{}(us)", dur.elapsedInUSec());
std::lock_guard<std::mutex> l(statsLock_);
otherStats_.emplace(name, std::move(str));
}
void addState(const std::string &name, folly::dynamic json) {
otherStats_.emplace(name, folly::toPrettyJson(json));

void addState(const std::string &name, const folly::dynamic &json) {
auto str = folly::toPrettyJson(json);
std::lock_guard<std::mutex> l(statsLock_);
otherStats_.emplace(name, std::move(str));
}

void mergeStats(std::unordered_map<std::string, std::string> stats) {
std::lock_guard<std::mutex> l(statsLock_);
otherStats_.merge(std::move(stats));
}

int64_t id_;
Expand All @@ -150,6 +161,9 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
uint64_t numRows_{0};
uint64_t execTime_{0};
time::Duration totalDuration_;

private:
std::mutex statsLock_;
std::unordered_map<std::string, std::string> otherStats_;
};

Expand Down
3 changes: 1 addition & 2 deletions src/graph/executor/StorageAccessExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ void StorageAccessExecutor::addGetNeighborStats(RpcResponse &resp, size_t stepNu
}

auto key = folly::sformat("{}step[{}]", reverse ? "reverse " : "", stepNum);
std::lock_guard<folly::SpinLock> lk(statsLock_);
otherStats_.emplace(key, folly::toPrettyJson(stats));
addState(key, stats);
}

} // namespace graph
Expand Down
3 changes: 0 additions & 3 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ class StorageAccessExecutor : public Executor {
const std::vector<VertexProp> *vertexPropPtr);

std::vector<Value> handlePropResp(PropRpcResponse &&resps);

protected:
folly::SpinLock statsLock_;
};

} // namespace graph
Expand Down
13 changes: 3 additions & 10 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,7 @@ folly::Future<Status> AllPathsExecutor::getNeighbors(bool reverse) {
time::Duration buildAdjTime;
auto key = folly::sformat("buildAdjTime {}step[{}]", reverse ? "reverse " : "", step);
expand(iter.get(), reverse);
{
std::lock_guard<folly::SpinLock> lk(statsLock_);
otherStats_.emplace(key, folly::sformat("{}(us)", buildAdjTime.elapsedInUSec()));
}
addState(key, buildAdjTime);
return Status::OK();
});
}
Expand Down Expand Up @@ -212,10 +209,7 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
time::Duration buildPathTime;
auto future = buildPathMultiJobs();
return future.via(runner())
.ensure([this, buildPathTime]() {
otherStats_.emplace("build_path_time",
folly::sformat("{}(us)", buildPathTime.elapsedInUSec()));
})
.ensure([this, buildPathTime]() { addState("build_path_time", buildPathTime); })
.thenValue([this](auto&& resp) {
UNUSED(resp);
if (!withProp_ || emptyPropVids_.empty()) {
Expand Down Expand Up @@ -254,8 +248,7 @@ folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
})
.thenValue([this, conjunctPathTime](auto&& resps) {
NG_RETURN_IF_ERROR(resps);
otherStats_.emplace("conjunct_path_time",
folly::sformat("{}(us)", conjunctPathTime.elapsedInUSec()));
addState("conjunct_path_time", conjunctPathTime);
return Status::OK();
});
}
Expand Down
6 changes: 4 additions & 2 deletions src/graph/executor/algo/ShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ folly::Future<Status> ShortestPathExecutor::execute() {
HashSet endVids;
size_t rowSize = checkInput(startVids, endVids);
std::unique_ptr<ShortestPathBase> pathPtr = nullptr;
std::unordered_map<std::string, std::string> stats;
if (rowSize <= FLAGS_num_path_thread) {
pathPtr = std::make_unique<SingleShortestPath>(pathNode_, qctx_, &otherStats_);
pathPtr = std::make_unique<SingleShortestPath>(pathNode_, qctx_, &stats);
} else {
pathPtr = std::make_unique<BatchShortestPath>(pathNode_, qctx_, &otherStats_);
pathPtr = std::make_unique<BatchShortestPath>(pathNode_, qctx_, &stats);
}
auto status = pathPtr->execute(startVids, endVids, &result).get();
NG_RETURN_IF_ERROR(status);
mergeStats(std::move(stats));
return finish(ResultBuilder().value(Value(std::move(result))).build());
}

Expand Down
4 changes: 2 additions & 2 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
.thenValue([this, getNbrTime](RpcResponse&& resp) mutable {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec()));
addState("total_rpc_time", getNbrTime);
auto& hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
Expand All @@ -64,7 +64,7 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
size = (*result.vertices_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
addState(folly::sformat("resp[{}]", i), info);
}
vids_.clear();
return handleResponse(std::move(resp));
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/AppendVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ folly::Future<Status> AppendVerticesExecutor::appendVertices() {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
SCOPED_TIMER(&execTime_);
addState("total_rpc", getPropsTime.elapsedInUSec());
addState("total_rpc", getPropsTime);
addStats(rpcResp);
if (FLAGS_max_job_size <= 1) {
return folly::makeFuture<Status>(handleResp(std::move(rpcResp)));
Expand Down
9 changes: 4 additions & 5 deletions src/graph/executor/query/ExpandAllExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ folly::Future<Status> ExpandAllExecutor::GetDstBySrc() {
.via(runner())
.ensure([this, getDstTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getDstTime.elapsedInUSec()));
addState("total_rpc_time", getDstTime);
})
.thenValue([this](StorageRpcResponse<GetDstBySrcResponse>&& resps) {
memory::MemoryCheckGuard guard;
Expand All @@ -95,8 +95,7 @@ folly::Future<Status> ExpandAllExecutor::GetDstBySrc() {
size = (*result.dsts_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("step{} resp [{}]", currentStep_, i),
folly::toPrettyJson(info));
addState(folly::sformat("step{} resp [{}]", currentStep_, i), info);
}
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
Expand Down Expand Up @@ -172,7 +171,7 @@ folly::Future<Status> ExpandAllExecutor::getNeighbors() {
: stepLimits_[currentStep_ - 2];
return handleResponse(std::move(resp)).ensure([this, expandTime]() {
std::string timeName = "graphExpandAllTime+" + folly::to<std::string>(currentStep_);
otherStats_.emplace(timeName, folly::sformat("{}(us)", expandTime.elapsedInUSec()));
addState(timeName, expandTime);
});
})
.thenValue([this](Status s) -> folly::Future<Status> {
Expand Down Expand Up @@ -219,7 +218,7 @@ folly::Future<Status> ExpandAllExecutor::expandFromCache() {
preVisitedVids_.swap(visitedVids);
preDst2VidsMap_.swap(dst2VidsMap);
std::string timeName = "graphCacheExpandAllTime+" + folly::to<std::string>(currentStep_);
otherStats_.emplace(timeName, folly::sformat("{}(us)", expandTime.elapsedInUSec()));
addState(timeName, expandTime);
if (!nextStepVids_.empty()) {
return getNeighbors();
}
Expand Down
9 changes: 4 additions & 5 deletions src/graph/executor/query/ExpandExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ folly::Future<Status> ExpandExecutor::GetDstBySrc() {
.via(runner())
.ensure([this, getDstTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getDstTime.elapsedInUSec()));
addState("total_rpc_time", getDstTime);
})
.thenValue([this](StorageRpcResponse<GetDstBySrcResponse>&& resps) {
memory::MemoryCheckGuard guard;
Expand All @@ -94,8 +94,7 @@ folly::Future<Status> ExpandExecutor::GetDstBySrc() {
size = (*result.dsts_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("step{} resp [{}]", currentStep_, i),
folly::toPrettyJson(info));
addState(folly::sformat("step{} resp [{}]", currentStep_, i), info);
}
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
Expand Down Expand Up @@ -171,7 +170,7 @@ folly::Future<Status> ExpandExecutor::getNeighbors() {
: stepLimits_[currentStep_ - 1];
return handleResponse(std::move(resp)).ensure([this, expandTime]() {
std::string timeName = "graphExpandTime+" + folly::to<std::string>(currentStep_);
otherStats_.emplace(timeName, folly::sformat("{}(us)", expandTime.elapsedInUSec()));
addState(timeName, expandTime);
});
})
.thenValue([this](Status s) -> folly::Future<Status> {
Expand Down Expand Up @@ -218,7 +217,7 @@ folly::Future<Status> ExpandExecutor::expandFromCache() {
dst2VidsMap.swap(preDst2VidsMap_);
visitedVids.swap(preVisitedVids_);
std::string timeName = "graphCacheExpandTime+" + folly::to<std::string>(currentStep_);
otherStats_.emplace(timeName, folly::sformat("{}(us)", expandTime.elapsedInUSec()));
addState(timeName, expandTime);
if (!nextStepVids_.empty()) {
return getNeighbors();
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
.via(runner())
.ensure([this, getPropsTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec()));
addState("total_rpc", getPropsTime);
})
.thenValue([this, ge](StorageRpcResponse<GetPropResponse> &&rpcResp) {
memory::MemoryCheckGuard guard;
Expand Down
4 changes: 2 additions & 2 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
.via(runner())
.ensure([this, getNbrTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec()));
addState("total_rpc_time", getNbrTime);
})
.thenValue([this](StorageRpcResponse<GetNeighborsResponse>&& resp) {
memory::MemoryCheckGuard guard;
Expand All @@ -72,7 +72,7 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
size = (*result.vertices_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
addState(folly::sformat("resp[{}]", i), info);
}
return handleResponse(resp);
});
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ folly::Future<Status> GetVerticesExecutor::getVertices() {
.via(runner())
.ensure([this, getPropsTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec()));
addState("total_rpc", getPropsTime);
})
.thenValue([this, gv](StorageRpcResponse<GetPropResponse> &&rpcResp) {
memory::MemoryCheckGuard guard;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/ScanEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ folly::Future<Status> ScanEdgesExecutor::scanEdges() {
.via(runner())
.ensure([this, scanEdgesTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", scanEdgesTime.elapsedInUSec()));
addState("total_rpc", scanEdgesTime);
})
.thenValue([this](StorageRpcResponse<ScanResponse> &&rpcResp) {
memory::MemoryCheckGuard guard;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/ScanVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ folly::Future<Status> ScanVerticesExecutor::scanVertices() {
.via(runner())
.ensure([this, scanVertexTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", scanVertexTime.elapsedInUSec()));
addState("total_rpc", scanVertexTime);
})
.thenValue([this, sv](StorageRpcResponse<ScanResponse> &&rpcResp) {
memory::MemoryCheckGuard guard;
Expand Down
4 changes: 2 additions & 2 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ folly::Future<Status> TraverseExecutor::getNeighbors() {
addStats(resp, getNbrTime.elapsedInUSec());
time::Duration expandTime;
return handleResponse(std::move(resp)).ensure([this, expandTime]() {
addState("expandTime", expandTime.elapsedInUSec());
addState("expandTime", expandTime);
});
})
.thenValue([this](Status s) -> folly::Future<Status> {
Expand Down Expand Up @@ -245,7 +245,7 @@ folly::Future<Status> TraverseExecutor::asyncExpandOneStep(RpcResponse&& resps)
}
}

addState("expandPostTaskTime", postTaskTime.elapsedInUSec());
addState("expandPostTaskTime", postTaskTime);
folly::dynamic taskRunTimeArray = folly::dynamic::array();
for (auto time : *taskRunTime) {
taskRunTimeArray.push_back(time);
Expand Down

0 comments on commit 483a31a

Please sign in to comment.