diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 419fb9c0cba..4ecfe498f40 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -2414,6 +2414,21 @@ folly::Future> MetaClient::heartbeat() { } else { req.set_leader_partIds(std::move(leaderIds)); } + + kvstore::SpaceDiskPartsMap diskParts; + if (listener_ != nullptr) { + listener_->fetchDiskParts(diskParts); + if (diskParts_ != diskParts) { + { + folly::RWSpinLock::WriteHolder holder(&diskPartsLock_); + diskParts_.clear(); + diskParts_ = diskParts; + } + req.set_disk_parts(diskParts); + } + } else { + req.set_disk_parts(diskParts); + } } folly::Promise> promise; diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 0bb794c6178..fab77aa4381 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -27,6 +27,7 @@ #include "interface/gen-cpp2/MetaServiceAsyncClient.h" #include "interface/gen-cpp2/common_types.h" #include "interface/gen-cpp2/meta_types.h" +#include "kvstore/DiskManager.h" DECLARE_int32(meta_client_retry_times); DECLARE_int32(heartbeat_interval_secs); @@ -162,6 +163,7 @@ class MetaChangedListener { virtual void onPartUpdated(const PartHosts& partHosts) = 0; virtual void fetchLeaderInfo( std::unordered_map>& leaders) = 0; + virtual void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) = 0; virtual void onListenerAdded(GraphSpaceID spaceId, PartitionID partId, const ListenerHosts& listenerHosts) = 0; @@ -718,9 +720,14 @@ class MetaClient { std::shared_ptr ioThreadPool_; std::shared_ptr> clientsMan_; + // heartbeat is a single thread, maybe leaderIdsLock_ and diskPartsLock_ is useless? // leaderIdsLock_ is used to protect leaderIds_ std::unordered_map> leaderIds_; folly::RWSpinLock leaderIdsLock_; + // diskPartsLock_ is used to protect diskParts_; + kvstore::SpaceDiskPartsMap diskParts_; + folly::RWSpinLock diskPartsLock_; + std::atomic localDataLastUpdateTime_{-1}; std::atomic localCfgLastUpdateTime_{-1}; std::atomic metadLastUpdateTime_{0}; diff --git a/src/common/utils/MetaKeyUtils.cpp b/src/common/utils/MetaKeyUtils.cpp index 3018051273d..b070cf40731 100644 --- a/src/common/utils/MetaKeyUtils.cpp +++ b/src/common/utils/MetaKeyUtils.cpp @@ -51,7 +51,8 @@ static const std::unordered_map< {"balance_task", {"__balance_task__", nullptr}}, {"balance_plan", {"__balance_plan__", nullptr}}, {"ft_index", {"__ft_index__", nullptr}}, - {"local_id", {"__local_id__", MetaKeyUtils::parseLocalIdSpace}}}; + {"local_id", {"__local_id__", MetaKeyUtils::parseLocalIdSpace}}, + {"disk_parts", {"__disk_parts__", MetaKeyUtils::parseDiskPartsSpace}}}; // clang-format off static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT @@ -71,6 +72,7 @@ static const std::string kLeaderTermsTable = tableMaps.at("leader_terms").fir static const std::string kGroupsTable = systemTableMaps.at("groups").first; // NOLINT static const std::string kZonesTable = systemTableMaps.at("zones").first; // NOLINT static const std::string kListenerTable = tableMaps.at("listener").first; // NOLINT +static const std::string kDiskPartsTable = tableMaps.at("disk_parts").first; // NOLINT // Used to record the number of vertices and edges in the space // The number of vertices of each tag in the space @@ -1150,4 +1152,48 @@ GraphSpaceID MetaKeyUtils::parseLocalIdSpace(folly::StringPiece rawData) { return *reinterpret_cast(rawData.data() + offset); } +GraphSpaceID MetaKeyUtils::parseDiskPartsSpace(folly::StringPiece rawData) { + auto offset = kDiskPartsTable.size(); + return *reinterpret_cast(rawData.data() + offset); +} + +std::string MetaKeyUtils::diskPartsPrefix() { return kDiskPartsTable; } + +std::string MetaKeyUtils::diskPartsPrefix(HostAddr addr) { + std::string key; + std::string hostStr = serializeHostAddr(addr); + key.reserve(kDiskPartsTable.size() + hostStr.size()); + key.append(kDiskPartsTable.data(), kDiskPartsTable.size()).append(hostStr.data(), hostStr.size()); + return key; +} + +std::string MetaKeyUtils::diskPartsPrefix(HostAddr addr, GraphSpaceID spaceId) { + std::string key; + std::string prefix = diskPartsPrefix(addr); + key.reserve(prefix.size() + sizeof(GraphSpaceID)); + key.append(prefix.data(), prefix.size()) + .append(reinterpret_cast(&spaceId), sizeof(GraphSpaceID)); + return key; +} + +std::string MetaKeyUtils::diskPartsKey(HostAddr addr, GraphSpaceID spaceId, std::string path) { + std::string key; + std::string prefix = diskPartsPrefix(addr, spaceId); + key.reserve(prefix.size() + path.size()); + key.append(prefix.data(), prefix.size()).append(path.data(), path.size()); + return key; +} + +std::string MetaKeyUtils::diskPartsVal(const meta::cpp2::PartitionList& partList) { + std::string val; + apache::thrift::CompactSerializer::serialize(partList, &val); + return val; +} + +meta::cpp2::PartitionList MetaKeyUtils::parseDiskPartsVal(const folly::StringPiece& rawData) { + meta::cpp2::PartitionList partList; + apache::thrift::CompactSerializer::deserialize(rawData, partList); + return partList; +} + } // namespace nebula diff --git a/src/common/utils/MetaKeyUtils.h b/src/common/utils/MetaKeyUtils.h index e01eb2d67fc..29fa715771b 100644 --- a/src/common/utils/MetaKeyUtils.h +++ b/src/common/utils/MetaKeyUtils.h @@ -375,6 +375,20 @@ class MetaKeyUtils final { static std::unordered_map> getSystemInfoMaps(); static std::unordered_map> getSystemTableMaps(); + + static GraphSpaceID parseDiskPartsSpace(folly::StringPiece rawData); + + static std::string diskPartsPrefix(); + + static std::string diskPartsPrefix(HostAddr addr); + + static std::string diskPartsPrefix(HostAddr addr, GraphSpaceID spaceId); + + static std::string diskPartsKey(HostAddr addr, GraphSpaceID spaceId, std::string path); + + static std::string diskPartsVal(const meta::cpp2::PartitionList& partList); + + static meta::cpp2::PartitionList parseDiskPartsVal(const folly::StringPiece& rawData); }; } // namespace nebula diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 50afb2462fc..670d0337668 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -554,6 +554,10 @@ struct LeaderInfo { 2: i64 term } +struct PartitionList { + 1: list part_list; +} + struct HBReq { 1: HostRole role, 2: common.HostAddr host, @@ -563,6 +567,9 @@ struct HBReq { 5: binary git_info_sha, // version of binary 6: optional binary version, + 7: optional map + (cpp.template = "std::unordered_map")> + (cpp.template = "std::unordered_map") disk_parts; } struct IndexFieldDef { diff --git a/src/kvstore/DiskManager.cpp b/src/kvstore/DiskManager.cpp index 48f77aea21a..8070b64f51d 100644 --- a/src/kvstore/DiskManager.cpp +++ b/src/kvstore/DiskManager.cpp @@ -94,12 +94,21 @@ void DiskManager::removePartFromPath(GraphSpaceID spaceId, } } -StatusOr DiskManager::partDist(GraphSpaceID spaceId) { - auto spaceIt = partPath_.find(spaceId); - if (spaceIt == partPath_.end()) { - return Status::Error("Space not found"); +void DiskManager::getDiskParts(SpaceDiskPartsMap& diskParts) { + std::lock_guard lg(lock_); + for (const auto& [space, partDiskMap] : partPath_) { + std::unordered_map tmpPartPaths; + for (const auto& [path, partitions] : partDiskMap) { + std::vector tmpPartitions; + for (const auto& partition : partitions) { + tmpPartitions.emplace_back(partition); + } + meta::cpp2::PartitionList ps; + ps.set_part_list(tmpPartitions); + tmpPartPaths[path] = ps; + } + diskParts.emplace(space, std::move(tmpPartPaths)); } - return spaceIt->second; } bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) { diff --git a/src/kvstore/DiskManager.h b/src/kvstore/DiskManager.h index 4c0afe3c9a1..b5706dda07a 100644 --- a/src/kvstore/DiskManager.h +++ b/src/kvstore/DiskManager.h @@ -15,11 +15,14 @@ #include "common/base/StatusOr.h" #include "common/thread/GenericWorker.h" #include "common/thrift/ThriftTypes.h" +#include "interface/gen-cpp2/meta_types.h" namespace nebula { namespace kvstore { using PartDiskMap = std::unordered_map>; +using SpaceDiskPartsMap = + std::unordered_map>; class DiskManager { FRIEND_TEST(DiskManagerTest, AvailableTest); @@ -52,6 +55,9 @@ class DiskManager { // Given a space, return data path and all partition in the path StatusOr partDist(GraphSpaceID spaceId); + // Get all space data path and all partition in the path + void getDiskParts(SpaceDiskPartsMap& diskParts); + private: // refresh free bytes of data path periodically void refresh(); diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 82916c9004c..6d6c841828c 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -539,6 +539,10 @@ void NebulaStore::checkRemoteListeners(GraphSpaceID spaceId, } } +void NebulaStore::fetchDiskParts(SpaceDiskPartsMap& diskParts) { + diskMan_->getDiskParts(diskParts); +} + void NebulaStore::updateSpaceOption(GraphSpaceID spaceId, const std::unordered_map& options, bool isDbOption) { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 78603ae0994..384be9bd4b8 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -272,6 +272,8 @@ class NebulaStore : public KVStore, public Handler { PartitionID partId, const std::vector& remoteListeners) override; + void fetchDiskParts(SpaceDiskPartsMap& diskParts) override; + nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId, std::vector keyValues) override; diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp index 7808241a060..0800ebc23ab 100644 --- a/src/kvstore/PartManager.cpp +++ b/src/kvstore/PartManager.cpp @@ -190,6 +190,14 @@ void MetaServerBasedPartManager::fetchLeaderInfo( } } +void MetaServerBasedPartManager::fetchDiskParts(SpaceDiskPartsMap& diskParts) { + if (handler_ != nullptr) { + handler_->fetchDiskParts(diskParts); + } else { + VLOG(1) << "handler_ is nullptr!"; + } +} + meta::ListenersMap MetaServerBasedPartManager::listeners(const HostAddr& host) { auto ret = client_->getListenersByHostFromCache(host); if (ret.ok()) { diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index 83956cdc04f..ec8efd8c42c 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -11,6 +11,7 @@ #include "clients/meta/MetaClient.h" #include "common/base/Base.h" #include "common/meta/Common.h" +#include "kvstore/DiskManager.h" namespace nebula { namespace kvstore { @@ -34,9 +35,6 @@ class Handler { virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0; - virtual int32_t allLeader( - std::unordered_map>& leaderIds) = 0; - virtual void addListener(GraphSpaceID spaceId, PartitionID partId, meta::cpp2::ListenerType type, @@ -49,6 +47,12 @@ class Handler { virtual void checkRemoteListeners(GraphSpaceID spaceId, PartitionID partId, const std::vector& remoteListeners) = 0; + + // get infos from handler(nebula store) to listener(meta_client -> meta) + virtual int32_t allLeader( + std::unordered_map>& leaderIds) = 0; + + virtual void fetchDiskParts(SpaceDiskPartsMap& diskParts) = 0; }; /** @@ -210,6 +214,8 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL void fetchLeaderInfo( std::unordered_map>& leaderParts) override; + void fetchDiskParts(SpaceDiskPartsMap& diskParts) override; + void onListenerAdded(GraphSpaceID spaceId, PartitionID partId, const meta::ListenerHosts& listenerHosts) override; diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index 6abdb2eff1b..beebe9b4810 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -43,6 +43,27 @@ void HBProcessor::process(const cpp2::HBReq& req) { onFinished(); return; } + + if (req.disk_parts_ref().has_value()) { + for (const auto& [spaceId, partDiskMap] : *req.get_disk_parts()) { + for (const auto& [path, partList] : partDiskMap) { + auto partListVal = MetaKeyUtils::diskPartsVal(partList); + std::string key = MetaKeyUtils::diskPartsKey(host, spaceId, path); + std::vector data; + data.emplace_back(key, partListVal); + // doPut() not work, will trigger the asan: use heap memory which is free + folly::Baton baton; + kvstore_->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, + std::move(data), + [this, &baton](nebula::cpp2::ErrorCode code) { + this->handleErrorCode(code); + baton.post(); + }); + baton.wait(); + } + } + } } HostInfo info(time::WallClock::fastNowInMilliSec(), req.get_role(), req.get_git_info_sha()); diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 1a8c6a9318f..c7b3aa93de2 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -1361,6 +1361,11 @@ class TestListener : public MetaChangedListener { UNUSED(remoteListeners); } + void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) override { + UNUSED(diskParts); + LOG(INFO) << "Fetch Disk Paths"; + } + int32_t spaceNum = 0; int32_t partNum = 0; int32_t partChanged = 0;