From 7b4b693714725b9c1c1863d10e59efcafe69222d Mon Sep 17 00:00:00 2001 From: "darion.yaphet" Date: Wed, 29 Dec 2021 22:21:31 +0800 Subject: [PATCH] fix create space --- src/meta/processors/admin/HBProcessor.cpp | 2 +- .../processors/parts/CreateSpaceProcessor.cpp | 57 ++++++------------- .../processors/parts/CreateSpaceProcessor.h | 3 - 3 files changed, 17 insertions(+), 45 deletions(-) diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index 7e78aba9732..83c2984f513 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -58,7 +58,7 @@ void HBProcessor::process(const cpp2::HBReq& req) { for (const auto& [spaceId, partDiskMap] : *req.get_disk_parts()) { for (const auto& [path, partList] : partDiskMap) { auto partListVal = MetaKeyUtils::diskPartsVal(partList); - std::string key = MetaKeyUtils::diskPartsKey(host, spaceId, path); + auto key = MetaKeyUtils::diskPartsKey(host, spaceId, path); std::vector data; data.emplace_back(key, partListVal); // doPut() not work, will trigger the asan: use heap memory which is free diff --git a/src/meta/processors/parts/CreateSpaceProcessor.cpp b/src/meta/processors/parts/CreateSpaceProcessor.cpp index 8da74f9e0a3..72758ea000a 100644 --- a/src/meta/processors/parts/CreateSpaceProcessor.cpp +++ b/src/meta/processors/parts/CreateSpaceProcessor.cpp @@ -9,6 +9,7 @@ DEFINE_int32(default_parts_num, 100, "The default number of parts when a space is created"); DEFINE_int32(default_replica_factor, 1, "The default replica factor when a space is created"); +DECLARE_uint32(expired_time_factor); namespace nebula { namespace meta { @@ -174,19 +175,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { return; } - auto hostLoadingRet = getHostLoading(); - if (!nebula::ok(hostLoadingRet)) { - LOG(ERROR) << "Get host loading failed."; - auto retCode = nebula::error(hostLoadingRet); - if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - retCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; - } - handleErrorCode(retCode); - onFinished(); - return; - } - - hostLoading_ = std::move(nebula::value(hostLoadingRet)); std::unordered_map zoneHosts; for (auto& zone : zones) { auto zoneKey = MetaKeyUtils::zoneKey(zone); @@ -200,14 +188,23 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { break; } + auto now = time::WallClock::fastNowInMilliSec(); auto hosts = MetaKeyUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet))); for (auto& host : hosts) { - auto hostIter = hostLoading_.find(host); - if (hostIter == hostLoading_.end()) { - hostLoading_[host] = 0; - zoneLoading_[zone] += 0; + auto key = MetaKeyUtils::hostKey(host.host, host.port); + auto ret = doGet(key); + HostInfo info = HostInfo::decode(nebula::value(ret)); + if (now - info.lastHBTimeInMilliSec_ < + FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) { + auto hostIter = hostLoading_.find(host); + if (hostIter == hostLoading_.end()) { + hostLoading_[host] = 0; + zoneLoading_[zone] += 0; + } else { + zoneLoading_[zone] += hostIter->second; + } } else { - zoneLoading_[zone] += hostIter->second; + LOG(WARNING) << "Host " << host << " expired"; } } zoneHosts[zone] = std::move(hosts); @@ -248,7 +245,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { ss << host << ", "; } - VLOG(3) << "Space " << spaceId << " part " << partId << " hosts " << ss.str(); + LOG(INFO) << "Space " << spaceId << " part " << partId << " hosts " << ss.str(); data.emplace_back(MetaKeyUtils::partKey(spaceId, partId), MetaKeyUtils::partVal(partHosts)); } @@ -264,28 +261,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { LOG(INFO) << "Create space " << spaceName; } -ErrorOr> -CreateSpaceProcessor::getHostLoading() { - const auto& prefix = MetaKeyUtils::partPrefix(); - auto iterRet = doPrefix(prefix); - - if (!nebula::ok(iterRet)) { - LOG(ERROR) << "Prefix Parts Failed"; - return nebula::error(iterRet); - } - - std::unordered_map result; - auto iter = nebula::value(iterRet).get(); - while (iter->valid()) { - auto hosts = MetaKeyUtils::parsePartVal(iter->val()); - for (auto& host : hosts) { - result[host]++; - } - iter->next(); - } - return result; -} - StatusOr CreateSpaceProcessor::pickHostsWithZone( const std::vector& zones, const std::unordered_map& zoneHosts) { diff --git a/src/meta/processors/parts/CreateSpaceProcessor.h b/src/meta/processors/parts/CreateSpaceProcessor.h index 570ab11cd74..67768039dbb 100644 --- a/src/meta/processors/parts/CreateSpaceProcessor.h +++ b/src/meta/processors/parts/CreateSpaceProcessor.h @@ -29,9 +29,6 @@ class CreateSpaceProcessor : public BaseProcessor { StatusOr pickHostsWithZone(const std::vector& zones, const std::unordered_map& zoneHosts); - // Get all host's part loading - ErrorOr> getHostLoading(); - // Get the zones with the least load StatusOr> pickLightLoadZones(int32_t replicaFactor);