diff --git a/src/meta/processors/job/BalancePlan.cpp b/src/meta/processors/job/BalancePlan.cpp index 0bf77e5bb9a..b0f80797eb4 100644 --- a/src/meta/processors/job/BalancePlan.cpp +++ b/src/meta/processors/job/BalancePlan.cpp @@ -104,7 +104,11 @@ void BalancePlan::invoke() { } else if (j + 1 < buckets_[i].size()) { auto& task = tasks_[buckets_[i][j + 1]]; LOG(INFO) << "Skip the task for the same partId " << task.partId_; - task.ret_ = BalanceTaskResult::FAILED; + if (stopped) { + task.ret_ = BalanceTaskResult::INVALID; + } else { + task.ret_ = BalanceTaskResult::FAILED; + } task.invoke(); } else { size_t index = curIndex_.fetch_add(1, std::memory_order_relaxed); diff --git a/src/meta/processors/job/BalanceTask.cpp b/src/meta/processors/job/BalanceTask.cpp index 8bd9c56941d..51ffa49c9d3 100644 --- a/src/meta/processors/job/BalanceTask.cpp +++ b/src/meta/processors/job/BalanceTask.cpp @@ -29,9 +29,8 @@ void BalanceTask::invoke() { LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Task invalid, status " << static_cast(status_); // When a plan is stopped or dst is not alive any more, a task will be - // marked as INVALID, the task will not be executed again. Balancer will - // start a new plan instead. - onFinished_(); + // marked as INVALID, and the job will be marked as FAILED. + onError_(); return; } else if (ret_ == BalanceTaskResult::FAILED) { endTimeMs_ = time::WallClock::fastNowInSec(); diff --git a/src/meta/processors/job/DataBalanceJobExecutor.cpp b/src/meta/processors/job/DataBalanceJobExecutor.cpp index 88f98d2a99a..4a54e9694e8 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.cpp +++ b/src/meta/processors/job/DataBalanceJobExecutor.cpp @@ -7,6 +7,8 @@ #include +#include + #include "common/utils/MetaKeyUtils.h" #include "kvstore/NebulaStore.h" #include "meta/processors/job/JobUtils.h" @@ -18,6 +20,10 @@ folly::Future DataBalanceJobExecutor::executeInternal() { if (plan_ == nullptr) { Status status = buildBalancePlan(); if (status != Status::OK()) { + if (status == Status::Balanced()) { + executorOnFinished_(meta::cpp2::JobStatus::FINISHED); + return Status::OK(); + } return status; } } @@ -175,7 +181,7 @@ Status DataBalanceJobExecutor::buildBalancePlan() { if (empty) { return Status::Balanced(); } - plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_)); + plan_ = std::make_unique(jobDescription_, kvstore_, adminClient_); std::for_each(existTasks.begin(), existTasks.end(), [this](std::pair>& p) { diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp index af8b3c56390..277a3a662d7 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp @@ -7,6 +7,8 @@ #include +#include + #include "common/utils/MetaKeyUtils.h" #include "kvstore/NebulaStore.h" #include "meta/processors/job/JobUtils.h" @@ -41,6 +43,10 @@ folly::Future ZoneBalanceJobExecutor::executeInternal() { if (plan_ == nullptr) { Status status = buildBalancePlan(); if (status != Status::OK()) { + if (status == Status::Balanced()) { + executorOnFinished_(meta::cpp2::JobStatus::FINISHED); + return Status::OK(); + } return status; } } @@ -307,16 +313,19 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() { // all parts of lost zones have moved to active zones, then rebalance the active zones nebula::cpp2::ErrorCode rc = rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &existTasks); + if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { + return Status::Error(apache::thrift::util::enumNameSafe(rc)); + } - bool empty = std::find_if(existTasks.begin(), - existTasks.end(), - [](std::pair>& p) { - return !p.second.empty(); - }) == existTasks.end(); - if (empty || rc != nebula::cpp2::ErrorCode::SUCCEEDED) { + bool emty = std::find_if(existTasks.begin(), + existTasks.end(), + [](std::pair>& p) { + return !p.second.empty(); + }) == existTasks.end(); + if (emty) { return Status::Balanced(); } - plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_)); + plan_ = std::make_unique(jobDescription_, kvstore_, adminClient_); std::for_each(existTasks.begin(), existTasks.end(), [this](std::pair>& p) { diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 0221a639543..280f7d4bb5b 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -762,11 +762,16 @@ TEST(BalanceTest, NormalZoneTest) { JobDescription jd = makeJobDescription(kv, cpp2::AdminCmd::ZONE_BALANCE); ZoneBalanceJobExecutor balancer(jd, kv, &client, {}); balancer.spaceInfo_.loadInfo(1, kv); + folly::Baton baton; + balancer.setFinishCallBack([&](meta::cpp2::JobStatus) { + baton.post(); + return nebula::cpp2::ErrorCode::SUCCEEDED; + }); auto ret = balancer.executeInternal(); - EXPECT_EQ(Status::Balanced(), ret.value()); + EXPECT_EQ(Status::OK(), ret.value()); balancer.finish(); balancer.lostZones_ = {"5", "6", "7", "8"}; - folly::Baton baton; + baton.reset(); balancer.setFinishCallBack([&](meta::cpp2::JobStatus) { baton.post(); return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -794,7 +799,7 @@ TEST(BalanceTest, NormalDataTest) { DataBalanceJobExecutor balancer(jd, kv, &client, {}); balancer.spaceInfo_.loadInfo(1, kv); auto ret = balancer.executeInternal(); - EXPECT_EQ(Status::Balanced(), ret.value()); + EXPECT_EQ(Status::OK(), ret.value()); balancer.finish(); balancer.lostHosts_ = {{"127.0.0.1", 1}, {"127.0.0.1", 8}}; folly::Baton baton;