Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify the client version. #2965

Merged
merged 14 commits into from
Sep 30, 2021
214 changes: 122 additions & 92 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ bool MetaClient::isMetadReady() {
}

bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) {
auto status = verifyVersion();
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

if (!options_.skipConfig_) {
std::string gflagsJsonPath;
GflagsManager::getGflagsModule(gflagsModule_);
Expand Down Expand Up @@ -568,98 +574,102 @@ void MetaClient::getResponse(Request req,
folly::RWSpinLock::ReadHolder holder(&hostLock_);
host = toLeader ? leader_ : active_;
}
folly::via(evb,
[host,
evb,
req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
toLeader,
retry,
retryLimit,
this]() mutable {
auto client = clientsMan_->client(host, evb, false, FLAGS_meta_client_timeout_ms);
VLOG(1) << "Send request to meta " << host;
remoteFunc(client, req)
.via(evb)
.then([host,
req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
toLeader,
retry,
retryLimit,
evb,
this](folly::Try<RpcResponse>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
if (toLeader) {
updateLeader();
} else {
updateActive();
}
if (retry < retryLimit) {
evb->runAfterDelay(
[req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
toLeader,
retry,
retryLimit,
this]() mutable {
getResponse(std::move(req),
std::move(remoteFunc),
std::move(respGen),
std::move(pro),
toLeader,
retry + 1,
retryLimit);
},
FLAGS_meta_client_retry_interval_secs * 1000);
return;
} else {
LOG(ERROR) << "Send request to " << host << ", exceed retry limit";
pro.setValue(Status::Error("RPC failure in MetaClient: %s",
t.exception().what().c_str()));
}
return;
}

auto&& resp = t.value();
if (resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
// succeeded
pro.setValue(respGen(std::move(resp)));
return;
} else if (resp.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
updateLeader(resp.get_leader());
if (retry < retryLimit) {
evb->runAfterDelay(
[req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
toLeader,
retry,
retryLimit,
this]() mutable {
getResponse(std::move(req),
std::move(remoteFunc),
std::move(respGen),
std::move(pro),
toLeader,
retry + 1,
retryLimit);
},
FLAGS_meta_client_retry_interval_secs * 1000);
return;
}
}
pro.setValue(this->handleResponse(resp));
}); // then
}); // via
folly::via(
evb,
[host,
evb,
req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
toLeader,
retry,
retryLimit,
this]() mutable {
auto client = clientsMan_->client(host, evb, false, FLAGS_meta_client_timeout_ms);
VLOG(1) << "Send request to meta " << host;
remoteFunc(client, req)
.via(evb)
.then([host,
req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
toLeader,
retry,
retryLimit,
evb,
this](folly::Try<RpcResponse>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
if (toLeader) {
updateLeader();
} else {
updateActive();
}
if (retry < retryLimit) {
evb->runAfterDelay(
[req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
toLeader,
retry,
retryLimit,
this]() mutable {
getResponse(std::move(req),
std::move(remoteFunc),
std::move(respGen),
std::move(pro),
toLeader,
retry + 1,
retryLimit);
},
FLAGS_meta_client_retry_interval_secs * 1000);
return;
} else {
LOG(ERROR) << "Send request to " << host << ", exceed retry limit";
pro.setValue(
Status::Error("RPC failure in MetaClient: %s", t.exception().what().c_str()));
}
return;
}

auto&& resp = t.value();
if (resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
// succeeded
pro.setValue(respGen(std::move(resp)));
return;
} else if (resp.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
updateLeader(resp.get_leader());
if (retry < retryLimit) {
evb->runAfterDelay(
[req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
toLeader,
retry,
retryLimit,
this]() mutable {
getResponse(std::move(req),
std::move(remoteFunc),
std::move(respGen),
std::move(pro),
toLeader,
retry + 1,
retryLimit);
},
FLAGS_meta_client_retry_interval_secs * 1000);
return;
}
} else if (resp.get_code() == nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE) {
pro.setValue(respGen(std::move(resp)));
return;
}
pro.setValue(this->handleResponse(resp));
}); // then
}); // via
}

std::vector<SpaceIdName> MetaClient::toSpaceIdName(const std::vector<cpp2::IdName>& tIdNames) {
Expand Down Expand Up @@ -3564,5 +3574,25 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId)
return killedPlans_.load()->count({sessionId, planId});
}

Status MetaClient::verifyVersion() {
auto req = cpp2::VerifyClientVersionReq();
folly::Promise<StatusOr<cpp2::VerifyClientVersionResp>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_verifyClientVersion(request); },
[](cpp2::VerifyClientVersionResp&& resp) { return std::move(resp); },
std::move(promise));

auto respStatus = std::move(future).get();
if (!respStatus.ok()) {
return respStatus.status();
}
auto resp = std::move(respStatus).value();
if (resp.get_code() != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Client verified failed: %s", resp.get_error_msg()->c_str());
}
return Status::OK();
}
} // namespace meta
} // namespace nebula
3 changes: 3 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class MetaClient {
FRIEND_TEST(MetaClientTest, RetryOnceTest);
FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
FRIEND_TEST(MetaClientTest, VerifyClientTest);
friend class KillQueryMetaWrapper;
FRIEND_TEST(ChainAddEdgesTest, AddEdgesLocalTest);
friend class storage::MetaClientTestUpdater;
Expand Down Expand Up @@ -710,6 +711,8 @@ class MetaClient {

ListenersMap doGetListenersMap(const HostAddr& host, const LocalCache& localCache);

Status verifyVersion();

private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>> clientsMan_;
Expand Down
3 changes: 3 additions & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@
X(E_USER_CANCEL, -3052) \
X(E_TASK_EXECUTION_FAILED, -3053) \
\
X(E_PLAN_IS_KILLED, -3060) \
X(E_CLIENT_SERVER_INCOMPATIBLE, -3061) \
\
X(E_UNKNOWN, -8000)

namespace nebula {
Expand Down
1 change: 1 addition & 0 deletions src/graph/context/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ SET(CONTEXT_TEST_LIBS
$<TARGET_OBJECTS:planner_obj>
$<TARGET_OBJECTS:idgenerator_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:version_obj>
)

nebula_add_test(
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ SET(EXEC_QUERY_TEST_OBJS
$<TARGET_OBJECTS:expr_visitor_obj>
$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:version_obj>
)

SET(EXEC_QUERY_TEST_LIBS
Expand Down
1 change: 1 addition & 0 deletions src/graph/optimizer/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ set(OPTIMIZER_TEST_LIB
$<TARGET_OBJECTS:validator_obj>
$<TARGET_OBJECTS:optimizer_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:version_obj>
)

nebula_add_test(
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ nebula_add_test(
$<TARGET_OBJECTS:util_obj>
$<TARGET_OBJECTS:idgenerator_obj>
$<TARGET_OBJECTS:graph_context_obj>
$<TARGET_OBJECTS:version_obj>
LIBRARIES
gtest
${PROXYGEN_LIBRARIES}
Expand Down
7 changes: 7 additions & 0 deletions src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "graph/service/GraphFlags.h"

#include "version/Version.h"

DEFINE_int32(port, 3699, "Nebula Graph daemon's listen port");
DEFINE_int32(client_idle_timeout_secs,
0,
Expand Down Expand Up @@ -65,3 +67,8 @@ DEFINE_bool(disable_octal_escape_char,
" in next version to ensure compatibility with cypher.");

DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature");

DEFINE_bool(enable_client_white_list, true, "Turn on/off the client white list.");
DEFINE_string(client_white_list,
nebula::getOriginVersion() + ":2.5.0:2.5.1:2.6.0",
"A white list for different client versions, seperate with colon.");
2 changes: 2 additions & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ DECLARE_string(local_ip);

DECLARE_bool(enable_experimental_feature);

DECLARE_bool(enable_client_white_list);
DECLARE_string(client_white_list);
#endif // GRAPH_GRAPHFLAGS_H_
17 changes: 17 additions & 0 deletions src/graph/service/GraphService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,22 @@ bool GraphService::auth(const std::string& username, const std::string& password
return false;
}

folly::Future<cpp2::VerifyClientVersionResp> GraphService::future_verifyClientVersion(
const cpp2::VerifyClientVersionReq& req) {
std::unordered_set<std::string> whiteList;
folly::splitTo<std::string>(
":", FLAGS_client_white_list, std::inserter(whiteList, whiteList.begin()));
cpp2::VerifyClientVersionResp resp;
if (FLAGS_enable_client_white_list && whiteList.find(req.get_version()) == whiteList.end()) {
resp.set_error_code(nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE);
resp.set_error_msg(folly::stringPrintf(
"Graph client version(%s) is not accepted, current graph client white list: %s.",
req.get_version().c_str(),
FLAGS_client_white_list.c_str()));
} else {
resp.set_error_code(nebula::cpp2::ErrorCode::SUCCEEDED);
}
return folly::makeFuture<cpp2::VerifyClientVersionResp>(std::move(resp));
}
} // namespace graph
} // namespace nebula
3 changes: 3 additions & 0 deletions src/graph/service/GraphService.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class GraphService final : public cpp2::GraphServiceSvIf {
folly::Future<std::string> future_executeJson(int64_t sessionId,
const std::string& stmt) override;

folly::Future<cpp2::VerifyClientVersionResp> future_verifyClientVersion(
const cpp2::VerifyClientVersionReq& req) override;

private:
bool auth(const std::string& username, const std::string& password);

Expand Down
1 change: 1 addition & 0 deletions src/graph/util/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ nebula_add_test(
$<TARGET_OBJECTS:parser_obj>
$<TARGET_OBJECTS:graph_context_obj>
$<TARGET_OBJECTS:validator_obj>
$<TARGET_OBJECTS:version_obj>
LIBRARIES
gtest
gtest_main
Expand Down
4 changes: 4 additions & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ cpp_include "common/datatypes/GeographyOps-inl.h"
*
*/

const binary (cpp.type = "char const *") version = "2.6.0"

typedef i32 (cpp.type = "nebula::GraphSpaceID") GraphSpaceID
typedef i32 (cpp.type = "nebula::PartitionID") PartitionID
typedef i32 (cpp.type = "nebula::TagID") TagID
Expand Down Expand Up @@ -409,5 +411,7 @@ enum ErrorCode {
E_OUTDATED_EDGE = -3072,
E_WRITE_WRITE_CONFLICT = -3073,

E_CLIENT_SERVER_INCOMPATIBLE = -3061,

E_UNKNOWN = -8000,
} (cpp.enum_strict)
Loading