Skip to content

Commit

Permalink
Remove the lock from collectResponse in storage client (#5338)
Browse files Browse the repository at this point in the history
* Remove the lock from collectResp in storage client

* Address the comment
  • Loading branch information
yixinglu authored Feb 14, 2023
1 parent c20467f commit 0f11fa1
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 74 deletions.
2 changes: 2 additions & 0 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
namespace nebula {
namespace storage {

constexpr int32_t kInternalPortOffset = -2;

template <typename T>
::nebula::cpp2::ErrorCode getErrorCode(T& tryResp) {
if (!tryResp.hasValue()) {
Expand Down
12 changes: 6 additions & 6 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,20 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
memory::MemoryCheckGuard guard;
StorageRpcResponse<Response> rpcResp(resps.size());
for (size_t i = 0; i < resps.size(); i++) {
auto& host = hosts->at(i);
const auto& host = hosts->at(i);
folly::Try<StatusOr<Response>>& tryResp = resps[i];
if (tryResp.hasException()) {
std::string errMsg = tryResp.exception().what().toStdString();
rpcResp.markFailure();
LOG(ERROR) << "There some RPC errors: " << errMsg;
auto req = requests.at(host);
auto parts = getReqPartsId(req);
const auto& req = requests.at(host);
const auto& parts = getReqPartsId(req);
rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
} else {
StatusOr<Response> status = std::move(tryResp).value();
if (status.ok()) {
auto resp = std::move(status).value();
auto result = resp.get_result();
const auto& result = resp.get_result();

if (!result.get_failed_parts().empty()) {
rpcResp.markFailure();
Expand All @@ -141,8 +141,8 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
? nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED
: nebula::cpp2::ErrorCode::E_RPC_FAILURE;
LOG(ERROR) << "There some RPC errors: " << s.message();
auto req = requests.at(host);
auto parts = getReqPartsId(req);
const auto& req = requests.at(host);
const auto& parts = getReqPartsId(req);
rpcResp.appendFailedParts(parts, errorCode);
}
}
Expand Down
19 changes: 2 additions & 17 deletions src/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
DECLARE_int32(storage_client_timeout_ms);
DECLARE_uint32(storage_client_retry_interval_ms);

constexpr int32_t kInternalPortOffset = -2;

namespace nebula {
namespace storage {

Expand All @@ -34,81 +32,68 @@ class StorageRpcResponse final {
};

explicit StorageRpcResponse(size_t reqsSent) : totalReqsSent_(reqsSent) {
lock_ = std::make_unique<std::mutex>();
responses_.reserve(reqsSent);
}

bool succeeded() const {
std::lock_guard<std::mutex> g(*lock_);
return result_ == Result::ALL_SUCCEEDED;
}

int32_t maxLatency() const {
std::lock_guard<std::mutex> g(*lock_);
return maxLatency_;
}

void setLatency(HostAddr host, int32_t latency, int32_t e2eLatency) {
std::lock_guard<std::mutex> g(*lock_);
void setLatency(const HostAddr& host, int32_t latency, int32_t e2eLatency) {
if (latency > maxLatency_) {
maxLatency_ = latency;
}
hostLatency_.emplace_back(std::make_tuple(host, latency, e2eLatency));
}

void markFailure() {
std::lock_guard<std::mutex> g(*lock_);
result_ = Result::PARTIAL_SUCCEEDED;
++failedReqs_;
}

// A value between [0, 100], representing a percentage
int32_t completeness() const {
std::lock_guard<std::mutex> g(*lock_);
DCHECK_NE(totalReqsSent_, 0);
return totalReqsSent_ == 0 ? 0 : (totalReqsSent_ - failedReqs_) * 100 / totalReqsSent_;
}

void emplaceFailedPart(PartitionID partId, nebula::cpp2::ErrorCode errorCode) {
std::lock_guard<std::mutex> g(*lock_);
failedParts_.emplace(partId, errorCode);
}

void appendFailedParts(const std::vector<PartitionID>& partsId,
nebula::cpp2::ErrorCode errorCode) {
std::lock_guard<std::mutex> g(*lock_);
failedParts_.reserve(failedParts_.size() + partsId.size());
for (const auto& partId : partsId) {
failedParts_.emplace(partId, errorCode);
}
}

void addResponse(Response&& resp) {
std::lock_guard<std::mutex> g(*lock_);
responses_.emplace_back(std::move(resp));
}

// Not thread-safe.
const std::unordered_map<PartitionID, nebula::cpp2::ErrorCode>& failedParts() const {
return failedParts_;
}

// Not thread-safe.
std::vector<Response>& responses() {
return responses_;
}

// Not thread-safe.
const std::vector<Response>& responses() const {
return responses_;
}

// Not thread-safe.
const std::vector<std::tuple<HostAddr, int32_t, int32_t>>& hostLatency() const {
return hostLatency_;
}

private:
std::unique_ptr<std::mutex> lock_;
const size_t totalReqsSent_;
size_t failedReqs_{0};

Expand Down
21 changes: 8 additions & 13 deletions src/graph/context/iterator/PropIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,13 @@ const Value& PropIter::getProp(const std::string& name, const std::string& prop)
if (name == "*") {
for (auto& index : propsMap) {
auto propIndex = index.second.find(prop);
if (propIndex == index.second.end()) {
continue;
}
colId = propIndex->second;
DCHECK_GT(row.size(), colId);
auto& val = row[colId];
if (val.empty()) {
continue;
} else {
return val;
if (propIndex != index.second.end()) {
colId = propIndex->second;
DCHECK_GT(row.size(), colId);
auto& val = row[colId];
if (!val.empty()) {
return val;
}
}
}
return Value::kNullValue;
Expand Down Expand Up @@ -153,9 +150,7 @@ Value PropIter::getVertex(const std::string& name) {
Tag tag;
tag.name = tagProp.first;
for (auto& propIndex : tagProp.second) {
if (propIndex.first == nebula::kTag) { // "_tag"
continue;
} else {
if (propIndex.first != nebula::kTag) { // "_tag"
tag.props.emplace(propIndex.first, row[propIndex.second]);
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
class GatherFunc>
auto runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator *iter);

void addState(const std::string &name, size_t durationInUs) {
otherStats_.emplace(name, folly::sformat("{}(us)", durationInUs));
}
void addState(const std::string &name, folly::dynamic json) {
otherStats_.emplace(name, folly::toPrettyJson(json));
}

int64_t id_;

// Executor name
Expand Down
5 changes: 2 additions & 3 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,11 @@ class StorageAccessExecutor : public Executor {
}

template <typename RESP>
void addStats(storage::StorageRpcResponse<RESP> &resp,
std::unordered_map<std::string, std::string> &stats) const {
void addStats(storage::StorageRpcResponse<RESP> &resp) {
auto &hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
auto info = util::collectRespProfileData(resp.responses()[i].get_result(), hostLatency[i]);
stats.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
addState(folly::sformat("resp[{}]", i), std::move(info));
}
}

Expand Down
44 changes: 19 additions & 25 deletions src/graph/executor/query/AppendVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::GetPropResponse;

DECLARE_bool(optimize_appendvertices);

namespace nebula {
namespace graph {

folly::Future<Status> AppendVerticesExecutor::execute() {
return appendVertices();
}
Expand Down Expand Up @@ -56,15 +59,12 @@ folly::Future<Status> AppendVerticesExecutor::appendVertices() {
av->limit(qctx()),
av->filter())
.via(runner())
.ensure([this, getPropsTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec()));
})
.thenValue([this](StorageRpcResponse<GetPropResponse> &&rpcResp) {
.thenValue([this, getPropsTime](StorageRpcResponse<GetPropResponse> &&rpcResp) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
addState("total_rpc", getPropsTime.elapsedInUSec());
addStats(rpcResp);
if (FLAGS_max_job_size <= 1) {
return folly::makeFuture<Status>(handleResp(std::move(rpcResp)));
} else {
Expand Down Expand Up @@ -150,12 +150,11 @@ Status AppendVerticesExecutor::handleResp(
bool mv = movable(av->inputVars().front());
for (; inputIter->valid(); inputIter->next()) {
auto dstFound = map.find(src->eval(ctx(inputIter.get())));
if (dstFound == map.end()) {
continue;
if (dstFound != map.end()) {
Row row = mv ? inputIter->moveRow() : *inputIter->row();
row.values.emplace_back(dstFound->second);
ds.rows.emplace_back(std::move(row));
}
Row row = mv ? inputIter->moveRow() : *inputIter->row();
row.values.emplace_back(dstFound->second);
ds.rows.emplace_back(std::move(row));
}
return finish(ResultBuilder().value(Value(std::move(ds))).state(state).build());
}
Expand All @@ -175,9 +174,7 @@ folly::Future<Status> AppendVerticesExecutor::handleRespMultiJobs(
if (resp.props_ref().has_value()) {
auto &&respV = std::move(*resp.props_ref());
v.colNames = respV.colNames;
v.rows.insert(v.rows.end(),
std::make_move_iterator(respV.begin()),
std::make_move_iterator(respV.end()));
std::move(respV.begin(), respV.end(), std::back_inserter(v.rows));
}
}
auto propIter = PropIter(std::make_shared<Value>(std::move(v)));
Expand All @@ -191,10 +188,9 @@ folly::Future<Status> AppendVerticesExecutor::handleRespMultiJobs(
auto gather = [this](auto &&results) -> Status {
memory::MemoryCheckGuard guard;
for (auto &r : results) {
NG_RETURN_IF_ERROR(r);
auto &&rows = std::move(r).value();
result_.rows.insert(result_.rows.end(),
std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows));
}
return finish(ResultBuilder().value(Value(std::move(result_))).build());
};
Expand All @@ -219,10 +215,9 @@ folly::Future<Status> AppendVerticesExecutor::handleRespMultiJobs(
auto gatherFinal = [this](auto &&results) -> Status {
memory::MemoryCheckGuard guard2;
for (auto &r : results) {
NG_RETURN_IF_ERROR(r);
auto &&rows = std::move(r).value();
result_.rows.insert(result_.rows.end(),
std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows));
}
return finish(ResultBuilder().value(Value(std::move(result_))).build());
};
Expand Down Expand Up @@ -280,12 +275,11 @@ DataSet AppendVerticesExecutor::handleJob(size_t begin, size_t end, Iterator *it
QueryExpressionContext ctx(qctx()->ectx());
for (; iter->valid() && begin++ < end; iter->next()) {
auto dstFound = dsts_.find(src->eval(ctx(iter)));
if (dstFound == dsts_.end()) {
continue;
if (dstFound != dsts_.end()) {
Row row = *iter->row();
row.values.emplace_back(dstFound->second);
ds.rows.emplace_back(std::move(row));
}
Row row = *iter->row();
row.values.emplace_back(dstFound->second);
ds.rows.emplace_back(std::move(row));
}

return ds;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
.thenValue([this, ge](StorageRpcResponse<GetPropResponse> &&rpcResp) {
memory::MemoryCheckGuard guard;
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
addStats(rpcResp);
return handleResp(std::move(rpcResp), ge->colNames());
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ folly::Future<Status> GetVerticesExecutor::getVertices() {
.thenValue([this, gv](StorageRpcResponse<GetPropResponse> &&rpcResp) {
memory::MemoryCheckGuard guard;
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
addStats(rpcResp);
return handleResp(std::move(rpcResp), gv->colNames());
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/IndexScanExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ folly::Future<Status> IndexScanExecutor::indexScan() {
.thenValue([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
addStats(rpcResp, otherStats_);
addStats(rpcResp);
return handleResp(std::move(rpcResp));
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/ScanEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ folly::Future<Status> ScanEdgesExecutor::scanEdges() {
.thenValue([this](StorageRpcResponse<ScanResponse> &&rpcResp) {
memory::MemoryCheckGuard guard;
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
addStats(rpcResp);
return handleResp(std::move(rpcResp), {});
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/ScanVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ folly::Future<Status> ScanVerticesExecutor::scanVertices() {
.thenValue([this, sv](StorageRpcResponse<ScanResponse> &&rpcResp) {
memory::MemoryCheckGuard guard;
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
addStats(rpcResp);
return handleResp(std::move(rpcResp), sv->colNames());
});
}
Expand Down
9 changes: 4 additions & 5 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ folly::Future<Status> TraverseExecutor::getNeighbors() {
addStats(resp, getNbrTime.elapsedInUSec());
time::Duration expandTime;
return handleResponse(std::move(resp)).ensure([this, expandTime]() {
otherStats_.emplace("expandTime", folly::sformat("{}(us)", expandTime.elapsedInUSec()));
addState("expandTime", expandTime.elapsedInUSec());
});
})
.thenValue([this](Status s) -> folly::Future<Status> {
Expand Down Expand Up @@ -149,7 +149,7 @@ void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) {
folly::dynamic stepObj = folly::dynamic::object();
stepObj.insert("storage", stepInfo);
stepObj.insert("total_rpc_time", folly::sformat("{}(us)", getNbrTimeInUSec));
otherStats_.emplace(folly::sformat("step[{}]", currentStep_), folly::toPrettyJson(stepObj));
addState(folly::sformat("step[{}]", currentStep_), std::move(stepObj));
}

size_t TraverseExecutor::numRowsOfRpcResp(const RpcResponse& resps) const {
Expand Down Expand Up @@ -240,13 +240,12 @@ folly::Future<Status> TraverseExecutor::asyncExpandOneStep(RpcResponse&& resps)
}
}

auto t = postTaskTime.elapsedInUSec();
otherStats_.emplace("expandPostTaskTime", folly::sformat("{}(us)", t));
addState("expandPostTaskTime", postTaskTime.elapsedInUSec());
folly::dynamic taskRunTimeArray = folly::dynamic::array();
for (auto time : *taskRunTime) {
taskRunTimeArray.push_back(time);
}
otherStats_.emplace("expandTaskRunTime", folly::toPrettyJson(taskRunTimeArray));
addState("expandTaskRunTime", std::move(taskRunTimeArray));

return Status::OK();
});
Expand Down

0 comments on commit 0f11fa1

Please sign in to comment.