Skip to content

Commit

Permalink
DSS part2: support sign in drainer service and add listener sync and …
Browse files Browse the repository at this point in the history
…add drainer (vesoft-inc#3)

* support sign in drainer service and add listener sync add add drainer

ut pass

Add more UT

add listener sync host:port to space name

* rebase master

* improve code

* address critical27's comments
  • Loading branch information
panda-sheep authored Oct 27, 2021
1 parent 39c83bd commit 5094091
Show file tree
Hide file tree
Showing 97 changed files with 2,713 additions and 543 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ macro(nebula_add_library name type)
graph_thrift_generator
storage_thrift_generator
meta_thrift_generator
drainer_thrift_generator
raftex_thrift_generator
# hbase_thrift_generator
parser_target
Expand Down
219 changes: 191 additions & 28 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ bool MetaClient::loadData() {
return false;
}

if (!loadFulltextClients()) {
LOG(ERROR) << "Load fulltext services Failed";
if (!loadGlobalServiceClients()) {
LOG(ERROR) << "Load global services Failed";
return false;
}

Expand Down Expand Up @@ -263,6 +263,16 @@ bool MetaClient::loadData() {
return false;
}

if (!loadListenerDrainers(spaceId, spaceCache)) {
LOG(ERROR) << "Load Listener Drainers Failed";
return false;
}

if (!loadDrainers(spaceId, spaceCache)) {
LOG(ERROR) << "Load Drainers Failed";
return false;
}

// get space properties
auto resp = getSpace(spaceName).get();
if (!resp.ok()) {
Expand Down Expand Up @@ -483,7 +493,7 @@ bool MetaClient::loadIndexes(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCach
}

bool MetaClient::loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache) {
auto listenerRet = listListener(spaceId).get();
auto listenerRet = listListeners(spaceId, cpp2::ListenerType::ALL).get();
if (!listenerRet.ok()) {
LOG(ERROR) << "Get listeners failed for spaceId " << spaceId << ", " << listenerRet.status();
return false;
Expand All @@ -497,15 +507,38 @@ bool MetaClient::loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCa
return true;
}

bool MetaClient::loadFulltextClients() {
auto ftRet = listFTClients().get();
if (!ftRet.ok()) {
LOG(ERROR) << "List fulltext services failed, status:" << ftRet.status();
bool MetaClient::loadListenerDrainers(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache) {
auto listenerDrainerRet = listListenerDrainers(spaceId).get();
if (!listenerDrainerRet.ok()) {
LOG(ERROR) << "Get sync listener drainers failed for spaceId " << spaceId << ", "
<< listenerDrainerRet.status();
return false;
}

cache->drainerclients_ = std::move(listenerDrainerRet.value());
return true;
}

bool MetaClient::loadDrainers(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache) {
auto drainerRet = listDrainers(spaceId).get();
if (!drainerRet.ok()) {
LOG(ERROR) << "Get drainers failed for spaceId " << spaceId << ", " << drainerRet.status();
return false;
}

cache->drainerServer_ = std::move(drainerRet.value());
return true;
}

bool MetaClient::loadGlobalServiceClients() {
auto ret = listServiceClients(cpp2::ExternalServiceType::ALL).get();
if (!ret.ok()) {
LOG(ERROR) << "List services failed, status:" << ret.status();
return false;
}
{
folly::RWSpinLock::WriteHolder holder(localCacheLock_);
fulltextClientList_ = std::move(ftRet).value();
serviceClientList_ = std::move(ret).value();
}
return true;
}
Expand Down Expand Up @@ -824,6 +857,14 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Backup table failure!");
case nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND:
return Status::Error("Session not existed!");
case nebula::cpp2::ErrorCode::E_SERVICE_NOT_FOUND:
return Status::Error("Service not existed!");
case nebula::cpp2::ErrorCode::E_DRAINER_NOT_FOUND:
return Status::Error("Drainer not existed!");
case nebula::cpp2::ErrorCode::E_NO_VALID_DRAINER:
return Status::Error("Invalid drainer!");
case nebula::cpp2::ErrorCode::E_LISTENER_CONFLICT:
return Status::Error("Listener host conflict!");
default:
return Status::Error("Unknown error!");
}
Expand Down Expand Up @@ -2737,11 +2778,15 @@ folly::Future<StatusOr<std::vector<cpp2::Snapshot>>> MetaClient::listSnapshots()

folly::Future<StatusOr<bool>> MetaClient::addListener(GraphSpaceID spaceId,
cpp2::ListenerType type,
std::vector<HostAddr> hosts) {
std::vector<HostAddr> hosts,
const std::string* spaceName) {
cpp2::AddListenerReq req;
req.set_space_id(spaceId);
req.set_type(type);
req.set_hosts(std::move(hosts));
if (spaceName) {
req.set_space_name(*spaceName);
}
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
Expand Down Expand Up @@ -2771,22 +2816,39 @@ folly::Future<StatusOr<bool>> MetaClient::removeListener(GraphSpaceID spaceId,
return future;
}

folly::Future<StatusOr<std::vector<cpp2::ListenerInfo>>> MetaClient::listListener(
GraphSpaceID spaceId) {
cpp2::ListListenerReq req;
folly::Future<StatusOr<std::vector<cpp2::ListenerInfo>>> MetaClient::listListeners(
GraphSpaceID spaceId, const cpp2::ListenerType& type) {
cpp2::ListListenersReq req;
req.set_space_id(spaceId);
req.set_type(type);
folly::Promise<StatusOr<std::vector<cpp2::ListenerInfo>>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_listListener(request); },
[](cpp2::ListListenerResp&& resp) -> decltype(auto) {
[](auto client, auto request) { return client->future_listListeners(request); },
[](cpp2::ListListenersResp&& resp) -> decltype(auto) {
return std::move(resp).get_listeners();
},
std::move(promise));
return future;
}

folly::Future<StatusOr<std::unordered_map<PartitionID, HostAddr>>> MetaClient::listListenerDrainers(
GraphSpaceID spaceId) {
cpp2::ListListenerDrainersReq req;
req.set_space_id(spaceId);
folly::Promise<StatusOr<std::unordered_map<PartitionID, HostAddr>>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_listListenerDrainers(request); },
[](cpp2::ListListenerDrainersResp&& resp) -> decltype(auto) {
return std::move(resp).get_drainerClients();
},
std::move(promise));
return future;
}

bool MetaClient::registerCfg() {
auto ret = regConfig(gflagsDeclared_).get();
if (ret.ok()) {
Expand Down Expand Up @@ -2898,6 +2960,54 @@ StatusOr<std::vector<RemoteListenerInfo>> MetaClient::getListenerHostTypeBySpace
return items;
}

folly::Future<StatusOr<bool>> MetaClient::addDrainer(GraphSpaceID spaceId,
std::vector<HostAddr> hosts) {
cpp2::AddDrainerReq req;
req.set_space_id(spaceId);
req.set_hosts(std::move(hosts));
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_addDrainer(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<bool>> MetaClient::removeDrainer(GraphSpaceID spaceId) {
cpp2::RemoveDrainerReq req;
req.set_space_id(spaceId);
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_removeDrainer(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<std::vector<cpp2::DrainerInfo>>> MetaClient::listDrainers(
GraphSpaceID spaceId) {
cpp2::ListDrainersReq req;
req.set_space_id(spaceId);
folly::Promise<StatusOr<std::vector<cpp2::DrainerInfo>>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_listDrainers(request); },
[](cpp2::ListDrainersResp&& resp) -> decltype(auto) {
return std::move(resp).get_drainers();
},
std::move(promise));
return future;
}

bool MetaClient::loadCfg() {
if (!configReady_ && !registerCfg()) {
return false;
Expand Down Expand Up @@ -3243,16 +3353,16 @@ folly::Future<StatusOr<nebula::cpp2::ErrorCode>> MetaClient::reportTaskFinish(
return fut;
}

folly::Future<StatusOr<bool>> MetaClient::signInFTService(
cpp2::FTServiceType type, const std::vector<cpp2::FTClient>& clients) {
cpp2::SignInFTServiceReq req;
folly::Future<StatusOr<bool>> MetaClient::signInService(
const cpp2::ExternalServiceType& type, const std::vector<cpp2::ServiceClient>& clients) {
cpp2::SignInServiceReq req;
req.set_type(type);
req.set_clients(clients);
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_signInFTService(request); },
[](auto client, auto request) { return client->future_signInService(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
Expand All @@ -3261,13 +3371,14 @@ folly::Future<StatusOr<bool>> MetaClient::signInFTService(
return future;
}

folly::Future<StatusOr<bool>> MetaClient::signOutFTService() {
cpp2::SignOutFTServiceReq req;
folly::Future<StatusOr<bool>> MetaClient::signOutService(const cpp2::ExternalServiceType& type) {
cpp2::SignOutServiceReq req;
req.set_type(type);
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_signOutFTService(request); },
[](auto client, auto request) { return client->future_signOutService(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
Expand All @@ -3276,25 +3387,77 @@ folly::Future<StatusOr<bool>> MetaClient::signOutFTService() {
return future;
}

folly::Future<StatusOr<std::vector<cpp2::FTClient>>> MetaClient::listFTClients() {
cpp2::ListFTClientsReq req;
folly::Promise<StatusOr<std::vector<cpp2::FTClient>>> promise;
folly::Future<StatusOr<ServiceClientsList>> MetaClient::listServiceClients(
const cpp2::ExternalServiceType& type) {
cpp2::ListServiceClientsReq req;
req.set_type(type);
folly::Promise<StatusOr<ServiceClientsList>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_listFTClients(request); },
[](cpp2::ListFTClientsResp&& resp) -> decltype(auto) {
[](auto client, auto request) { return client->future_listServiceClients(request); },
[](cpp2::ListServiceClientsResp&& resp) -> decltype(auto) {
return std::move(resp).get_clients();
},
std::move(promise));
return future;
}

StatusOr<std::vector<cpp2::FTClient>> MetaClient::getFTClientsFromCache() {
StatusOr<std::vector<cpp2::ServiceClient>> MetaClient::getServiceClientsFromCache(
const cpp2::ExternalServiceType& type) {
if (!ready_) {
return Status::Error("Not ready!");
}
return fulltextClientList_;

folly::RWSpinLock::ReadHolder holder(localCacheLock_);
if (type == cpp2::ExternalServiceType::ELASTICSEARCH ||
type == cpp2::ExternalServiceType::DRAINER) {
auto sIter = serviceClientList_.find(type);
if (sIter != serviceClientList_.end()) {
return sIter->second;
}
}
return Status::Error("Service not found!");
}

StatusOr<HostAddr> MetaClient::getDrainerClientFromCache(GraphSpaceID spaceId, PartitionID partId) {
if (!ready_) {
return Status::Error("Not ready!");
}

folly::RWSpinLock::ReadHolder holder(localCacheLock_);
auto spaceIt = localCache_.find(spaceId);
if (spaceIt == localCache_.end()) {
VLOG(3) << "Space " << spaceId << " not found!";
return Status::SpaceNotFound();
}

auto iter = spaceIt->second->drainerclients_.find(partId);
if (iter == spaceIt->second->drainerclients_.end()) {
VLOG(3) << "Space " << spaceId << " part " << partId << " listener drainer client not found!";
return Status::DrainerClientNotFound();
}
return iter->second;
}

StatusOr<std::vector<cpp2::DrainerInfo>> MetaClient::getDrainerFromCache(GraphSpaceID spaceId) {
if (!ready_) {
return Status::Error("Not ready!");
}

folly::RWSpinLock::ReadHolder holder(localCacheLock_);
auto spaceIt = localCache_.find(spaceId);
if (spaceIt == localCache_.end()) {
VLOG(3) << "Space " << spaceId << " not found!";
return Status::SpaceNotFound();
}

auto drainers = spaceIt->second->drainerServer_;
if (drainers.empty()) {
VLOG(3) << "Space " << spaceId << " drainer not found!";
return Status::DrainerNotFound();
}
return drainers;
}

folly::Future<StatusOr<bool>> MetaClient::createFTIndex(const std::string& name,
Expand Down
Loading

0 comments on commit 5094091

Please sign in to comment.