Skip to content

Commit

Permalink
machine not registed should response leader address (#3950) (#4020)
Browse files Browse the repository at this point in the history
Conflicts:
	src/meta/processors/BaseProcessor.h
  • Loading branch information
darionyaphet authored Mar 14, 2022
1 parent d523376 commit db575a8
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
7 changes: 4 additions & 3 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -741,11 +741,12 @@ void MetaClient::getResponse(Request req,
}

auto&& resp = t.value();
if (resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
auto code = resp.get_code();
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
// succeeded
pro.setValue(respGen(std::move(resp)));
return;
} else if (resp.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
} else if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
updateLeader(resp.get_leader());
if (retry < retryLimit) {
evb->runAfterDelay(
Expand All @@ -768,7 +769,7 @@ void MetaClient::getResponse(Request req,
FLAGS_meta_client_retry_interval_secs * 1000);
return;
}
} else if (resp.get_code() == nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE) {
} else if (code == nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE) {
pro.setValue(respGen(std::move(resp)));
return;
} else if (resp.get_code() == nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND) {
Expand Down
19 changes: 13 additions & 6 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,24 @@ class BaseProcessor {
delete this;
}

void handleErrorCode(nebula::cpp2::ErrorCode code,
GraphSpaceID spaceId = kDefaultSpaceId,
PartitionID partId = kDefaultPartId) {
/**
* @brief Set error code and handle leader changed.
*
* @param code
*/
void handleErrorCode(nebula::cpp2::ErrorCode code) {
resp_.code_ref() = code;
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
handleLeaderChanged(spaceId, partId);
handleLeaderChanged();
}
}

void handleLeaderChanged(GraphSpaceID spaceId, PartitionID partId) {
auto leaderRet = kvstore_->partLeader(spaceId, partId);
/**
* @brief Set leader address to reponse.
*
*/
void handleLeaderChanged() {
auto leaderRet = kvstore_->partLeader(kDefaultSpaceId, kDefaultPartId);
if (ok(leaderRet)) {
resp_.leader_ref() = toThriftHost(nebula::value(leaderRet));
} else {
Expand Down
10 changes: 10 additions & 0 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void HBProcessor::process(const cpp2::HBReq& req) {
if (!ActiveHostsMan::machineRegisted(kvstore_, host)) {
LOG(ERROR) << "Machine " << host << " is not registed";
handleErrorCode(nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND);
setLeaderInfo();
onFinished();
return;
}
Expand Down Expand Up @@ -122,5 +123,14 @@ void HBProcessor::process(const cpp2::HBReq& req) {
onFinished();
}

void HBProcessor::setLeaderInfo() {
auto leaderRet = kvstore_->partLeader(kDefaultSpaceId, kDefaultPartId);
if (ok(leaderRet)) {
resp_.leader_ref() = toThriftHost(nebula::value(leaderRet));
} else {
resp_.code_ref() = nebula::error(leaderRet);
}
}

} // namespace meta
} // namespace nebula
2 changes: 2 additions & 0 deletions src/meta/processors/admin/HBProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class HBProcessor : public BaseProcessor<cpp2::HBResp> {
HBProcessor(kvstore::KVStore* kvstore, const HBCounters* counters, ClusterID clusterId)
: BaseProcessor<cpp2::HBResp>(kvstore), clusterId_(clusterId), counters_(counters) {}

void setLeaderInfo();

ClusterID clusterId_{0};
const HBCounters* counters_{nullptr};
static std::atomic<int64_t> metaVersion_;
Expand Down

0 comments on commit db575a8

Please sign in to comment.