From e67c35baddc4e562c6dc395a8c1c6b3414fdd382 Mon Sep 17 00:00:00 2001 From: dangleptr <37216992+dangleptr@users.noreply.github.com> Date: Mon, 13 May 2019 15:58:15 +0800 Subject: [PATCH] Support leader change in meta client && do some refactors (#354) * Support leader change in meta client && do some refactors * Address @laura-ding's comments * Fix remove tag schema bug --- src/interface/meta.thrift | 37 +-- src/kvstore/PartManager.cpp | 28 +- src/meta/MetaServiceHandler.cpp | 6 +- src/meta/MetaServiceHandler.h | 6 +- src/meta/ServerBasedSchemaManager.cpp | 4 +- src/meta/client/MetaClient.cpp | 363 +++++++++++---------- src/meta/client/MetaClient.h | 82 +++-- src/meta/processors/MultiPutProcessor.h | 4 +- src/meta/processors/RemoveProcessor.h | 7 +- src/meta/processors/RemoveRangeProcessor.h | 4 +- src/meta/test/MetaClientTest.cpp | 8 +- 11 files changed, 283 insertions(+), 266 deletions(-) diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 76597c98847..b9b454bec5f 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -80,6 +80,7 @@ struct EdgeItem { struct ExecResp { 1: ErrorCode code, + // For custom kv operations, it is useless. 2: ID id, // Valid if ret equals E_LEADER_CHANGED. 3: common.HostAddr leader, @@ -154,7 +155,8 @@ struct GetTagReq { struct GetTagResp { 1: ErrorCode code, - 2: common.Schema schema, + 2: common.HostAddr leader, + 3: common.Schema schema, } struct GetEdgeReq { @@ -235,10 +237,6 @@ struct MultiPutReq { 2: list pairs, } -struct MultiPutResp { - 1: ErrorCode code, -} - struct GetReq { 1: string segment, 2: string key, @@ -246,7 +244,8 @@ struct GetReq { struct GetResp { 1: ErrorCode code, - 2: string value, + 2: common.HostAddr leader, + 3: string value, } struct MultiGetReq { @@ -256,7 +255,8 @@ struct MultiGetReq { struct MultiGetResp { 1: ErrorCode code, - 2: list values, + 2: common.HostAddr leader, + 3: list values, } struct RemoveReq { @@ -264,20 +264,12 @@ struct RemoveReq { 2: string key, } -struct RemoveResp { - 1: ErrorCode code, -} - struct RemoveRangeReq { 1: string segment, 2: string start, 3: string end, } -struct RemoveRangeResp { - 1: ErrorCode code, -} - struct ScanReq { 1: string segment, 2: string start, @@ -286,7 +278,8 @@ struct ScanReq { struct ScanResp { 1: ErrorCode code, - 2: list values, + 2: common.HostAddr leader, + 3: list values, } struct HBResp { @@ -321,12 +314,12 @@ service MetaService { GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req); - MultiPutResp multiPut(1: MultiPutReq req); - GetResp get(1: GetReq req); - MultiGetResp multiGet(1: MultiGetReq req); - RemoveResp remove(1: RemoveReq req); - RemoveRangeResp removeRange(1: RemoveRangeReq req); - ScanResp scan(1: ScanReq req); + ExecResp multiPut(1: MultiPutReq req); + GetResp get(1: GetReq req); + MultiGetResp multiGet(1: MultiGetReq req); + ExecResp remove(1: RemoveReq req); + ExecResp removeRange(1: RemoveRangeReq req); + ScanResp scan(1: ScanReq req); HBResp heartBeat(1: HBReq req); } diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp index 31b2ece3e0c..f4f4ce3dc41 100644 --- a/src/kvstore/PartManager.cpp +++ b/src/kvstore/PartManager.cpp @@ -76,23 +76,35 @@ bool MetaServerBasedPartManager::spaceExist(const HostAddr& host, } void MetaServerBasedPartManager::onSpaceAdded(GraphSpaceID spaceId) { - CHECK_NOTNULL(handler_); - handler_->addSpace(spaceId); + if (handler_ != nullptr) { + handler_->addSpace(spaceId); + } else { + VLOG(1) << "handler_ is nullptr!"; + } } void MetaServerBasedPartManager::onSpaceRemoved(GraphSpaceID spaceId) { - CHECK_NOTNULL(handler_); - handler_->removeSpace(spaceId); + if (handler_ != nullptr) { + handler_->removeSpace(spaceId); + } else { + VLOG(1) << "handler_ is nullptr!"; + } } void MetaServerBasedPartManager::onPartAdded(const PartMeta& partMeta) { - CHECK_NOTNULL(handler_); - handler_->addPart(partMeta.spaceId_, partMeta.partId_); + if (handler_ != nullptr) { + handler_->addPart(partMeta.spaceId_, partMeta.partId_); + } else { + VLOG(1) << "handler_ is nullptr!"; + } } void MetaServerBasedPartManager::onPartRemoved(GraphSpaceID spaceId, PartitionID partId) { - CHECK_NOTNULL(handler_); - handler_->removePart(spaceId, partId); + if (handler_ != nullptr) { + handler_->removePart(spaceId, partId); + } else { + VLOG(1) << "handler_ is nullptr!"; + } } void MetaServerBasedPartManager::onPartUpdated(const PartMeta& partMeta) { diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 5fa716d5d9e..701322e4203 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -78,7 +78,7 @@ MetaServiceHandler::future_getPartsAlloc(const cpp2::GetPartsAllocReq& req) { RETURN_FUTURE(processor); } -folly::Future +folly::Future MetaServiceHandler::future_multiPut(const cpp2::MultiPutReq& req) { auto* processor = MultiPutProcessor::instance(kvstore_); RETURN_FUTURE(processor); @@ -102,13 +102,13 @@ MetaServiceHandler::future_scan(const cpp2::ScanReq& req) { RETURN_FUTURE(processor); } -folly::Future +folly::Future MetaServiceHandler::future_remove(const cpp2::RemoveReq& req) { auto* processor = RemoveProcessor::instance(kvstore_); RETURN_FUTURE(processor); } -folly::Future +folly::Future MetaServiceHandler::future_removeRange(const cpp2::RemoveRangeReq& req) { auto* processor = RemoveRangeProcessor::instance(kvstore_); RETURN_FUTURE(processor); diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 15c0c5c2716..e982a5e275d 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -47,7 +47,7 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { /** * Custom kv related operations. * */ - folly::Future + folly::Future future_multiPut(const cpp2::MultiPutReq& req) override; folly::Future @@ -56,10 +56,10 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_multiGet(const cpp2::MultiGetReq& req) override; - folly::Future + folly::Future future_remove(const cpp2::RemoveReq& req) override; - folly::Future + folly::Future future_removeRange(const cpp2::RemoveRangeReq& req) override; folly::Future diff --git a/src/meta/ServerBasedSchemaManager.cpp b/src/meta/ServerBasedSchemaManager.cpp index 4e2d4edcd26..bc64d41e43e 100644 --- a/src/meta/ServerBasedSchemaManager.cpp +++ b/src/meta/ServerBasedSchemaManager.cpp @@ -35,7 +35,7 @@ ServerBasedSchemaManager::getTagSchema(GraphSpaceID space, TagID tag, SchemaVer if (ver < 0) { ver = getNewestTagSchemaVer(space, tag); } - auto ret = metaClient_->getTagSchemeFromCache(space, tag, ver); + auto ret = metaClient_->getTagSchemaFromCache(space, tag, ver); if (ret.ok()) { return ret.value(); } @@ -72,7 +72,7 @@ ServerBasedSchemaManager::getEdgeSchema(GraphSpaceID space, EdgeType edge, Schem ver = getNewestEdgeSchemaVer(space, edge); } - auto ret = metaClient_->getEdgeSchemeFromCache(space, edge, ver); + auto ret = metaClient_->getEdgeSchemaFromCache(space, edge, ver); if (ret.ok()) { return ret.value(); } diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 944de4aea90..aaf8a3b4c48 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -14,19 +14,6 @@ DEFINE_string(meta_server_addrs, "", "list of meta server addresses," "the format looks like ip1:port1, ip2:port2, ip3:port3"); DEFINE_int32(meta_client_io_threads, 3, "meta client io threads"); -/** - * check argument is empty - */ -#define CHECK_PARAMETER_AND_RETURN_STATUS(argument) \ - if (argument.empty()) { \ - return Status::Error("argument is invalid!"); \ - } - -#define CHECK_SEGMENT_AND_RETURN_STATUS(segment) \ - if (!nebula::meta::MetaCommon::checkSegment(segment)) { \ - return Status::Error("segment is invalid!"); \ - } - namespace nebula { namespace meta { @@ -44,8 +31,7 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool CHECK(!addrs_.empty()); clientsMan_ = std::make_shared>(); - updateActiveHost(); - loadDataThreadFunc(); + updateHost(); LOG(INFO) << "Create meta client to " << active_; } @@ -56,6 +42,7 @@ MetaClient::~MetaClient() { } void MetaClient::init() { + loadDataThreadFunc(); CHECK(loadDataThread_.start()); size_t delayMS = FLAGS_load_data_interval_second * 1000 + folly::Random::rand32(900); loadDataThread_.addTimerTask(delayMS, @@ -205,17 +192,24 @@ template> MetaClient::getResponse( Request req, RemoteFunc remoteFunc, - RespGenerator respGen) { + RespGenerator respGen, + bool toLeader) { folly::Promise> pro; auto f = pro.getFuture(); auto* evb = ioThreadPool_->getEventBase(); - auto client = clientsMan_->client(active_, evb); + HostAddr host; + { + folly::RWSpinLock::ReadHolder holder(&hostLock_); + host = toLeader ? leader_ : active_; + } + auto client = clientsMan_->client(host, evb); remoteFunc(client, std::move(req)) .then(evb, [p = std::move(pro), respGen, this] (folly::Try&& t) mutable { // exception occurred during RPC if (t.hasException()) { p.setValue(Status::Error(folly::stringPrintf("RPC failure in MetaClient: %s", t.exception().what().c_str()))); + updateHost(); return; } // errored @@ -230,6 +224,137 @@ folly::Future> MetaClient::getResponse( return f; } +std::vector MetaClient::to(const std::vector& tHosts) { + std::vector hosts; + hosts.resize(tHosts.size()); + std::transform(tHosts.begin(), tHosts.end(), hosts.begin(), [](const auto& h) { + return HostAddr(h.get_ip(), h.get_port()); + }); + return hosts; +} + +std::vector MetaClient::toSpaceIdName(const std::vector& tIdNames) { + std::vector idNames; + idNames.resize(tIdNames.size()); + std::transform(tIdNames.begin(), tIdNames.end(), idNames.begin(), [] (const auto& tin) { + return SpaceIdName(tin.id.get_space_id(), tin.name); + }); + return idNames; +} + +template +Status MetaClient::handleResponse(const RESP& resp) { + switch (resp.get_code()) { + case cpp2::ErrorCode::SUCCEEDED: + return Status::OK(); + case cpp2::ErrorCode::E_EXISTED: + return Status::Error("existed!"); + case cpp2::ErrorCode::E_NOT_FOUND: + return Status::Error("not existed!"); + case cpp2::ErrorCode::E_LEADER_CHANGED: { + HostAddr leader(resp.get_leader().get_ip(), resp.get_leader().get_port()); + { + folly::RWSpinLock::WriteHolder holder(hostLock_); + leader_ = leader; + } + return Status::Error("Leader changed!"); + } + default: + return Status::Error("Unknown code %d", static_cast(resp.get_code())); + } +} + +PartsMap MetaClient::doGetPartsMap(const HostAddr& host, + const std::unordered_map< + GraphSpaceID, + std::shared_ptr>& localCache) { + PartsMap partMap; + for (auto it = localCache.begin(); it != localCache.end(); it++) { + auto spaceId = it->first; + auto& cache = it->second; + auto partsIt = cache->partsOnHost_.find(host); + if (partsIt != cache->partsOnHost_.end()) { + for (auto& partId : partsIt->second) { + auto partAllocIter = cache->partsAlloc_.find(partId); + CHECK(partAllocIter != cache->partsAlloc_.end()); + auto& partM = partMap[spaceId][partId]; + partM.spaceId_ = spaceId; + partM.partId_ = partId; + partM.peers_ = partAllocIter->second; + } + } + } + return partMap; +} + +void MetaClient::diff(const std::unordered_map>& newCache) { + if (listener_ == nullptr) { + VLOG(3) << "Listener is null!"; + return; + } + auto localHost = listener_->getLocalHost(); + auto newPartsMap = doGetPartsMap(localHost, newCache); + auto oldPartsMap = getPartsMapFromCache(localHost); + VLOG(1) << "Let's check if any new parts added/updated for " << localHost; + for (auto it = newPartsMap.begin(); it != newPartsMap.end(); it++) { + auto spaceId = it->first; + const auto& newParts = it->second; + auto oldIt = oldPartsMap.find(spaceId); + if (oldIt == oldPartsMap.end()) { + VLOG(1) << "SpaceId " << spaceId << " was added!"; + listener_->onSpaceAdded(spaceId); + for (auto partIt = newParts.begin(); partIt != newParts.end(); partIt++) { + listener_->onPartAdded(partIt->second); + } + } else { + const auto& oldParts = oldIt->second; + for (auto partIt = newParts.begin(); partIt != newParts.end(); partIt++) { + auto oldPartIt = oldParts.find(partIt->first); + if (oldPartIt == oldParts.end()) { + VLOG(1) << "SpaceId " << spaceId << ", partId " + << partIt->first << " was added!"; + listener_->onPartAdded(partIt->second); + } else { + const auto& oldPartMeta = oldPartIt->second; + const auto& newPartMeta = partIt->second; + if (oldPartMeta != newPartMeta) { + VLOG(1) << "SpaceId " << spaceId + << ", partId " << partIt->first << " was updated!"; + listener_->onPartUpdated(newPartMeta); + } + } + } + } + } + VLOG(1) << "Let's check if any old parts removed...."; + for (auto it = oldPartsMap.begin(); it != oldPartsMap.end(); it++) { + auto spaceId = it->first; + const auto& oldParts = it->second; + auto newIt = newPartsMap.find(spaceId); + if (newIt == newPartsMap.end()) { + VLOG(1) << "SpaceId " << spaceId << " was removed!"; + for (auto partIt = oldParts.begin(); partIt != oldParts.end(); partIt++) { + listener_->onPartRemoved(spaceId, partIt->first); + } + listener_->onSpaceRemoved(spaceId); + } else { + const auto& newParts = newIt->second; + for (auto partIt = oldParts.begin(); partIt != oldParts.end(); partIt++) { + auto newPartIt = newParts.find(partIt->first); + if (newPartIt == newParts.end()) { + VLOG(1) << "SpaceId " << spaceId + << ", partId " << partIt->first << " was removed!"; + listener_->onPartRemoved(spaceId, partIt->first); + } + } + } + } +} + + +/// ================================== public methods ================================= + folly::Future> MetaClient::createSpace(std::string name, int32_t partsNum, int32_t replicaFactor) { cpp2::CreateSpaceReq req; @@ -240,7 +365,7 @@ MetaClient::createSpace(std::string name, int32_t partsNum, int32_t replicaFacto return client->future_createSpace(request); }, [] (cpp2::ExecResp&& resp) -> GraphSpaceID { return resp.get_id().get_space_id(); - }); + }, true); } folly::Future>> MetaClient::listSpaces() { @@ -260,7 +385,7 @@ folly::Future> MetaClient::dropSpace(std::string name) { return client->future_dropSpace(request); }, [] (cpp2::ExecResp&& resp) -> bool { return resp.code == cpp2::ErrorCode::SUCCEEDED; - }); + }, true); } @@ -279,7 +404,7 @@ folly::Future> MetaClient::addHosts(const std::vector& return client->future_addHosts(request); }, [] (cpp2::ExecResp&& resp) -> bool { return resp.code == cpp2::ErrorCode::SUCCEEDED; - }); + }, true); } folly::Future>> MetaClient::listHosts() { @@ -306,7 +431,7 @@ folly::Future> MetaClient::removeHosts(const std::vectorfuture_removeHosts(request); }, [] (cpp2::ExecResp&& resp) -> bool { return resp.code == cpp2::ErrorCode::SUCCEEDED; - }); + }, true); } folly::Future>>> @@ -316,12 +441,12 @@ MetaClient::getPartsAlloc(GraphSpaceID spaceId) { return getResponse(std::move(req), [] (auto client, auto request) { return client->future_getPartsAlloc(request); }, [this] (cpp2::GetPartsAllocResp&& resp) -> decltype(auto) { - std::unordered_map> parts; - for (auto it = resp.parts.begin(); it != resp.parts.end(); it++) { - parts.emplace(it->first, to(it->second)); - } - return parts; - }); + std::unordered_map> parts; + for (auto it = resp.parts.begin(); it != resp.parts.end(); it++) { + parts.emplace(it->first, to(it->second)); + } + return parts; + }); } StatusOr @@ -357,8 +482,10 @@ StatusOr MetaClient::getEdgeTypeByNameFromCache(const GraphSpaceID& sp folly::Future> MetaClient::multiPut(std::string segment, std::vector> pairs) { - CHECK_SEGMENT_AND_RETURN_STATUS(segment); - CHECK_PARAMETER_AND_RETURN_STATUS(pairs); + if (!nebula::meta::MetaCommon::checkSegment(segment) + || pairs.empty()) { + return Status::Error("arguments invalid!"); + } cpp2::MultiPutReq req; std::vector data; for (auto& element : pairs) { @@ -369,15 +496,17 @@ MetaClient::multiPut(std::string segment, req.set_pairs(std::move(data)); return getResponse(std::move(req), [] (auto client, auto request) { return client->future_multiPut(request); - }, [] (cpp2::MultiPutResp&& resp) -> bool { + }, [] (cpp2::ExecResp&& resp) -> bool { return resp.code == cpp2::ErrorCode::SUCCEEDED; - }); + }, true); } folly::Future> MetaClient::get(std::string segment, std::string key) { - CHECK_SEGMENT_AND_RETURN_STATUS(segment); - CHECK_PARAMETER_AND_RETURN_STATUS(key); + if (!nebula::meta::MetaCommon::checkSegment(segment) + || key.empty()) { + return Status::Error("arguments invalid!"); + } cpp2::GetReq req; req.set_segment(std::move(segment)); req.set_key(std::move(key)); @@ -390,8 +519,10 @@ MetaClient::get(std::string segment, std::string key) { folly::Future>> MetaClient::multiGet(std::string segment, std::vector keys) { - CHECK_SEGMENT_AND_RETURN_STATUS(segment); - CHECK_PARAMETER_AND_RETURN_STATUS(keys); + if (!nebula::meta::MetaCommon::checkSegment(segment) + || keys.empty()) { + return Status::Error("arguments invalid!"); + } cpp2::MultiGetReq req; req.set_segment(std::move(segment)); req.set_keys(std::move(keys)); @@ -404,9 +535,10 @@ MetaClient::multiGet(std::string segment, std::vector keys) { folly::Future>> MetaClient::scan(std::string segment, std::string start, std::string end) { - CHECK_SEGMENT_AND_RETURN_STATUS(segment); - CHECK_PARAMETER_AND_RETURN_STATUS(start); - CHECK_PARAMETER_AND_RETURN_STATUS(end); + if (!nebula::meta::MetaCommon::checkSegment(segment) + || start.empty() || end.empty()) { + return Status::Error("arguments invalid!"); + } cpp2::ScanReq req; req.set_segment(std::move(segment)); req.set_start(std::move(start)); @@ -420,89 +552,35 @@ MetaClient::scan(std::string segment, std::string start, std::string end) { folly::Future> MetaClient::remove(std::string segment, std::string key) { - CHECK_SEGMENT_AND_RETURN_STATUS(segment); - CHECK_PARAMETER_AND_RETURN_STATUS(key); + if (!nebula::meta::MetaCommon::checkSegment(segment) + || key.empty()) { + return Status::Error("arguments invalid!"); + } cpp2::RemoveReq req; req.set_segment(std::move(segment)); req.set_key(std::move(key)); return getResponse(std::move(req), [] (auto client, auto request) { return client->future_remove(request); - }, [] (cpp2::RemoveResp&& resp) -> bool { + }, [] (cpp2::ExecResp&& resp) -> bool { return resp.code == cpp2::ErrorCode::SUCCEEDED; - }); + }, true); } folly::Future> MetaClient::removeRange(std::string segment, std::string start, std::string end) { - CHECK_SEGMENT_AND_RETURN_STATUS(segment); - CHECK_PARAMETER_AND_RETURN_STATUS(start); - CHECK_PARAMETER_AND_RETURN_STATUS(end); + if (!nebula::meta::MetaCommon::checkSegment(segment) + || start.empty() || end.empty()) { + return Status::Error("arguments invalid!"); + } cpp2::RemoveRangeReq req; req.set_segment(std::move(segment)); req.set_start(std::move(start)); req.set_end(std::move(end)); return getResponse(std::move(req), [] (auto client, auto request) { return client->future_removeRange(request); - }, [] (cpp2::RemoveRangeResp&& resp) -> bool { + }, [] (cpp2::ExecResp&& resp) -> bool { return resp.code == cpp2::ErrorCode::SUCCEEDED; - }); -} - -std::vector MetaClient::to(const std::vector& tHosts) { - std::vector hosts; - hosts.resize(tHosts.size()); - std::transform(tHosts.begin(), tHosts.end(), hosts.begin(), [](const auto& h) { - return HostAddr(h.get_ip(), h.get_port()); - }); - return hosts; -} - -std::vector MetaClient::toSpaceIdName(const std::vector& tIdNames) { - std::vector idNames; - idNames.resize(tIdNames.size()); - std::transform(tIdNames.begin(), tIdNames.end(), idNames.begin(), [] (const auto& tin) { - return SpaceIdName(tin.id.get_space_id(), tin.name); - }); - return idNames; -} - -template -Status MetaClient::handleResponse(const RESP& resp) { - switch (resp.get_code()) { - case cpp2::ErrorCode::SUCCEEDED: - return Status::OK(); - case cpp2::ErrorCode::E_EXISTED: - return Status::Error("existed!"); - case cpp2::ErrorCode::E_NOT_FOUND: - return Status::Error("not existed!"); - case cpp2::ErrorCode::E_LEADER_CHANGED: - return Status::Error("Leader changed!"); - default: - return Status::Error("Unknown code %d", static_cast(resp.get_code())); - } -} - -PartsMap MetaClient::doGetPartsMap(const HostAddr& host, - const std::unordered_map< - GraphSpaceID, - std::shared_ptr>& localCache) { - PartsMap partMap; - for (auto it = localCache.begin(); it != localCache.end(); it++) { - auto spaceId = it->first; - auto& cache = it->second; - auto partsIt = cache->partsOnHost_.find(host); - if (partsIt != cache->partsOnHost_.end()) { - for (auto& partId : partsIt->second) { - auto partAllocIter = cache->partsAlloc_.find(partId); - CHECK(partAllocIter != cache->partsAlloc_.end()); - auto& partM = partMap[spaceId][partId]; - partM.spaceId_ = spaceId; - partM.partId_ = partId; - partM.peers_ = partAllocIter->second; - } - } - } - return partMap; + }, true); } PartsMap MetaClient::getPartsMapFromCache(const HostAddr& host) { @@ -562,71 +640,6 @@ int32_t MetaClient::partsNum(GraphSpaceID spaceId) { return it->second->partsAlloc_.size(); } -void MetaClient::diff(const std::unordered_map>& newCache) { - if (listener_ == nullptr) { - VLOG(3) << "Listener is null!"; - return; - } - auto localHost = listener_->getLocalHost(); - auto newPartsMap = doGetPartsMap(localHost, newCache); - auto oldPartsMap = getPartsMapFromCache(localHost); - VLOG(1) << "Let's check if any new parts added/updated for " << localHost; - for (auto it = newPartsMap.begin(); it != newPartsMap.end(); it++) { - auto spaceId = it->first; - const auto& newParts = it->second; - auto oldIt = oldPartsMap.find(spaceId); - if (oldIt == oldPartsMap.end()) { - VLOG(1) << "SpaceId " << spaceId << " was added!"; - listener_->onSpaceAdded(spaceId); - for (auto partIt = newParts.begin(); partIt != newParts.end(); partIt++) { - listener_->onPartAdded(partIt->second); - } - } else { - const auto& oldParts = oldIt->second; - for (auto partIt = newParts.begin(); partIt != newParts.end(); partIt++) { - auto oldPartIt = oldParts.find(partIt->first); - if (oldPartIt == oldParts.end()) { - VLOG(1) << "SpaceId " << spaceId << ", partId " - << partIt->first << " was added!"; - listener_->onPartAdded(partIt->second); - } else { - const auto& oldPartMeta = oldPartIt->second; - const auto& newPartMeta = partIt->second; - if (oldPartMeta != newPartMeta) { - VLOG(1) << "SpaceId " << spaceId - << ", partId " << partIt->first << " was updated!"; - listener_->onPartUpdated(newPartMeta); - } - } - } - } - } - VLOG(1) << "Let's check if any old parts removed...."; - for (auto it = oldPartsMap.begin(); it != oldPartsMap.end(); it++) { - auto spaceId = it->first; - const auto& oldParts = it->second; - auto newIt = newPartsMap.find(spaceId); - if (newIt == newPartsMap.end()) { - VLOG(1) << "SpaceId " << spaceId << " was removed!"; - for (auto partIt = oldParts.begin(); partIt != oldParts.end(); partIt++) { - listener_->onPartRemoved(spaceId, partIt->first); - } - listener_->onSpaceRemoved(spaceId); - } else { - const auto& newParts = newIt->second; - for (auto partIt = oldParts.begin(); partIt != oldParts.end(); partIt++) { - auto newPartIt = newParts.find(partIt->first); - if (newPartIt == newParts.end()) { - VLOG(1) << "SpaceId " << spaceId - << ", partId " << partIt->first << " was removed!"; - listener_->onPartRemoved(spaceId, partIt->first); - } - } - } - } -} - folly::Future> MetaClient::createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2::Schema schema) { cpp2::CreateTagReq req; @@ -638,7 +651,7 @@ MetaClient::createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2 return client->future_createTag(request); }, [] (cpp2::ExecResp&& resp) -> TagID { return resp.get_id().get_tag_id(); - }); + }, true); } folly::Future> MetaClient::alterTagSchema(GraphSpaceID spaceId, @@ -653,7 +666,7 @@ MetaClient::alterTagSchema(GraphSpaceID spaceId, return client->future_alterTag(request); }, [] (cpp2::ExecResp&& resp) -> TagID { return resp.get_id().get_tag_id(); - }); + }, true); } folly::Future>> @@ -676,7 +689,7 @@ MetaClient::removeTagSchema(int32_t spaceId, std::string tagName) { return client->future_removeTag(request); }, [] (cpp2::ExecResp&& resp) -> bool { return resp.code == cpp2::ErrorCode::SUCCEEDED; - }); + }, true); } folly::Future> @@ -702,7 +715,7 @@ MetaClient::createEdgeSchema(GraphSpaceID spaceId, std::string name, nebula::cpp return client->future_createEdge(request); }, [] (cpp2::ExecResp&& resp) -> EdgeType { return resp.get_id().get_edge_type(); - }); + }, true); } folly::Future>> @@ -717,7 +730,7 @@ MetaClient::listEdgeSchemas(GraphSpaceID spaceId) { } StatusOr> -MetaClient::getTagSchemeFromCache(GraphSpaceID spaceId, TagID tagID, SchemaVer ver) { +MetaClient::getTagSchemaFromCache(GraphSpaceID spaceId, TagID tagID, SchemaVer ver) { folly::RWSpinLock::ReadHolder holder(localCacheLock_); auto spaceIt = localCache_.find(spaceId); if (spaceIt == localCache_.end()) { @@ -733,7 +746,7 @@ MetaClient::getTagSchemeFromCache(GraphSpaceID spaceId, TagID tagID, SchemaVer v } } -StatusOr> MetaClient::getEdgeSchemeFromCache( +StatusOr> MetaClient::getEdgeSchemaFromCache( GraphSpaceID spaceId, EdgeType edgeType, SchemaVer ver) { folly::RWSpinLock::ReadHolder holder(localCacheLock_); auto spaceIt = localCache_.find(spaceId); diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index dc4b7879bfe..30ce7d14922 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -70,6 +70,7 @@ class MetaClient { listener_ = listener; } + // Operations for parts /** * TODO(dangleptr): Use one struct to represent space description. * */ @@ -94,14 +95,14 @@ class MetaClient { folly::Future> getPartsAlloc(GraphSpaceID spaceId); - // TODO(Laura) : We can actively update the cache once we add the schema + // Operations for schema folly::Future> createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2::Schema schema); - // TODO(boshengchen) need refresh tagNameIdMap and newestTagVerMap - folly::Future> alterTagSchema(GraphSpaceID spaceId, - std::string name, - std::vector items); + folly::Future> + alterTagSchema(GraphSpaceID spaceId, + std::string name, + std::vector tagItems); folly::Future>> listTagSchemas(GraphSpaceID spaceId); @@ -112,14 +113,33 @@ class MetaClient { folly::Future> getTagSchema(int32_t spaceId, int32_t tagId, int64_t version); - // TODO(Laura) : We can actively update the cache once we add the schema folly::Future> createEdgeSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2::Schema schema); folly::Future>> listEdgeSchemas(GraphSpaceID spaceId); - // These are the interfaces about cache opeartions. + // Operations for custom kv + folly::Future> + multiPut(std::string segment, + std::vector> pairs); + + folly::Future> + get(std::string segment, std::string key); + + folly::Future>> + multiGet(std::string segment, std::vector keys); + + folly::Future>> + scan(std::string segment, std::string start, std::string end); + + folly::Future> + remove(std::string segment, std::string key); + + folly::Future> + removeRange(std::string segment, std::string start, std::string end); + + // Opeartions for cache. StatusOr getSpaceIdByNameFromCache(const std::string& name); StatusOr getTagIDByNameFromCache(const GraphSpaceID& space, const std::string& name); @@ -144,31 +164,11 @@ class MetaClient { int32_t partsNum(GraphSpaceID spaceId); - StatusOr> getTagSchemeFromCache(GraphSpaceID spaceId, - TagID tagID, - SchemaVer ver = -1); - - StatusOr> getEdgeSchemeFromCache(GraphSpaceID spaceId, - EdgeType edgeType, - SchemaVer ver = -1); - - folly::Future> - multiPut(std::string segment, std::vector> pairs); - - folly::Future> - get(std::string segment, std::string key); - - folly::Future>> - multiGet(std::string segment, std::vector keys); + StatusOr> + getTagSchemaFromCache(GraphSpaceID spaceId, TagID tagID, SchemaVer ver = -1); - folly::Future>> - scan(std::string segment, std::string start, std::string end); - - folly::Future> - remove(std::string segment, std::string key); - - folly::Future> - removeRange(std::string segment, std::string start, std::string end); + StatusOr> + getEdgeSchemaFromCache(GraphSpaceID spaceId, EdgeType edgeType, SchemaVer ver = -1); protected: void loadDataThreadFunc(); @@ -182,8 +182,9 @@ class MetaClient { std::unordered_map> reverse(const PartsAlloc& parts); - void updateActiveHost() { - active_ = addrs_[folly::Random::rand64(addrs_.size())]; + void updateHost() { + folly::RWSpinLock::WriteHolder holder(hostLock_); + leader_ = active_ = addrs_[folly::Random::rand64(addrs_.size())]; } void diff(const std::unordered_map>& newCache); @@ -191,15 +192,6 @@ class MetaClient { template Status handleResponse(const RESP& resp); - template, Request) - >::type::value_type - > - Response collectResponse(Request req, RemoteFunc remoteFunc); - template folly::Future> getResponse(Request req, RemoteFunc remoteFunc, - RespGenerator respGen); + RespGenerator respGen, + bool toLeader = false); std::vector to(const std::vector& hosts); @@ -227,7 +220,10 @@ class MetaClient { std::shared_ptr ioThreadPool_; std::shared_ptr> clientsMan_; std::vector addrs_; + // The lock used to protect active_ and leader_. + folly::RWSpinLock hostLock_; HostAddr active_; + HostAddr leader_; thread::GenericWorker loadDataThread_; std::unordered_map> localCache_; SpaceNameIdMap spaceIndexByName_; diff --git a/src/meta/processors/MultiPutProcessor.h b/src/meta/processors/MultiPutProcessor.h index 8ccc358ebb9..c74ec3467c2 100644 --- a/src/meta/processors/MultiPutProcessor.h +++ b/src/meta/processors/MultiPutProcessor.h @@ -12,7 +12,7 @@ namespace nebula { namespace meta { -class MultiPutProcessor : public BaseProcessor { +class MultiPutProcessor : public BaseProcessor { public: static MultiPutProcessor* instance(kvstore::KVStore* kvstore) { return new MultiPutProcessor(kvstore); @@ -22,7 +22,7 @@ class MultiPutProcessor : public BaseProcessor { private: explicit MultiPutProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} + : BaseProcessor(kvstore) {} }; } // namespace meta diff --git a/src/meta/processors/RemoveProcessor.h b/src/meta/processors/RemoveProcessor.h index c63ca5c0e66..b4e189619f8 100644 --- a/src/meta/processors/RemoveProcessor.h +++ b/src/meta/processors/RemoveProcessor.h @@ -12,7 +12,10 @@ namespace nebula { namespace meta { -class RemoveProcessor : public BaseProcessor { +/** + * Remove some rows in custorm kv operations. + * */ +class RemoveProcessor : public BaseProcessor { public: static RemoveProcessor* instance(kvstore::KVStore* kvstore) { return new RemoveProcessor(kvstore); @@ -22,7 +25,7 @@ class RemoveProcessor : public BaseProcessor { private: explicit RemoveProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} + : BaseProcessor(kvstore) {} }; } // namespace meta diff --git a/src/meta/processors/RemoveRangeProcessor.h b/src/meta/processors/RemoveRangeProcessor.h index 59a584ad91a..6da5d9cc4e6 100644 --- a/src/meta/processors/RemoveRangeProcessor.h +++ b/src/meta/processors/RemoveRangeProcessor.h @@ -12,7 +12,7 @@ namespace nebula { namespace meta { -class RemoveRangeProcessor : public BaseProcessor { +class RemoveRangeProcessor : public BaseProcessor { public: static RemoveRangeProcessor* instance(kvstore::KVStore* kvstore) { return new RemoveRangeProcessor(kvstore); @@ -22,7 +22,7 @@ class RemoveRangeProcessor : public BaseProcessor { private: explicit RemoveRangeProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} + : BaseProcessor(kvstore) {} }; } // namespace meta diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 14077a714c0..6672b83a169 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -104,11 +104,11 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_NE(ret1.value().begin()->tag_id, 0); ASSERT_EQ(ret1.value().begin()->schema.columns.size(), 5); - // getTagSchemeFromCache + // getTagSchemaFromCache sleep(FLAGS_load_data_interval_second + 1); auto ver = client->getNewestTagVerFromCache(spaceId, ret1.value().begin()->tag_id); - auto ret2 = client->getTagSchemeFromCache(spaceId, + auto ret2 = client->getTagSchemaFromCache(spaceId, ret1.value().begin()->tag_id, ver); ASSERT_TRUE(ret2.ok()) << ret2.status(); ASSERT_EQ(ret2.value()->getNumFields(), 5); @@ -133,10 +133,10 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_EQ(ret1.value().size(), 1); ASSERT_NE(ret1.value().begin()->edge_type, 0); - // getEdgeSchemeFromCache + // getEdgeSchemaFromCache auto ver = client->getNewestEdgeVerFromCache(spaceId, ret1.value().begin()->edge_type); - auto ret2 = client->getEdgeSchemeFromCache(spaceId, + auto ret2 = client->getEdgeSchemaFromCache(spaceId, ret1.value().begin()->edge_type, ver); ASSERT_TRUE(ret2.ok()) << ret2.status(); ASSERT_EQ(ret2.value()->getNumFields(), 5);