diff --git a/src/graph/executor/admin/SubmitJobExecutor.cpp b/src/graph/executor/admin/SubmitJobExecutor.cpp index 524c5d8c3f1..04d9dd23e4a 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.cpp +++ b/src/graph/executor/admin/SubmitJobExecutor.cpp @@ -109,6 +109,10 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( const nebula::meta::cpp2::JobDesc &jd, const std::vector &td) { if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE || jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) { + // The job which executed on meta, aka balance data, is a litte different from others. In order + // to be consistent with other jobs, the task result is set in paras in JobDesc serialized by + // thrift. The reason that we can't use the list of TaskDesc is that the state of balance task + // is saved in BalanceTask, which is different from TaskDesc. nebula::DataSet v({"Job Id(spaceId:partId)", "Command(src->dst)", "Status", @@ -167,8 +171,24 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( convertJobTimestampToDateTime(jd.get_stop_time()), apache::thrift::util::enumNameSafe(jd.get_code()), })); + // As for normal type of job, there will only be one of the three state: running, succeeded or + // failed. + uint32_t total = td.size(), succeeded = 0, failed = 0, inProgress = 0; // tasks desc for (const auto &taskDesc : td) { + switch (taskDesc.get_status()) { + case meta::cpp2::JobStatus::RUNNING: + ++inProgress; + break; + case meta::cpp2::JobStatus::FAILED: + ++failed; + break; + case meta::cpp2::JobStatus::FINISHED: + ++succeeded; + break; + default: + break; + } v.emplace_back(nebula::Row({ taskDesc.get_task_id(), taskDesc.get_host().host, @@ -178,6 +198,12 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( apache::thrift::util::enumNameSafe(taskDesc.get_code()), })); } + v.emplace_back(Row({folly::sformat("Total:{}", total), + folly::sformat("Succeeded:{}", succeeded), + folly::sformat("Failed:{}", failed), + folly::sformat("In Progress:{}", inProgress), + (""), + ("")})); return v; } } diff --git a/src/graph/executor/test/JobTest.cpp b/src/graph/executor/test/JobTest.cpp index 46e83455bd5..c4e6bd46e52 100644 --- a/src/graph/executor/test/JobTest.cpp +++ b/src/graph/executor/test/JobTest.cpp @@ -36,7 +36,8 @@ TEST_F(JobTest, JobFinishTime) { auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW, std::move(resp)); EXPECT_TRUE(status.ok()); auto result = std::move(status).value(); - EXPECT_EQ(result.rows.size(), 2); + // One line for job, one line for the task, one line for the count + EXPECT_EQ(result.rows.size(), 3); EXPECT_EQ(result.rows[0][3], Value(time::TimeConversion::unixSecondsToDateTime(123))); EXPECT_EQ(result.rows[0][4], Value::kEmpty); EXPECT_EQ(result.rows[1][3], Value(time::TimeConversion::unixSecondsToDateTime(456))); diff --git a/src/meta/processors/job/DataBalanceJobExecutor.cpp b/src/meta/processors/job/DataBalanceJobExecutor.cpp index 46886d87cc3..d1a6286dba7 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.cpp +++ b/src/meta/processors/job/DataBalanceJobExecutor.cpp @@ -15,15 +15,15 @@ namespace nebula { namespace meta { -folly::Future DataBalanceJobExecutor::executeInternal() { +folly::Future DataBalanceJobExecutor::executeInternal() { if (plan_ == nullptr) { Status status = buildBalancePlan(); if (status != Status::OK()) { if (status == Status::Balanced()) { executorOnFinished_(meta::cpp2::JobStatus::FINISHED); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } - return status; + return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } } plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { @@ -35,7 +35,7 @@ folly::Future DataBalanceJobExecutor::executeInternal() { executorOnFinished_(status); }); plan_->invoke(); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } Status DataBalanceJobExecutor::buildBalancePlan() { diff --git a/src/meta/processors/job/DataBalanceJobExecutor.h b/src/meta/processors/job/DataBalanceJobExecutor.h index c2dbad2ed6e..0bc1810a162 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.h +++ b/src/meta/processors/job/DataBalanceJobExecutor.h @@ -49,7 +49,7 @@ class DataBalanceJobExecutor : public BalanceJobExecutor { * * @return */ - folly::Future executeInternal() override; + folly::Future executeInternal() override; /** * @brief Build a balance plan, which balance data in each zone diff --git a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp index 99e25f6a63e..386b4fcfab8 100644 --- a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp +++ b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp @@ -227,17 +227,14 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::finish(bool) { return nebula::cpp2::ErrorCode::SUCCEEDED; } -folly::Future LeaderBalanceJobExecutor::executeInternal() { - folly::Promise promise; +folly::Future LeaderBalanceJobExecutor::executeInternal() { + folly::Promise promise; auto future = promise.getFuture(); // Space ID, Replica Factor and Dependent On Group std::vector> spaces; auto ret = getAllSpaces(spaces); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - ret = nebula::cpp2::ErrorCode::E_STORE_FAILURE; - } - return Status::Error("Can't get spaces"); + return ret; } bool expected = false; @@ -246,7 +243,7 @@ folly::Future LeaderBalanceJobExecutor::executeInternal() { auto status = adminClient_->getLeaderDist(hostLeaderMap_.get()).get(); if (!status.ok() || hostLeaderMap_->empty()) { inLeaderBalance_ = false; - return Status::Error("Get leader distribution failed"); + return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } std::vector> futures; @@ -286,13 +283,13 @@ folly::Future LeaderBalanceJobExecutor::executeInternal() { inLeaderBalance_ = false; if (failed != 0) { - return Status::Error("partiton failed to transfer leader"); + LOG(INFO) << folly::stringPrintf("%d partitons failed to transfer leader", failed); } executorOnFinished_(meta::cpp2::JobStatus::FINISHED); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } executorOnFinished_(meta::cpp2::JobStatus::FINISHED); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } ErrorOr LeaderBalanceJobExecutor::buildLeaderBalancePlan( diff --git a/src/meta/processors/job/LeaderBalanceJobExecutor.h b/src/meta/processors/job/LeaderBalanceJobExecutor.h index 3bd42b2ee65..5d87fb551c2 100644 --- a/src/meta/processors/job/LeaderBalanceJobExecutor.h +++ b/src/meta/processors/job/LeaderBalanceJobExecutor.h @@ -42,7 +42,7 @@ class LeaderBalanceJobExecutor : public MetaJobExecutor { * * @return */ - folly::Future executeInternal() override; + folly::Future executeInternal() override; /** * @brief Build a plan to balance leader diff --git a/src/meta/processors/job/MetaJobExecutor.cpp b/src/meta/processors/job/MetaJobExecutor.cpp index e4b02f81f47..fd355f13a74 100644 --- a/src/meta/processors/job/MetaJobExecutor.cpp +++ b/src/meta/processors/job/MetaJobExecutor.cpp @@ -24,14 +24,13 @@ nebula::cpp2::ErrorCode MetaJobExecutor::prepare() { // The skeleton to run the job. // You should rewrite the executeInternal to trigger the calling. nebula::cpp2::ErrorCode MetaJobExecutor::execute() { - folly::SemiFuture future = executeInternal(); - auto rc = nebula::cpp2::ErrorCode::SUCCEEDED; + folly::SemiFuture future = executeInternal(); future.wait(); - if (!future.value().ok()) { - LOG(INFO) << future.value().toString(); - rc = nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE; + if (!future.hasValue()) { + LOG(INFO) << "Exception occur when execute job"; + return nebula::cpp2::ErrorCode::E_JOB_NOT_FINISHED; } - return rc; + return future.value(); } // Stop the job when the user cancel it. @@ -60,8 +59,8 @@ nebula::cpp2::ErrorCode MetaJobExecutor::saveSpecialTaskStatus(const cpp2::Repor return nebula::cpp2::ErrorCode::SUCCEEDED; } -folly::Future MetaJobExecutor::executeInternal() { - return Status::OK(); +folly::Future MetaJobExecutor::executeInternal() { + return nebula::cpp2::ErrorCode::SUCCEEDED; } } // namespace meta diff --git a/src/meta/processors/job/MetaJobExecutor.h b/src/meta/processors/job/MetaJobExecutor.h index 10e1bcc56c0..22a318fc341 100644 --- a/src/meta/processors/job/MetaJobExecutor.h +++ b/src/meta/processors/job/MetaJobExecutor.h @@ -71,7 +71,7 @@ class MetaJobExecutor : public JobExecutor { nebula::cpp2::ErrorCode saveSpecialTaskStatus(const cpp2::ReportTaskReq&) override; protected: - virtual folly::Future executeInternal(); + virtual folly::Future executeInternal(); protected: JobID jobId_{INT_MIN}; diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp index 9f44e510b2d..f0eb55ee2d8 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp @@ -45,20 +45,20 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::stop() { return nebula::cpp2::ErrorCode::SUCCEEDED; } -folly::Future ZoneBalanceJobExecutor::executeInternal() { +folly::Future ZoneBalanceJobExecutor::executeInternal() { if (plan_ == nullptr) { Status status = buildBalancePlan(); if (status != Status::OK()) { if (status == Status::Balanced()) { executorOnFinished_(meta::cpp2::JobStatus::FINISHED); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } - return status; + return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } } plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { executorOnFinished_(status); }); plan_->invoke(); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::updateMeta() { diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.h b/src/meta/processors/job/ZoneBalanceJobExecutor.h index 20658ea98d7..dbe8050f426 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.h +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.h @@ -33,7 +33,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor { nebula::cpp2::ErrorCode stop() override; protected: - folly::Future executeInternal() override; + folly::Future executeInternal() override; /** * @brief diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 07b4c08b2e5..006a71e5c22 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -779,7 +779,7 @@ TEST(BalanceTest, NormalZoneTest) { return nebula::cpp2::ErrorCode::SUCCEEDED; }); auto ret = balancer.executeInternal(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); balancer.finish(); balancer.lostZones_ = {"5", "6", "7", "8"}; baton.reset(); @@ -789,7 +789,7 @@ TEST(BalanceTest, NormalZoneTest) { }); ret = balancer.executeInternal(); baton.wait(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); verifyBalanceTask( kv, balancer.jobId_, BalanceTaskStatus::END, BalanceTaskResult::SUCCEEDED, partCount, 12); } @@ -811,7 +811,7 @@ TEST(BalanceTest, NormalDataTest) { DataBalanceJobExecutor balancer(jd, kv, &client, {}); balancer.spaceInfo_.loadInfo(space, kv); auto ret = balancer.executeInternal(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); balancer.finish(); balancer.lostHosts_ = {{"127.0.0.1", 1}, {"127.0.0.1", 8}}; folly::Baton baton; @@ -821,7 +821,7 @@ TEST(BalanceTest, NormalDataTest) { }); ret = balancer.executeInternal(); baton.wait(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); verifyBalanceTask( kv, balancer.jobId_, BalanceTaskStatus::END, BalanceTaskResult::SUCCEEDED, partCount, 6); } @@ -856,7 +856,7 @@ TEST(BalanceTest, RecoveryTest) { }); auto ret = balancer.executeInternal(); baton.wait(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); std::unordered_map partCount; verifyBalanceTask(kv, balancer.jobId_, @@ -923,7 +923,7 @@ TEST(BalanceTest, StopPlanTest) { return nebula::cpp2::ErrorCode::SUCCEEDED; }); auto ret = balancer.executeInternal(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); auto stopRet = balancer.stop(); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, stopRet); baton.wait(); @@ -1241,7 +1241,7 @@ TEST(BalanceTest, LeaderBalanceTest) { LeaderBalanceJobExecutor balancer( space, testJobId.fetch_add(1, std::memory_order_relaxed), kv, &client, {}); auto ret = balancer.executeInternal(); - ASSERT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); } TEST(BalanceTest, LeaderBalanceWithZoneTest) {