From 0b93e1beae1eedc97fa4620a349de59debb3f17b Mon Sep 17 00:00:00 2001 From: "darion.yaphet" Date: Thu, 14 Oct 2021 07:00:47 +0800 Subject: [PATCH] cascading deletion and addition --- src/meta/processors/admin/Balancer.cpp | 57 ++++++++++++++++++-------- src/meta/test/BalancerTest.cpp | 22 ++++++---- 2 files changed, 55 insertions(+), 24 deletions(-) diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp index 0764919cd31..90e5b04d5ee 100644 --- a/src/meta/processors/admin/Balancer.cpp +++ b/src/meta/processors/admin/Balancer.cpp @@ -398,6 +398,8 @@ bool Balancer::balanceParts(BalanceID balanceId, auto maxPartsHost = sortedHosts.back(); auto minPartsHost = sortedHosts.front(); + auto& sourceHost = maxPartsHost.first; + auto& targetHost = minPartsHost.first; if (innerBalance_) { LOG(INFO) << "maxPartsHost.first " << maxPartsHost.first << " minPartsHost.first " << minPartsHost.first; @@ -471,11 +473,34 @@ bool Balancer::balanceParts(BalanceID balanceId, } if (dependentOnGroup) { - auto& parts = relatedParts_[minPartsHost.first]; - if (!checkZoneLegal(maxPartsHost.first, minPartsHost.first) && - std::find(parts.begin(), parts.end(), partId) != parts.end()) { - LOG(INFO) << "Zone have exist part: " << partId; - continue; + if (!checkZoneLegal(sourceHost, targetHost)) { + LOG(INFO) << "sourceHost " << sourceHost << " targetHost " << targetHost + << " not same zone"; + + auto& parts = relatedParts_[targetHost]; + auto minIt = std::find(parts.begin(), parts.end(), partId); + if (minIt != parts.end()) { + LOG(INFO) << "Part " << partId << " have existed"; + continue; + } + } + + auto& sourceNoneName = zoneParts_[sourceHost].first; + auto sourceHosts = zoneHosts_.find(sourceNoneName); + for (auto& sh : sourceHosts->second) { + auto& parts = relatedParts_[sh]; + auto maxIt = std::find(parts.begin(), parts.end(), partId); + if (maxIt == parts.end()) { + LOG(INFO) << "Part " << partId << " not found on " << sh; + continue; + } + parts.erase(maxIt); + } + + auto& targetNoneName = zoneParts_[targetHost].first; + auto targetHosts = zoneHosts_.find(targetNoneName); + for (auto& th : targetHosts->second) { + relatedParts_[th].emplace_back(partId); } } @@ -733,8 +758,13 @@ std::vector> Balancer::sortedHostsByParts(const Hos LOG(INFO) << "Host " << it->first << " parts " << it->second.size(); hosts.emplace_back(it->first, it->second.size()); } - std::sort( - hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { return l.second < r.second; }); + std::sort(hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { + if (l.second != r.second) { + return l.second < r.second; + } else { + return l.first.host < r.first.host; + } + }); return hosts; } @@ -784,8 +814,7 @@ ErrorOr Balancer::hostWithMinimalPartsForZone } LOG(INFO) << "source " << source << " h.first " << h.first; - if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end() && - checkZoneLegal(source, h.first)) { + if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) { return h.first; } } @@ -1196,14 +1225,8 @@ bool Balancer::checkZoneLegal(const HostAddr& source, const HostAddr& target) { return false; } - if (!innerBalance_) { - LOG(INFO) << "innerBalance_ is false"; - return true; - } else { - LOG(INFO) << "same zone"; - LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first; - return sourceIter->second.first == targetIter->second.first; - } + LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first; + return sourceIter->second.first == targetIter->second.first; } } // namespace meta diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 528e35288a5..7f6b5fbece2 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -172,13 +172,15 @@ TEST(BalanceTest, SimpleTestWithZone) { { HostParts hostParts; hostParts.emplace(HostAddr("0", 0), std::vector{1, 2, 3, 4}); - hostParts.emplace(HostAddr("1", 0), std::vector{1, 2, 3, 4}); - hostParts.emplace(HostAddr("2", 0), std::vector{1, 2, 3, 4}); - hostParts.emplace(HostAddr("3", 0), std::vector{}); + hostParts.emplace(HostAddr("1", 1), std::vector{1, 2, 3, 4}); + hostParts.emplace(HostAddr("2", 2), std::vector{1, 2, 3, 4}); + hostParts.emplace(HostAddr("3", 3), std::vector{}); int32_t totalParts = 12; std::vector tasks; NiceMock client; Balancer balancer(kv, &client); + auto code = balancer.assembleZoneParts("group_0", hostParts); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); balancer.balanceParts(0, 0, hostParts, totalParts, tasks, true); for (auto it = hostParts.begin(); it != hostParts.end(); it++) { EXPECT_EQ(3, it->second.size()); @@ -244,7 +246,8 @@ TEST(BalanceTest, ExpansionZoneTest) { { HostParts hostParts; int32_t totalParts = 0; - balancer.getHostParts(1, true, hostParts, totalParts); + auto result = balancer.getHostParts(1, true, hostParts, totalParts); + ASSERT_TRUE(nebula::ok(result)); std::vector tasks; hostParts.emplace(HostAddr("3", 3), std::vector{}); balancer.balanceParts(0, 0, hostParts, totalParts, tasks, true); @@ -262,7 +265,7 @@ TEST(BalanceTest, ExpansionHostIntoZoneTest) { FLAGS_heartbeat_interval_secs = 1; { std::vector hosts; - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 6; i++) { hosts.emplace_back(std::to_string(i), i); } TestUtils::createSomeHosts(kv, hosts); @@ -311,7 +314,8 @@ TEST(BalanceTest, ExpansionHostIntoZoneTest) { { HostParts hostParts; int32_t totalParts = 0; - balancer.getHostParts(1, true, hostParts, totalParts); + auto result = balancer.getHostParts(1, true, hostParts, totalParts); + ASSERT_TRUE(nebula::ok(result)); std::vector tasks; hostParts.emplace(HostAddr("3", 3), std::vector{}); @@ -538,6 +542,8 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) { int32_t totalParts = 64 * 3; std::vector tasks; auto hostParts = assignHostParts(kv, 2); + auto code = balancer.assembleZoneParts("group_0", hostParts); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); balancer.balanceParts(0, 2, hostParts, totalParts, tasks, true); } { @@ -557,7 +563,7 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) { HostParts hostParts; std::vector parts; - for (int32_t i = 0; i < 81; i++) { + for (int32_t i = 1; i <= 81; i++) { parts.emplace_back(i); } @@ -573,6 +579,8 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) { int32_t totalParts = 243; std::vector tasks; dump(hostParts, tasks); + auto code = balancer.assembleZoneParts("group_1", hostParts); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); balancer.balanceParts(0, 3, hostParts, totalParts, tasks, true); LOG(INFO) << "=== new map ====";