Skip to content

Commit

Permalink
cascading deletion and addition
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Oct 14, 2021
1 parent 1f78224 commit 0b93e1b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 24 deletions.
57 changes: 40 additions & 17 deletions src/meta/processors/admin/Balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ bool Balancer::balanceParts(BalanceID balanceId,

auto maxPartsHost = sortedHosts.back();
auto minPartsHost = sortedHosts.front();
auto& sourceHost = maxPartsHost.first;
auto& targetHost = minPartsHost.first;
if (innerBalance_) {
LOG(INFO) << "maxPartsHost.first " << maxPartsHost.first << " minPartsHost.first "
<< minPartsHost.first;
Expand Down Expand Up @@ -471,11 +473,34 @@ bool Balancer::balanceParts(BalanceID balanceId,
}

if (dependentOnGroup) {
auto& parts = relatedParts_[minPartsHost.first];
if (!checkZoneLegal(maxPartsHost.first, minPartsHost.first) &&
std::find(parts.begin(), parts.end(), partId) != parts.end()) {
LOG(INFO) << "Zone have exist part: " << partId;
continue;
if (!checkZoneLegal(sourceHost, targetHost)) {
LOG(INFO) << "sourceHost " << sourceHost << " targetHost " << targetHost
<< " not same zone";

auto& parts = relatedParts_[targetHost];
auto minIt = std::find(parts.begin(), parts.end(), partId);
if (minIt != parts.end()) {
LOG(INFO) << "Part " << partId << " have existed";
continue;
}
}

auto& sourceNoneName = zoneParts_[sourceHost].first;
auto sourceHosts = zoneHosts_.find(sourceNoneName);
for (auto& sh : sourceHosts->second) {
auto& parts = relatedParts_[sh];
auto maxIt = std::find(parts.begin(), parts.end(), partId);
if (maxIt == parts.end()) {
LOG(INFO) << "Part " << partId << " not found on " << sh;
continue;
}
parts.erase(maxIt);
}

auto& targetNoneName = zoneParts_[targetHost].first;
auto targetHosts = zoneHosts_.find(targetNoneName);
for (auto& th : targetHosts->second) {
relatedParts_[th].emplace_back(partId);
}
}

Expand Down Expand Up @@ -733,8 +758,13 @@ std::vector<std::pair<HostAddr, int32_t>> Balancer::sortedHostsByParts(const Hos
LOG(INFO) << "Host " << it->first << " parts " << it->second.size();
hosts.emplace_back(it->first, it->second.size());
}
std::sort(
hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { return l.second < r.second; });
std::sort(hosts.begin(), hosts.end(), [](const auto& l, const auto& r) {
if (l.second != r.second) {
return l.second < r.second;
} else {
return l.first.host < r.first.host;
}
});
return hosts;
}

Expand Down Expand Up @@ -784,8 +814,7 @@ ErrorOr<nebula::cpp2::ErrorCode, HostAddr> Balancer::hostWithMinimalPartsForZone
}

LOG(INFO) << "source " << source << " h.first " << h.first;
if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end() &&
checkZoneLegal(source, h.first)) {
if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) {
return h.first;
}
}
Expand Down Expand Up @@ -1196,14 +1225,8 @@ bool Balancer::checkZoneLegal(const HostAddr& source, const HostAddr& target) {
return false;
}

if (!innerBalance_) {
LOG(INFO) << "innerBalance_ is false";
return true;
} else {
LOG(INFO) << "same zone";
LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first;
return sourceIter->second.first == targetIter->second.first;
}
LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first;
return sourceIter->second.first == targetIter->second.first;
}

} // namespace meta
Expand Down
22 changes: 15 additions & 7 deletions src/meta/test/BalancerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,15 @@ TEST(BalanceTest, SimpleTestWithZone) {
{
HostParts hostParts;
hostParts.emplace(HostAddr("0", 0), std::vector<PartitionID>{1, 2, 3, 4});
hostParts.emplace(HostAddr("1", 0), std::vector<PartitionID>{1, 2, 3, 4});
hostParts.emplace(HostAddr("2", 0), std::vector<PartitionID>{1, 2, 3, 4});
hostParts.emplace(HostAddr("3", 0), std::vector<PartitionID>{});
hostParts.emplace(HostAddr("1", 1), std::vector<PartitionID>{1, 2, 3, 4});
hostParts.emplace(HostAddr("2", 2), std::vector<PartitionID>{1, 2, 3, 4});
hostParts.emplace(HostAddr("3", 3), std::vector<PartitionID>{});
int32_t totalParts = 12;
std::vector<BalanceTask> tasks;
NiceMock<MockAdminClient> client;
Balancer balancer(kv, &client);
auto code = balancer.assembleZoneParts("group_0", hostParts);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
balancer.balanceParts(0, 0, hostParts, totalParts, tasks, true);
for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
EXPECT_EQ(3, it->second.size());
Expand Down Expand Up @@ -244,7 +246,8 @@ TEST(BalanceTest, ExpansionZoneTest) {
{
HostParts hostParts;
int32_t totalParts = 0;
balancer.getHostParts(1, true, hostParts, totalParts);
auto result = balancer.getHostParts(1, true, hostParts, totalParts);
ASSERT_TRUE(nebula::ok(result));
std::vector<BalanceTask> tasks;
hostParts.emplace(HostAddr("3", 3), std::vector<PartitionID>{});
balancer.balanceParts(0, 0, hostParts, totalParts, tasks, true);
Expand All @@ -262,7 +265,7 @@ TEST(BalanceTest, ExpansionHostIntoZoneTest) {
FLAGS_heartbeat_interval_secs = 1;
{
std::vector<HostAddr> hosts;
for (int i = 0; i < 3; i++) {
for (int i = 0; i < 6; i++) {
hosts.emplace_back(std::to_string(i), i);
}
TestUtils::createSomeHosts(kv, hosts);
Expand Down Expand Up @@ -311,7 +314,8 @@ TEST(BalanceTest, ExpansionHostIntoZoneTest) {
{
HostParts hostParts;
int32_t totalParts = 0;
balancer.getHostParts(1, true, hostParts, totalParts);
auto result = balancer.getHostParts(1, true, hostParts, totalParts);
ASSERT_TRUE(nebula::ok(result));

std::vector<BalanceTask> tasks;
hostParts.emplace(HostAddr("3", 3), std::vector<PartitionID>{});
Expand Down Expand Up @@ -538,6 +542,8 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) {
int32_t totalParts = 64 * 3;
std::vector<BalanceTask> tasks;
auto hostParts = assignHostParts(kv, 2);
auto code = balancer.assembleZoneParts("group_0", hostParts);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
balancer.balanceParts(0, 2, hostParts, totalParts, tasks, true);
}
{
Expand All @@ -557,7 +563,7 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) {

HostParts hostParts;
std::vector<PartitionID> parts;
for (int32_t i = 0; i < 81; i++) {
for (int32_t i = 1; i <= 81; i++) {
parts.emplace_back(i);
}

Expand All @@ -573,6 +579,8 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) {
int32_t totalParts = 243;
std::vector<BalanceTask> tasks;
dump(hostParts, tasks);
auto code = balancer.assembleZoneParts("group_1", hostParts);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
balancer.balanceParts(0, 3, hostParts, totalParts, tasks, true);

LOG(INFO) << "=== new map ====";
Expand Down

0 comments on commit 0b93e1b

Please sign in to comment.