Skip to content

Commit

Permalink
Synchronize the enterprise version to reduce conflicts (#3588)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
panda-sheep and Sophie-Xie authored Dec 29, 2021
1 parent 8c2e50f commit f749d77
Show file tree
Hide file tree
Showing 49 changed files with 607 additions and 481 deletions.
54 changes: 33 additions & 21 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ bool MetaClient::loadData() {
return false;
}

if (!loadFulltextClients()) {
LOG(ERROR) << "Load fulltext services Failed";
if (!loadGlobalServiceClients()) {
LOG(ERROR) << "Load global services Failed";
return false;
}

Expand Down Expand Up @@ -518,15 +518,15 @@ bool MetaClient::loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCa
return true;
}

bool MetaClient::loadFulltextClients() {
auto ftRet = listFTClients().get();
if (!ftRet.ok()) {
LOG(ERROR) << "List fulltext services failed, status:" << ftRet.status();
bool MetaClient::loadGlobalServiceClients() {
auto ret = listServiceClients(cpp2::ExternalServiceType::ELASTICSEARCH).get();
if (!ret.ok()) {
LOG(ERROR) << "List services failed, status:" << ret.status();
return false;
}
{
folly::RWSpinLock::WriteHolder holder(localCacheLock_);
fulltextClientList_ = std::move(ftRet).value();
serviceClientList_ = std::move(ret).value();
}
return true;
}
Expand Down Expand Up @@ -3276,16 +3276,16 @@ folly::Future<StatusOr<nebula::cpp2::ErrorCode>> MetaClient::reportTaskFinish(
return fut;
}

folly::Future<StatusOr<bool>> MetaClient::signInFTService(
cpp2::FTServiceType type, const std::vector<cpp2::FTClient>& clients) {
cpp2::SignInFTServiceReq req;
folly::Future<StatusOr<bool>> MetaClient::signInService(
const cpp2::ExternalServiceType& type, const std::vector<cpp2::ServiceClient>& clients) {
cpp2::SignInServiceReq req;
req.type_ref() = type;
req.clients_ref() = clients;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_signInFTService(request); },
[](auto client, auto request) { return client->future_signInService(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
Expand All @@ -3294,13 +3294,14 @@ folly::Future<StatusOr<bool>> MetaClient::signInFTService(
return future;
}

folly::Future<StatusOr<bool>> MetaClient::signOutFTService() {
cpp2::SignOutFTServiceReq req;
folly::Future<StatusOr<bool>> MetaClient::signOutService(const cpp2::ExternalServiceType& type) {
cpp2::SignOutServiceReq req;
req.type_ref() = type;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_signOutFTService(request); },
[](auto client, auto request) { return client->future_signOutService(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
Expand All @@ -3309,25 +3310,36 @@ folly::Future<StatusOr<bool>> MetaClient::signOutFTService() {
return future;
}

folly::Future<StatusOr<std::vector<cpp2::FTClient>>> MetaClient::listFTClients() {
cpp2::ListFTClientsReq req;
folly::Promise<StatusOr<std::vector<cpp2::FTClient>>> promise;
folly::Future<StatusOr<ServiceClientsList>> MetaClient::listServiceClients(
const cpp2::ExternalServiceType& type) {
cpp2::ListServiceClientsReq req;
req.type_ref() = type;
folly::Promise<StatusOr<ServiceClientsList>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_listFTClients(request); },
[](cpp2::ListFTClientsResp&& resp) -> decltype(auto) {
[](auto client, auto request) { return client->future_listServiceClients(request); },
[](cpp2::ListServiceClientsResp&& resp) -> decltype(auto) {
return std::move(resp).get_clients();
},
std::move(promise));
return future;
}

StatusOr<std::vector<cpp2::FTClient>> MetaClient::getFTClientsFromCache() {
StatusOr<std::vector<cpp2::ServiceClient>> MetaClient::getServiceClientsFromCache(
const cpp2::ExternalServiceType& type) {
if (!ready_) {
return Status::Error("Not ready!");
}
return fulltextClientList_;

folly::RWSpinLock::ReadHolder holder(localCacheLock_);
if (type == cpp2::ExternalServiceType::ELASTICSEARCH) {
auto sIter = serviceClientList_.find(type);
if (sIter != serviceClientList_.end()) {
return sIter->second;
}
}
return Status::Error("Service not found!");
}

folly::Future<StatusOr<bool>> MetaClient::createFTIndex(const std::string& name,
Expand Down
27 changes: 16 additions & 11 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ using Indexes = std::unordered_map<IndexID, std::shared_ptr<cpp2::IndexItem>>;
using Listeners =
std::unordered_map<HostAddr, std::vector<std::pair<PartitionID, cpp2::ListenerType>>>;

// Get services
using ServiceClientsList =
std::unordered_map<cpp2::ExternalServiceType, std::vector<cpp2::ServiceClient>>;

struct SpaceInfoCache {
cpp2::SpaceDesc spaceDesc_;
PartsAlloc partsAlloc_;
Expand Down Expand Up @@ -144,9 +148,6 @@ using UserPasswordMap = std::unordered_map<std::string, std::string>;
using MetaConfigMap =
std::unordered_map<std::pair<cpp2::ConfigModule, std::string>, cpp2::ConfigItem>;

// get fulltext services
using FulltextClientsList = std::vector<cpp2::FTClient>;

using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;

using SessionMap = std::unordered_map<SessionID, cpp2::Session>;
Expand Down Expand Up @@ -447,15 +448,17 @@ class MetaClient {
StatusOr<std::vector<RemoteListenerInfo>> getListenerHostTypeBySpacePartType(GraphSpaceID spaceId,
PartitionID partId);

// Operations for fulltext services
folly::Future<StatusOr<bool>> signInFTService(cpp2::FTServiceType type,
const std::vector<cpp2::FTClient>& clients);
// Operations for services
folly::Future<StatusOr<bool>> signInService(const cpp2::ExternalServiceType& type,
const std::vector<cpp2::ServiceClient>& clients);

folly::Future<StatusOr<bool>> signOutFTService();
folly::Future<StatusOr<bool>> signOutService(const cpp2::ExternalServiceType& type);

folly::Future<StatusOr<std::vector<cpp2::FTClient>>> listFTClients();
folly::Future<StatusOr<ServiceClientsList>> listServiceClients(
const cpp2::ExternalServiceType& type);

StatusOr<std::vector<cpp2::FTClient>> getFTClientsFromCache();
StatusOr<std::vector<cpp2::ServiceClient>> getServiceClientsFromCache(
const cpp2::ExternalServiceType& type);

// Operations for fulltext index.

Expand Down Expand Up @@ -682,7 +685,7 @@ class MetaClient {

bool loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache);

bool loadFulltextClients();
bool loadGlobalServiceClients();

bool loadFulltextIndexes();

Expand Down Expand Up @@ -815,7 +818,9 @@ class MetaClient {

NameIndexMap tagNameIndexMap_;
NameIndexMap edgeNameIndexMap_;
FulltextClientsList fulltextClientList_;

// Global service client
ServiceClientsList serviceClientList_;
FTIndexMap fulltextIndexMap_;

mutable folly::RWSpinLock localCacheLock_;
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/SchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class SchemaManager {
// get all latest version of all edge schema
virtual StatusOr<EdgeSchema> getAllLatestVerEdgeSchema(GraphSpaceID space) = 0;

virtual StatusOr<std::vector<nebula::meta::cpp2::FTClient>> getFTClients() = 0;
virtual StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>> getServiceClients(
cpp2::ExternalServiceType type) = 0;

// Get the TagID or EdgeType by the name.
// The first one is a bool which is used to distinguish the type.
Expand Down
7 changes: 4 additions & 3 deletions src/common/meta/ServerBasedSchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ StatusOr<EdgeSchema> ServerBasedSchemaManager::getAllLatestVerEdgeSchema(GraphSp
return metaClient_->getAllLatestVerEdgeSchemaFromCache(space);
}

StatusOr<std::vector<nebula::meta::cpp2::FTClient>> ServerBasedSchemaManager::getFTClients() {
auto ret = metaClient_->getFTClientsFromCache();
StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>>
ServerBasedSchemaManager::getServiceClients(meta::cpp2::ExternalServiceType type) {
auto ret = metaClient_->getServiceClientsFromCache(type);
if (!ret.ok()) {
return ret.status();
}
if (ret.value().empty()) {
return Status::Error("fulltext client list is empty");
return Status::Error("Service list is empty");
}
return std::move(ret).value();
}
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/ServerBasedSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class ServerBasedSchemaManager : public SchemaManager {
// get all latest version of all edges
StatusOr<EdgeSchema> getAllLatestVerEdgeSchema(GraphSpaceID space) override;

StatusOr<std::vector<nebula::meta::cpp2::FTClient>> getFTClients() override;
StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>> getServiceClients(
cpp2::ExternalServiceType type) override;

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) override;
Expand Down
38 changes: 22 additions & 16 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"configs", {"__configs__", true}},
{"groups", {"__groups__", true}},
{"zones", {"__zones__", true}},
{"ft_service", {"__ft_service__", false}},
{"services", {"__services__", false}},
{"sessions", {"__sessions__", true}}};

// SystemInfo will always be backed up
Expand Down Expand Up @@ -89,7 +89,7 @@ static const std::string kBalancePlanTable = tableMaps.at("balance_plan").fir
static const std::string kLocalIdTable = tableMaps.at("local_id").first; // NOLINT

const std::string kFTIndexTable = tableMaps.at("ft_index").first; // NOLINT
const std::string kFTServiceTable = systemTableMaps.at("ft_service").first; // NOLINT
const std::string kServicesTable = systemTableMaps.at("services").first; // NOLINT
const std::string kSessionsTable = systemTableMaps.at("sessions").first; // NOLINT

const std::string kIdKey = systemInfoMaps.at("autoIncrementId").first; // NOLINT
Expand Down Expand Up @@ -1155,27 +1155,33 @@ const std::string& MetaKeyUtils::statsKeyPrefix() {
return kStatsTable;
}

std::string MetaKeyUtils::fulltextServiceKey() {
std::string MetaKeyUtils::serviceKey(const meta::cpp2::ExternalServiceType& type) {
std::string key;
key.reserve(kFTServiceTable.size());
key.append(kFTServiceTable.data(), kFTServiceTable.size());
key.reserve(kServicesTable.size() + sizeof(meta::cpp2::ExternalServiceType));
key.append(kServicesTable.data(), kServicesTable.size())
.append(reinterpret_cast<const char*>(&type), sizeof(meta::cpp2::ExternalServiceType));
return key;
}

std::string MetaKeyUtils::fulltextServiceVal(meta::cpp2::FTServiceType type,
const std::vector<meta::cpp2::FTClient>& clients) {
std::string val, cval;
apache::thrift::CompactSerializer::serialize(clients, &cval);
val.reserve(sizeof(meta::cpp2::FTServiceType) + cval.size());
val.append(reinterpret_cast<const char*>(&type), sizeof(meta::cpp2::FTServiceType)).append(cval);
const std::string& MetaKeyUtils::servicePrefix() {
return kServicesTable;
}

meta::cpp2::ExternalServiceType MetaKeyUtils::parseServiceType(folly::StringPiece rawData) {
auto offset = kServicesTable.size();
return *reinterpret_cast<const meta::cpp2::ExternalServiceType*>(rawData.data() + offset);
}

std::string MetaKeyUtils::serviceVal(const std::vector<meta::cpp2::ServiceClient>& clients) {
std::string val;
apache::thrift::CompactSerializer::serialize(clients, &val);
return val;
}

std::vector<meta::cpp2::FTClient> MetaKeyUtils::parseFTClients(folly::StringPiece rawData) {
std::vector<meta::cpp2::FTClient> clients;
int32_t offset = sizeof(meta::cpp2::FTServiceType);
auto clientsRaw = rawData.subpiece(offset, rawData.size() - offset);
apache::thrift::CompactSerializer::deserialize(clientsRaw, clients);
std::vector<meta::cpp2::ServiceClient> MetaKeyUtils::parseServiceClients(
folly::StringPiece rawData) {
std::vector<meta::cpp2::ServiceClient> clients;
apache::thrift::CompactSerializer::deserialize(rawData, clients);
return clients;
}

Expand Down
11 changes: 7 additions & 4 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,15 @@ class MetaKeyUtils final {

static GraphSpaceID parseStatsSpace(folly::StringPiece rawData);

static std::string fulltextServiceKey();
static std::string serviceKey(const meta::cpp2::ExternalServiceType& type);

static std::string fulltextServiceVal(meta::cpp2::FTServiceType type,
const std::vector<meta::cpp2::FTClient>& clients);
static std::string serviceVal(const std::vector<meta::cpp2::ServiceClient>& client);

static std::vector<meta::cpp2::FTClient> parseFTClients(folly::StringPiece rawData);
static const std::string& servicePrefix();

static meta::cpp2::ExternalServiceType parseServiceType(folly::StringPiece rawData);

static std::vector<meta::cpp2::ServiceClient> parseServiceClients(folly::StringPiece rawData);

static const std::string& sessionPrefix();

Expand Down
6 changes: 3 additions & 3 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ nebula_add_library(
admin/IngestExecutor.cpp
admin/ConfigExecutor.cpp
admin/ZoneExecutor.cpp
admin/ShowTSClientsExecutor.cpp
admin/SignInTSServiceExecutor.cpp
admin/SignOutTSServiceExecutor.cpp
admin/ShowServiceClientsExecutor.cpp
admin/SignInServiceExecutor.cpp
admin/SignOutServiceExecutor.cpp
admin/SessionExecutor.cpp
admin/ShowQueriesExecutor.cpp
admin/KillQueryExecutor.cpp
Expand Down
18 changes: 9 additions & 9 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
#include "graph/executor/admin/ShowHostsExecutor.h"
#include "graph/executor/admin/ShowMetaLeaderExecutor.h"
#include "graph/executor/admin/ShowQueriesExecutor.h"
#include "graph/executor/admin/ShowServiceClientsExecutor.h"
#include "graph/executor/admin/ShowStatsExecutor.h"
#include "graph/executor/admin/ShowTSClientsExecutor.h"
#include "graph/executor/admin/SignInTSServiceExecutor.h"
#include "graph/executor/admin/SignOutTSServiceExecutor.h"
#include "graph/executor/admin/SignInServiceExecutor.h"
#include "graph/executor/admin/SignOutServiceExecutor.h"
#include "graph/executor/admin/SnapshotExecutor.h"
#include "graph/executor/admin/SpaceExecutor.h"
#include "graph/executor/admin/SubmitJobExecutor.h"
Expand Down Expand Up @@ -489,17 +489,17 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kShowStats: {
return pool->add(new ShowStatsExecutor(node, qctx));
}
case PlanNode::Kind::kShowTSClients: {
return pool->add(new ShowTSClientsExecutor(node, qctx));
case PlanNode::Kind::kShowServiceClients: {
return pool->add(new ShowServiceClientsExecutor(node, qctx));
}
case PlanNode::Kind::kShowFTIndexes: {
return pool->add(new ShowFTIndexesExecutor(node, qctx));
}
case PlanNode::Kind::kSignInTSService: {
return pool->add(new SignInTSServiceExecutor(node, qctx));
case PlanNode::Kind::kSignInService: {
return pool->add(new SignInServiceExecutor(node, qctx));
}
case PlanNode::Kind::kSignOutTSService: {
return pool->add(new SignOutTSServiceExecutor(node, qctx));
case PlanNode::Kind::kSignOutService: {
return pool->add(new SignOutServiceExecutor(node, qctx));
}
case PlanNode::Kind::kDownload: {
return pool->add(new DownloadExecutor(node, qctx));
Expand Down
Loading

0 comments on commit f749d77

Please sign in to comment.