Skip to content

Commit

Permalink
make meta job use the errorcode
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Jul 20, 2022
1 parent 097d6a5 commit d226aae
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 38 deletions.
26 changes: 26 additions & 0 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
const nebula::meta::cpp2::JobDesc &jd, const std::vector<nebula::meta::cpp2::TaskDesc> &td) {
if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE ||
jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) {
// The job which executed on meta, aka balance data, is a litte different from others. In order
// to be consistent with other jobs, the task result is set in paras in JobDesc serialized by
// thrift. The reason that we can't use the list of TaskDesc is that the state of balance task
// is saved in BalanceTask, which is different from TaskDesc.
nebula::DataSet v({"Job Id(spaceId:partId)",
"Command(src->dst)",
"Status",
Expand Down Expand Up @@ -167,8 +171,24 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
convertJobTimestampToDateTime(jd.get_stop_time()),
apache::thrift::util::enumNameSafe(jd.get_code()),
}));
// As for normal type of job, there will only be one of the three state: running, succeeded or
// failed.
uint32_t total = td.size(), succeeded = 0, failed = 0, inProgress = 0;
// tasks desc
for (const auto &taskDesc : td) {
switch (taskDesc.get_status()) {
case meta::cpp2::JobStatus::RUNNING:
++inProgress;
break;
case meta::cpp2::JobStatus::FAILED:
++failed;
break;
case meta::cpp2::JobStatus::FINISHED:
++succeeded;
break;
default:
break;
}
v.emplace_back(nebula::Row({
taskDesc.get_task_id(),
taskDesc.get_host().host,
Expand All @@ -178,6 +198,12 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(taskDesc.get_code()),
}));
}
v.emplace_back(Row({folly::sformat("Total:{}", total),
folly::sformat("Succeeded:{}", succeeded),
folly::sformat("Failed:{}", failed),
folly::sformat("In Progress:{}", inProgress),
(""),
("")}));
return v;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/test/JobTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ TEST_F(JobTest, JobFinishTime) {
auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW, std::move(resp));
EXPECT_TRUE(status.ok());
auto result = std::move(status).value();
EXPECT_EQ(result.rows.size(), 2);
// One line for job, one line for the task, one line for the count
EXPECT_EQ(result.rows.size(), 3);
EXPECT_EQ(result.rows[0][3], Value(time::TimeConversion::unixSecondsToDateTime(123)));
EXPECT_EQ(result.rows[0][4], Value::kEmpty);
EXPECT_EQ(result.rows[1][3], Value(time::TimeConversion::unixSecondsToDateTime(456)));
Expand Down
8 changes: 4 additions & 4 deletions src/meta/processors/job/DataBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
namespace nebula {
namespace meta {

folly::Future<Status> DataBalanceJobExecutor::executeInternal() {
folly::Future<nebula::cpp2::ErrorCode> DataBalanceJobExecutor::executeInternal() {
if (plan_ == nullptr) {
Status status = buildBalancePlan();
if (status != Status::OK()) {
if (status == Status::Balanced()) {
executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
return Status::OK();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
return status;
return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE;
}
}
plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) {
Expand All @@ -35,7 +35,7 @@ folly::Future<Status> DataBalanceJobExecutor::executeInternal() {
executorOnFinished_(status);
});
plan_->invoke();
return Status::OK();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Status DataBalanceJobExecutor::buildBalancePlan() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/DataBalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class DataBalanceJobExecutor : public BalanceJobExecutor {
*
* @return
*/
folly::Future<Status> executeInternal() override;
folly::Future<nebula::cpp2::ErrorCode> executeInternal() override;

/**
* @brief Build a balance plan, which balance data in each zone
Expand Down
17 changes: 7 additions & 10 deletions src/meta/processors/job/LeaderBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,14 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::finish(bool) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

folly::Future<Status> LeaderBalanceJobExecutor::executeInternal() {
folly::Promise<Status> promise;
folly::Future<nebula::cpp2::ErrorCode> LeaderBalanceJobExecutor::executeInternal() {
folly::Promise<nebula::cpp2::ErrorCode> promise;
auto future = promise.getFuture();
// Space ID, Replica Factor and Dependent On Group
std::vector<std::tuple<GraphSpaceID, int32_t, bool>> spaces;
auto ret = getAllSpaces(spaces);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
ret = nebula::cpp2::ErrorCode::E_STORE_FAILURE;
}
return Status::Error("Can't get spaces");
return ret;
}

bool expected = false;
Expand All @@ -246,7 +243,7 @@ folly::Future<Status> LeaderBalanceJobExecutor::executeInternal() {
auto status = adminClient_->getLeaderDist(hostLeaderMap_.get()).get();
if (!status.ok() || hostLeaderMap_->empty()) {
inLeaderBalance_ = false;
return Status::Error("Get leader distribution failed");
return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE;
}

std::vector<folly::SemiFuture<Status>> futures;
Expand Down Expand Up @@ -286,13 +283,13 @@ folly::Future<Status> LeaderBalanceJobExecutor::executeInternal() {

inLeaderBalance_ = false;
if (failed != 0) {
return Status::Error("partiton failed to transfer leader");
LOG(INFO) << folly::stringPrintf("%d partitons failed to transfer leader", failed);
}
executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
return Status::OK();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
return Status::OK();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

ErrorOr<nebula::cpp2::ErrorCode, bool> LeaderBalanceJobExecutor::buildLeaderBalancePlan(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/LeaderBalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class LeaderBalanceJobExecutor : public MetaJobExecutor {
*
* @return
*/
folly::Future<Status> executeInternal() override;
folly::Future<nebula::cpp2::ErrorCode> executeInternal() override;

/**
* @brief Build a plan to balance leader
Expand Down
15 changes: 7 additions & 8 deletions src/meta/processors/job/MetaJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ nebula::cpp2::ErrorCode MetaJobExecutor::prepare() {
// The skeleton to run the job.
// You should rewrite the executeInternal to trigger the calling.
nebula::cpp2::ErrorCode MetaJobExecutor::execute() {
folly::SemiFuture<Status> future = executeInternal();
auto rc = nebula::cpp2::ErrorCode::SUCCEEDED;
folly::SemiFuture<nebula::cpp2::ErrorCode> future = executeInternal();
future.wait();
if (!future.value().ok()) {
LOG(INFO) << future.value().toString();
rc = nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE;
if (!future.hasValue()) {
LOG(INFO) << "Exception occur when execute job";
return nebula::cpp2::ErrorCode::E_JOB_NOT_FINISHED;
}
return rc;
return future.value();
}

// Stop the job when the user cancel it.
Expand Down Expand Up @@ -60,8 +59,8 @@ nebula::cpp2::ErrorCode MetaJobExecutor::saveSpecialTaskStatus(const cpp2::Repor
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

folly::Future<Status> MetaJobExecutor::executeInternal() {
return Status::OK();
folly::Future<nebula::cpp2::ErrorCode> MetaJobExecutor::executeInternal() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

} // namespace meta
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/MetaJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class MetaJobExecutor : public JobExecutor {
nebula::cpp2::ErrorCode saveSpecialTaskStatus(const cpp2::ReportTaskReq&) override;

protected:
virtual folly::Future<Status> executeInternal();
virtual folly::Future<nebula::cpp2::ErrorCode> executeInternal();

protected:
JobID jobId_{INT_MIN};
Expand Down
8 changes: 4 additions & 4 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::stop() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

folly::Future<Status> ZoneBalanceJobExecutor::executeInternal() {
folly::Future<nebula::cpp2::ErrorCode> ZoneBalanceJobExecutor::executeInternal() {
if (plan_ == nullptr) {
Status status = buildBalancePlan();
if (status != Status::OK()) {
if (status == Status::Balanced()) {
executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
return Status::OK();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
return status;
return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE;
}
}
plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { executorOnFinished_(status); });
plan_->invoke();
return Status::OK();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::updateMeta() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/ZoneBalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
nebula::cpp2::ErrorCode stop() override;

protected:
folly::Future<Status> executeInternal() override;
folly::Future<nebula::cpp2::ErrorCode> executeInternal() override;

/**
* @brief
Expand Down
14 changes: 7 additions & 7 deletions src/meta/test/BalancerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ TEST(BalanceTest, NormalZoneTest) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
auto ret = balancer.executeInternal();
EXPECT_EQ(Status::OK(), ret.value());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value());
balancer.finish();
balancer.lostZones_ = {"5", "6", "7", "8"};
baton.reset();
Expand All @@ -789,7 +789,7 @@ TEST(BalanceTest, NormalZoneTest) {
});
ret = balancer.executeInternal();
baton.wait();
EXPECT_EQ(Status::OK(), ret.value());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value());
verifyBalanceTask(
kv, balancer.jobId_, BalanceTaskStatus::END, BalanceTaskResult::SUCCEEDED, partCount, 12);
}
Expand All @@ -811,7 +811,7 @@ TEST(BalanceTest, NormalDataTest) {
DataBalanceJobExecutor balancer(jd, kv, &client, {});
balancer.spaceInfo_.loadInfo(space, kv);
auto ret = balancer.executeInternal();
EXPECT_EQ(Status::OK(), ret.value());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value());
balancer.finish();
balancer.lostHosts_ = {{"127.0.0.1", 1}, {"127.0.0.1", 8}};
folly::Baton<true, std::atomic> baton;
Expand All @@ -821,7 +821,7 @@ TEST(BalanceTest, NormalDataTest) {
});
ret = balancer.executeInternal();
baton.wait();
EXPECT_EQ(Status::OK(), ret.value());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value());
verifyBalanceTask(
kv, balancer.jobId_, BalanceTaskStatus::END, BalanceTaskResult::SUCCEEDED, partCount, 6);
}
Expand Down Expand Up @@ -856,7 +856,7 @@ TEST(BalanceTest, RecoveryTest) {
});
auto ret = balancer.executeInternal();
baton.wait();
EXPECT_EQ(Status::OK(), ret.value());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value());
std::unordered_map<HostAddr, int32_t> partCount;
verifyBalanceTask(kv,
balancer.jobId_,
Expand Down Expand Up @@ -923,7 +923,7 @@ TEST(BalanceTest, StopPlanTest) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
auto ret = balancer.executeInternal();
EXPECT_EQ(Status::OK(), ret.value());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value());
auto stopRet = balancer.stop();
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, stopRet);
baton.wait();
Expand Down Expand Up @@ -1241,7 +1241,7 @@ TEST(BalanceTest, LeaderBalanceTest) {
LeaderBalanceJobExecutor balancer(
space, testJobId.fetch_add(1, std::memory_order_relaxed), kv, &client, {});
auto ret = balancer.executeInternal();
ASSERT_EQ(Status::OK(), ret.value());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value());
}

TEST(BalanceTest, LeaderBalanceWithZoneTest) {
Expand Down

0 comments on commit d226aae

Please sign in to comment.