Skip to content

Commit

Permalink
Balancefix (vesoft-inc#708)
Browse files Browse the repository at this point in the history
* return success when already balanced

* mark job as failed when some tasks are invalid

Co-authored-by: liwenhui-soul <[email protected]>
  • Loading branch information
nebula-bots and liwenhui-soul authored Mar 21, 2022
1 parent 7e5a30b commit 5587e75
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
6 changes: 5 additions & 1 deletion src/meta/processors/job/BalancePlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ void BalancePlan::invoke() {
} else if (j + 1 < buckets_[i].size()) {
auto& task = tasks_[buckets_[i][j + 1]];
LOG(INFO) << "Skip the task for the same partId " << task.partId_;
task.ret_ = BalanceTaskResult::FAILED;
if (stopped) {
task.ret_ = BalanceTaskResult::INVALID;
} else {
task.ret_ = BalanceTaskResult::FAILED;
}
task.invoke();
} else {
size_t index = curIndex_.fetch_add(1, std::memory_order_relaxed);
Expand Down
5 changes: 2 additions & 3 deletions src/meta/processors/job/BalanceTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ void BalanceTask::invoke() {
LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Task invalid, status "
<< static_cast<int32_t>(status_);
// When a plan is stopped or dst is not alive any more, a task will be
// marked as INVALID, the task will not be executed again. Balancer will
// start a new plan instead.
onFinished_();
// marked as INVALID, and the job will be marked as FAILED.
onError_();
return;
} else if (ret_ == BalanceTaskResult::FAILED) {
endTimeMs_ = time::WallClock::fastNowInSec();
Expand Down
8 changes: 7 additions & 1 deletion src/meta/processors/job/DataBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <folly/executors/CPUThreadPoolExecutor.h>

#include <memory>

#include "common/utils/MetaKeyUtils.h"
#include "kvstore/NebulaStore.h"
#include "meta/processors/job/JobUtils.h"
Expand All @@ -18,6 +20,10 @@ folly::Future<Status> DataBalanceJobExecutor::executeInternal() {
if (plan_ == nullptr) {
Status status = buildBalancePlan();
if (status != Status::OK()) {
if (status == Status::Balanced()) {
executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
return Status::OK();
}
return status;
}
}
Expand Down Expand Up @@ -175,7 +181,7 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
if (empty) {
return Status::Balanced();
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
plan_ = std::make_unique<BalancePlan>(jobDescription_, kvstore_, adminClient_);
std::for_each(existTasks.begin(),
existTasks.end(),
[this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
Expand Down
23 changes: 16 additions & 7 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <folly/executors/CPUThreadPoolExecutor.h>

#include <memory>

#include "common/utils/MetaKeyUtils.h"
#include "kvstore/NebulaStore.h"
#include "meta/processors/job/JobUtils.h"
Expand Down Expand Up @@ -41,6 +43,10 @@ folly::Future<Status> ZoneBalanceJobExecutor::executeInternal() {
if (plan_ == nullptr) {
Status status = buildBalancePlan();
if (status != Status::OK()) {
if (status == Status::Balanced()) {
executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
return Status::OK();
}
return status;
}
}
Expand Down Expand Up @@ -307,16 +313,19 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
// all parts of lost zones have moved to active zones, then rebalance the active zones
nebula::cpp2::ErrorCode rc =
rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &existTasks);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error(apache::thrift::util::enumNameSafe(rc));
}

bool empty = std::find_if(existTasks.begin(),
existTasks.end(),
[](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
return !p.second.empty();
}) == existTasks.end();
if (empty || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
bool emty = std::find_if(existTasks.begin(),
existTasks.end(),
[](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
return !p.second.empty();
}) == existTasks.end();
if (emty) {
return Status::Balanced();
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
plan_ = std::make_unique<BalancePlan>(jobDescription_, kvstore_, adminClient_);
std::for_each(existTasks.begin(),
existTasks.end(),
[this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
Expand Down
11 changes: 8 additions & 3 deletions src/meta/test/BalancerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,16 @@ TEST(BalanceTest, NormalZoneTest) {
JobDescription jd = makeJobDescription(kv, cpp2::AdminCmd::ZONE_BALANCE);
ZoneBalanceJobExecutor balancer(jd, kv, &client, {});
balancer.spaceInfo_.loadInfo(1, kv);
folly::Baton<true, std::atomic> baton;
balancer.setFinishCallBack([&](meta::cpp2::JobStatus) {
baton.post();
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
auto ret = balancer.executeInternal();
EXPECT_EQ(Status::Balanced(), ret.value());
EXPECT_EQ(Status::OK(), ret.value());
balancer.finish();
balancer.lostZones_ = {"5", "6", "7", "8"};
folly::Baton<true, std::atomic> baton;
baton.reset();
balancer.setFinishCallBack([&](meta::cpp2::JobStatus) {
baton.post();
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -794,7 +799,7 @@ TEST(BalanceTest, NormalDataTest) {
DataBalanceJobExecutor balancer(jd, kv, &client, {});
balancer.spaceInfo_.loadInfo(1, kv);
auto ret = balancer.executeInternal();
EXPECT_EQ(Status::Balanced(), ret.value());
EXPECT_EQ(Status::OK(), ret.value());
balancer.finish();
balancer.lostHosts_ = {{"127.0.0.1", 1}, {"127.0.0.1", 8}};
folly::Baton<true, std::atomic> baton;
Expand Down

0 comments on commit 5587e75

Please sign in to comment.