Skip to content

Commit

Permalink
Fix incompatibility imported by #4116 (#4165)
Browse files Browse the repository at this point in the history
* Add SaveGraphVersionProcessor to separate client version check and version saving

* Update error code

* Update error code
  • Loading branch information
Aiee authored Apr 18, 2022
1 parent 26f8fc8 commit d6df28c
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 9 deletions.
33 changes: 33 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,21 @@ bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) {
LOG(ERROR) << "Connect to the MetaServer Failed";
return false;
}

// Verify the graph server version
auto status = verifyVersion();
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

// Save graph version to meta
status = saveVersionToMeta();
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

CHECK(bgThread_->start());
LOG(INFO) << "Register time task for heartbeat!";
size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900);
Expand Down Expand Up @@ -3611,5 +3620,29 @@ Status MetaClient::verifyVersion() {
return Status::OK();
}

Status MetaClient::saveVersionToMeta() {
auto req = cpp2::SaveGraphVersionReq();
req.build_version_ref() = getOriginVersion();
req.host_ref() = options_.localHost_;
folly::Promise<StatusOr<cpp2::SaveGraphVersionResp>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_saveGraphVersion(request); },
[](cpp2::SaveGraphVersionResp&& resp) { return std::move(resp); },
std::move(promise));

auto respStatus = std::move(future).get();
if (!respStatus.ok()) {
return respStatus.status();
}
auto resp = std::move(respStatus).value();
if (resp.get_code() != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Failed to save graph version into meta, error code: %s",
apache::thrift::util::enumNameSafe(resp.get_code()).c_str());
}
return Status::OK();
}

} // namespace meta
} // namespace nebula
6 changes: 6 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,14 @@ class MetaClient : public BaseMetaClient {

ListenersMap doGetListenersMap(const HostAddr& host, const LocalCache& localCache);

// Checks if the the client version is compatible with the server version by checking the
// whilelist in meta.
Status verifyVersion();

// Save the version of the graph service into meta so that it could be looked up.
// This method should be only called in the internal client.
Status saveVersionToMeta();

private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>> clientsMan_;
Expand Down
18 changes: 16 additions & 2 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1148,16 +1148,29 @@ struct GetMetaDirInfoReq {
struct VerifyClientVersionResp {
1: common.ErrorCode code,
2: common.HostAddr leader,
3: optional binary error_msg;
3: optional binary error_msg;
}


struct VerifyClientVersionReq {
1: required binary client_version = common.version;
2: common.HostAddr host;
3: binary build_version;
}

struct SaveGraphVersionResp {
1: common.ErrorCode code,
2: common.HostAddr leader,
3: optional binary error_msg;
}

// SaveGraphVersionReq is used to save the graph version of a graph service.
// This is for internal using only.
struct SaveGraphVersionReq {
1: required binary client_version = common.version;
2: common.HostAddr host;
3: binary build_version;
}

service MetaService {
ExecResp createSpace(1: CreateSpaceReq req);
ExecResp dropSpace(1: DropSpaceReq req);
Expand Down Expand Up @@ -1263,6 +1276,7 @@ service MetaService {
GetMetaDirInfoResp getMetaDirInfo(1: GetMetaDirInfoReq req);

VerifyClientVersionResp verifyClientVersion(1: VerifyClientVersionReq req)
SaveGraphVersionResp saveGraphVersion(1: SaveGraphVersionReq req)

GetSegmentIdResp getSegmentId(1: GetSegmentIdReq req);
}
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ nebula_add_library(
processors/admin/ListClusterInfoProcessor.cpp
processors/admin/GetMetaDirInfoProcessor.cpp
processors/admin/VerifyClientVersionProcessor.cpp
processors/admin/SaveGraphVersionProcessor.cpp
processors/config/RegConfigProcessor.cpp
processors/config/GetConfigProcessor.cpp
processors/config/ListConfigsProcessor.cpp
Expand Down
7 changes: 7 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "meta/processors/admin/ListClusterInfoProcessor.h"
#include "meta/processors/admin/ListSnapshotsProcessor.h"
#include "meta/processors/admin/RestoreProcessor.h"
#include "meta/processors/admin/SaveGraphVersionProcessor.h"
#include "meta/processors/admin/VerifyClientVersionProcessor.h"
#include "meta/processors/config/GetConfigProcessor.h"
#include "meta/processors/config/ListConfigsProcessor.h"
Expand Down Expand Up @@ -539,6 +540,12 @@ folly::Future<cpp2::VerifyClientVersionResp> MetaServiceHandler::future_verifyCl
RETURN_FUTURE(processor);
}

folly::Future<cpp2::SaveGraphVersionResp> MetaServiceHandler::future_saveGraphVersion(
const cpp2::SaveGraphVersionReq& req) {
auto* processor = SaveGraphVersionProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::GetWorkerIdResp> MetaServiceHandler::future_getWorkerId(
const cpp2::GetWorkerIdReq& req) {
auto* processor = GetWorkerIdProcessor::instance(kvstore_);
Expand Down
3 changes: 3 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::VerifyClientVersionResp> future_verifyClientVersion(
const cpp2::VerifyClientVersionReq& req) override;

folly::Future<cpp2::SaveGraphVersionResp> future_saveGraphVersion(
const cpp2::SaveGraphVersionReq& req) override;

folly::Future<cpp2::GetWorkerIdResp> future_getWorkerId(const cpp2::GetWorkerIdReq& req) override;

folly::Future<cpp2::GetSegmentIdResp> future_getSegmentId(
Expand Down
36 changes: 36 additions & 0 deletions src/meta/processors/admin/SaveGraphVersionProcessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "meta/processors/admin/SaveGraphVersionProcessor.h"

#include "common/graph/Response.h"
#include "version/Version.h"

namespace nebula {
namespace meta {
void SaveGraphVersionProcessor::process(const cpp2::SaveGraphVersionReq& req) {
const auto& host = req.get_host();

// Build a map of graph service host and its version
auto versionKey = MetaKeyUtils::versionKey(host);
auto versionVal = MetaKeyUtils::versionVal(req.get_build_version().c_str());
std::vector<kvstore::KV> versionData;
versionData.emplace_back(std::move(versionKey), std::move(versionVal));

// Save the version of the graph service
auto errCode = doSyncPut(versionData);
if (errCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Failed to save graph version, errorCode: "
<< apache::thrift::util::enumNameSafe(errCode);
handleErrorCode(errCode);
onFinished();
return;
}
handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);

onFinished();
}
} // namespace meta
} // namespace nebula
27 changes: 27 additions & 0 deletions src/meta/processors/admin/SaveGraphVersionProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef META_SAVEGRAPHVERSIONPROCESSOR_H_
#define META_SAVEGRAPHVERSIONPROCESSOR_H_

#include "meta/processors/BaseProcessor.h"

namespace nebula {
namespace meta {
class SaveGraphVersionProcessor final : public BaseProcessor<cpp2::SaveGraphVersionResp> {
public:
static SaveGraphVersionProcessor* instance(kvstore::KVStore* kvstore) {
return new SaveGraphVersionProcessor(kvstore);
}

void process(const cpp2::SaveGraphVersionReq& req);

private:
explicit SaveGraphVersionProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::SaveGraphVersionResp>(kvstore) {}
};
} // namespace meta
} // namespace nebula
#endif // META_SAVEGRAPHVERSIONPROCESSOR_H_
8 changes: 1 addition & 7 deletions src/meta/processors/admin/VerifyClientVersionProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,8 @@ void VerifyClientVersionProcessor::process(const cpp2::VerifyClientVersionReq& r
"Meta client version(%s) is not accepted, current meta client white list: %s.",
req.get_client_version().c_str(),
FLAGS_client_white_list.c_str());
} else {
const auto& host = req.get_host();
auto versionKey = MetaKeyUtils::versionKey(host);
auto versionVal = MetaKeyUtils::versionVal(req.get_build_version().c_str());
std::vector<kvstore::KV> versionData;
versionData.emplace_back(std::move(versionKey), std::move(versionVal));
handleErrorCode(doSyncPut(versionData));
}

onFinished();
}
} // namespace meta
Expand Down

0 comments on commit d6df28c

Please sign in to comment.