Skip to content

Commit

Permalink
address yixinglu's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs committed Aug 26, 2021
1 parent cc6370e commit eb8b3d6
Show file tree
Hide file tree
Showing 15 changed files with 54 additions and 100 deletions.
23 changes: 11 additions & 12 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ bool MetaClient::loadData() {
}

if (!loadSessions()) {
LOG(ERROR) << "Load roles Failed";
LOG(ERROR) << "Load sessions Failed";
return false;
}

Expand Down Expand Up @@ -3487,23 +3487,22 @@ bool MetaClient::loadSessions() {
LOG(ERROR) << "List sessions failed, status:" << session_list.status();
return false;
}
SessionMap* old_session_map = sessionMap_.load();
SessionMap* new_session_map = new SessionMap(*old_session_map);
folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>* old_killed_plan = killedPlans_.load();
folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>* new_killed_plan =
new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>(*old_killed_plan);
SessionMap* oldSessionMap = sessionMap_.load();
SessionMap* newSessionMap = new SessionMap(*oldSessionMap);
auto oldKilledPlan = killedPlans_.load();
auto newKilledPlan = new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>(*oldKilledPlan);
for (auto& session : session_list.value().get_sessions()) {
(*new_session_map)[session.get_session_id()] = session;
(*newSessionMap)[session.get_session_id()] = session;
for (auto& query : session.get_queries()) {
if (query.second.get_status() == cpp2::QueryStatus::KILLING) {
new_killed_plan->insert({session.get_session_id(), query.first});
newKilledPlan->insert({session.get_session_id(), query.first});
}
}
}
sessionMap_.store(new_session_map);
killedPlans_.store(new_killed_plan);
folly::rcu_retire(old_killed_plan);
folly::rcu_retire(old_session_map);
sessionMap_.store(newSessionMap);
killedPlans_.store(newKilledPlan);
folly::rcu_retire(oldKilledPlan);
folly::rcu_retire(oldSessionMap);
return true;
}

Expand Down
32 changes: 24 additions & 8 deletions src/storage/CommonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,30 @@ struct PropContext;
// PlanContext stores information **unchanged** during the process.
// All processor won't change them after request is parsed.
class PlanContext {
using ReqCommonRef = ::apache::thrift::optional_field_ref<const cpp2::RequestCommon&>;

public:
PlanContext(StorageEnv* env,
GraphSpaceID spaceId,
SessionID sessionId,
ExecutionPlanID planId,
size_t vIdLen,
bool isIntId)
PlanContext(StorageEnv* env, GraphSpaceID spaceId, size_t vIdLen, bool isIntId)
: env_(env),
spaceId_(spaceId),
sessionId_(sessionId),
planId_(planId),
sessionId_(0),
planId_(0),
vIdLen_(vIdLen),
isIntId_(isIntId) {}
PlanContext(
StorageEnv* env, GraphSpaceID spaceId, size_t vIdLen, bool isIntId, ReqCommonRef commonRef)
: env_(env),
spaceId_(spaceId),
sessionId_(0),
planId_(0),
vIdLen_(vIdLen),
isIntId_(isIntId) {
if (commonRef.has_value()) {
auto& common = commonRef.value();
sessionId_ = common.session_id_ref().value_or(0);
planId_ = common.plan_id_ref().value_or(0);
}
}

StorageEnv* env_;
GraphSpaceID spaceId_;
Expand Down Expand Up @@ -184,6 +195,11 @@ struct RuntimeContext {

ObjectPool* objPool() { return &planContext_->objPool_; }

bool isPlanKilled() {
return env()->metaClient_ &&
env()->metaClient_->checkIsPlanKilled(planContext_->sessionId_, planContext_->planId_);
}

PlanContext* planContext_;
TagID tagId_ = 0;
std::string tagName_ = "";
Expand Down
8 changes: 2 additions & 6 deletions src/storage/exec/GetNeighborsNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ class GetNeighborsNode : public QueryNode<VertexID> {
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
if (context_->resultStat_ == ResultStatus::ILLEGAL_DATA) {
Expand Down Expand Up @@ -94,9 +92,7 @@ class GetNeighborsNode : public QueryNode<VertexID> {
int64_t edgeRowCount = 0;
nebula::List list;
for (; upstream_->valid(); upstream_->next(), ++edgeRowCount) {
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
if (edgeRowCount >= limit_) {
Expand Down
8 changes: 2 additions & 6 deletions src/storage/exec/HashJoinNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ class HashJoinNode : public IterateNode<VertexID> {

// add result of each tag node to tagResult
for (auto* tagNode : tagNodes_) {
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
ret = tagNode->collectTagPropsIfValid(
Expand Down Expand Up @@ -98,9 +96,7 @@ class HashJoinNode : public IterateNode<VertexID> {

std::vector<SingleEdgeIterator*> iters;
for (auto* edgeNode : edgeNodes_) {
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
iters.emplace_back(edgeNode->iter());
Expand Down
4 changes: 1 addition & 3 deletions src/storage/exec/IndexEdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ class IndexEdgeNode final : public RelNode<T> {
std::vector<storage::cpp2::EdgeKey> edges;
auto* iter = static_cast<EdgeIndexIterator*>(indexScanNode_->iterator());
while (iter && iter->valid()) {
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
if (!iter->val().empty() && ttlProp.first) {
Expand Down
4 changes: 1 addition & 3 deletions src/storage/exec/IndexFilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ class IndexFilterNode final : public RelNode<T> {
data = indexVertexNode_->moveData();
}
for (const auto& k : data) {
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
if (evalExprByIndex_) {
Expand Down
4 changes: 1 addition & 3 deletions src/storage/exec/IndexOutputNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,7 @@ class IndexOutputNode final : public RelNode<T> {

private:
nebula::cpp2::ErrorCode collectResult(const std::vector<kvstore::KV>& data) {
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
auto ret = nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down
5 changes: 1 addition & 4 deletions src/storage/exec/IndexScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ class IndexScanNode : public RelNode<T> {
auto ttlProp = CommonUtils::ttlProps(sh);
data_.clear();
while (!!iter_ && iter_->valid()) {
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
context_->planContext_->isKilled_ = true;
if (context_->isPlanKilled()) {
return {};
}
if (!iter_->val().empty() && ttlProp.first) {
Expand Down
4 changes: 1 addition & 3 deletions src/storage/exec/IndexVertexNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ class IndexVertexNode final : public RelNode<T> {
std::vector<VertexID> vids;
auto* iter = static_cast<VertexIndexIterator*>(indexScanNode_->iterator());
while (iter && iter->valid()) {
if (context_->env()->metaClient_ &&
context_->env()->metaClient_->checkIsPlanKilled(context_->planContext_->sessionId_,
context_->planContext_->planId_)) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
if (!iter->val().empty() && ttlProp.first) {
Expand Down
10 changes: 1 addition & 9 deletions src/storage/index/LookupBaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,8 @@ nebula::cpp2::ErrorCode LookupBaseProcessor<REQ, RESP>::requestCheck(
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
return retCode;
}
auto session_id =
req.common_ref().has_value() && req.common_ref().value().session_id_ref().has_value()
? *req.get_common()->get_session_id()
: 0;

auto plan_id = req.common_ref().has_value() && req.common_ref().value().plan_id_ref().has_value()
? *req.get_common()->get_plan_id()
: 0;
this->planContext_ = std::make_unique<PlanContext>(
this->env_, spaceId_, session_id, plan_id, this->spaceVidLen_, this->isIntId_);
this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());
const auto& indices = req.get_indices();
this->planContext_->isEdge_ = indices.get_is_edge();
this->context_ = std::make_unique<RuntimeContext>(this->planContext_.get());
Expand Down
13 changes: 2 additions & 11 deletions src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,8 @@ void UpdateEdgeProcessor::doProcess(const cpp2::UpdateEdgeRequest& req) {
onFinished();
return;
}
auto session_id =
req.common_ref().has_value() && req.common_ref().value().session_id_ref().has_value()
? *req.get_common()->get_session_id()
: 0;

auto plan_id = req.common_ref().has_value() && req.common_ref().value().plan_id_ref().has_value()
? *req.get_common()->get_plan_id()
: 0;
planContext_ =
std::make_unique<PlanContext>(env_, spaceId_, session_id, plan_id, spaceVidLen_, isIntId_);
context_ = std::make_unique<RuntimeContext>(planContext_.get());
this->planContext_ = std::make_unique<PlanContext>(
this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());
if (env_->txnMan_ && env_->txnMan_->enableToss(spaceId_)) {
planContext_->defaultEdgeVer_ = 1L;
}
Expand Down
13 changes: 2 additions & 11 deletions src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,8 @@ void UpdateVertexProcessor::doProcess(const cpp2::UpdateVertexRequest& req) {
onFinished();
return;
}
auto session_id =
req.common_ref().has_value() && req.common_ref().value().session_id_ref().has_value()
? *req.get_common()->get_session_id()
: 0;

auto plan_id = req.common_ref().has_value() && req.common_ref().value().plan_id_ref().has_value()
? *req.get_common()->get_plan_id()
: 0;
planContext_ =
std::make_unique<PlanContext>(env_, spaceId_, session_id, plan_id, spaceVidLen_, isIntId_);
context_ = std::make_unique<RuntimeContext>(planContext_.get());
this->planContext_ = std::make_unique<PlanContext>(
this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());

retCode = checkAndBuildContexts(req);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
12 changes: 2 additions & 10 deletions src/storage/query/GetNeighborsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,8 @@ void GetNeighborsProcessor::doProcess(const cpp2::GetNeighborsRequest& req) {
onFinished();
return;
}
auto session_id =
req.common_ref().has_value() && req.common_ref().value().session_id_ref().has_value()
? *req.get_common()->get_session_id()
: 0;

auto plan_id = req.common_ref().has_value() && req.common_ref().value().plan_id_ref().has_value()
? *req.get_common()->get_plan_id()
: 0;
planContext_ =
std::make_unique<PlanContext>(env_, spaceId_, session_id, plan_id, spaceVidLen_, isIntId_);
this->planContext_ = std::make_unique<PlanContext>(
this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());

// build TagContext and EdgeContext
retCode = checkAndBuildContexts(req);
Expand Down
12 changes: 2 additions & 10 deletions src/storage/query/GetPropProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,8 @@ void GetPropProcessor::doProcess(const cpp2::GetPropRequest& req) {
onFinished();
return;
}
auto session_id =
req.common_ref().has_value() && req.common_ref().value().session_id_ref().has_value()
? *req.get_common()->get_session_id()
: 0;

auto plan_id = req.common_ref().has_value() && req.common_ref().value().plan_id_ref().has_value()
? *req.get_common()->get_plan_id()
: 0;
planContext_ =
std::make_unique<PlanContext>(env_, spaceId_, session_id, plan_id, spaceVidLen_, isIntId_);
this->planContext_ = std::make_unique<PlanContext>(
this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());

retCode = checkAndBuildContexts(req);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/GetNeighborsBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void initContext(std::unique_ptr<nebula::storage::PlanContext>& planCtx,
nebula::GraphSpaceID spaceId = 1;
auto* env = gCluster->storageEnv_.get();
auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId).value();
planCtx = std::make_unique<nebula::storage::PlanContext>(env, spaceId, 0, 0, vIdLen, false);
planCtx = std::make_unique<nebula::storage::PlanContext>(env, spaceId, vIdLen, false);
context = std::make_unique<nebula::storage::RuntimeContext>(planCtx.get());

nebula::EdgeType serve = 101;
Expand Down

0 comments on commit eb8b3d6

Please sign in to comment.