From b20964aa73358374eb95704fb627f64d58dc6bd3 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Fri, 22 Jul 2022 09:47:24 +0800 Subject: [PATCH] Job manager related fix (#4446) * make error code friendly when stop a failed job * make meta job use the errorcode * fix balance data deplay mismatch in show jobs and show job id * fix tck Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/clients/meta/MetaClient.cpp | 3 ++ .../executor/admin/SubmitJobExecutor.cpp | 28 ++++++++++++++++++- src/graph/executor/test/JobTest.cpp | 3 +- src/interface/common.thrift | 1 + .../processors/job/DataBalanceJobExecutor.cpp | 8 +++--- .../processors/job/DataBalanceJobExecutor.h | 2 +- src/meta/processors/job/JobManager.cpp | 2 +- .../job/LeaderBalanceJobExecutor.cpp | 17 +++++------ .../processors/job/LeaderBalanceJobExecutor.h | 2 +- src/meta/processors/job/MetaJobExecutor.cpp | 15 +++++----- src/meta/processors/job/MetaJobExecutor.h | 2 +- .../processors/job/ZoneBalanceJobExecutor.cpp | 8 +++--- .../processors/job/ZoneBalanceJobExecutor.h | 2 +- src/meta/test/BalancerTest.cpp | 14 +++++----- tests/tck/job/Job.feature | 3 +- 15 files changed, 69 insertions(+), 41 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 28eaf9b4180..55f45eaa0ac 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -918,6 +918,9 @@ Status MetaClient::handleResponse(const RESP& resp) { return Status::Error("Stop job failure!"); case nebula::cpp2::ErrorCode::E_SAVE_JOB_FAILURE: return Status::Error("Save job failure!"); + case nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE: + return Status::Error( + "Finished job or failed job can not be stopped, please start another job instead"); case nebula::cpp2::ErrorCode::E_BALANCER_FAILURE: return Status::Error("Balance failure!"); case nebula::cpp2::ErrorCode::E_NO_INVALID_BALANCE_PLAN: diff --git a/src/graph/executor/admin/SubmitJobExecutor.cpp b/src/graph/executor/admin/SubmitJobExecutor.cpp index 524c5d8c3f1..fbec9898d69 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.cpp +++ b/src/graph/executor/admin/SubmitJobExecutor.cpp @@ -109,6 +109,10 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( const nebula::meta::cpp2::JobDesc &jd, const std::vector &td) { if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE || jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) { + // The job which executed on meta, aka balance data, is a litte different from others. In order + // to be consistent with other jobs, the task result is set in paras in JobDesc serialized by + // thrift. The reason that we can't use the list of TaskDesc is that the state of balance task + // is saved in BalanceTask, which is different from TaskDesc. nebula::DataSet v({"Job Id(spaceId:partId)", "Command(src->dst)", "Status", @@ -120,7 +124,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0, invalid = 0; v.emplace_back(Row({jd.get_job_id(), - apache::thrift::util::enumNameSafe(meta::cpp2::JobType::DATA_BALANCE), + apache::thrift::util::enumNameSafe(meta::cpp2::JobType::ZONE_BALANCE), apache::thrift::util::enumNameSafe(jd.get_status()), convertJobTimestampToDateTime(jd.get_start_time()).toString(), convertJobTimestampToDateTime(jd.get_stop_time()).toString(), @@ -167,8 +171,24 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( convertJobTimestampToDateTime(jd.get_stop_time()), apache::thrift::util::enumNameSafe(jd.get_code()), })); + // As for normal type of job, there will only be one of the three state: running, succeeded or + // failed. + uint32_t total = td.size(), succeeded = 0, failed = 0, inProgress = 0; // tasks desc for (const auto &taskDesc : td) { + switch (taskDesc.get_status()) { + case meta::cpp2::JobStatus::RUNNING: + ++inProgress; + break; + case meta::cpp2::JobStatus::FAILED: + ++failed; + break; + case meta::cpp2::JobStatus::FINISHED: + ++succeeded; + break; + default: + break; + } v.emplace_back(nebula::Row({ taskDesc.get_task_id(), taskDesc.get_host().host, @@ -178,6 +198,12 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( apache::thrift::util::enumNameSafe(taskDesc.get_code()), })); } + v.emplace_back(Row({folly::sformat("Total:{}", total), + folly::sformat("Succeeded:{}", succeeded), + folly::sformat("Failed:{}", failed), + folly::sformat("In Progress:{}", inProgress), + (""), + ("")})); return v; } } diff --git a/src/graph/executor/test/JobTest.cpp b/src/graph/executor/test/JobTest.cpp index 46e83455bd5..c4e6bd46e52 100644 --- a/src/graph/executor/test/JobTest.cpp +++ b/src/graph/executor/test/JobTest.cpp @@ -36,7 +36,8 @@ TEST_F(JobTest, JobFinishTime) { auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW, std::move(resp)); EXPECT_TRUE(status.ok()); auto result = std::move(status).value(); - EXPECT_EQ(result.rows.size(), 2); + // One line for job, one line for the task, one line for the count + EXPECT_EQ(result.rows.size(), 3); EXPECT_EQ(result.rows[0][3], Value(time::TimeConversion::unixSecondsToDateTime(123))); EXPECT_EQ(result.rows[0][4], Value::kEmpty); EXPECT_EQ(result.rows[1][3], Value(time::TimeConversion::unixSecondsToDateTime(456))); diff --git a/src/interface/common.thrift b/src/interface/common.thrift index c13929d4db3..1e95be659b8 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -389,6 +389,7 @@ enum ErrorCode { E_TASK_REPORT_OUT_DATE = -2049, // Task report failed E_JOB_NOT_IN_SPACE = -2050, // The current task is not in the graph space E_JOB_NEED_RECOVER = -2051, // The current task needs to be resumed + E_JOB_NOT_STOPPABLE = -2052, // Failed or finished job could not be stopped E_INVALID_JOB = -2065, // Invalid task // Backup Failure diff --git a/src/meta/processors/job/DataBalanceJobExecutor.cpp b/src/meta/processors/job/DataBalanceJobExecutor.cpp index 46886d87cc3..d1a6286dba7 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.cpp +++ b/src/meta/processors/job/DataBalanceJobExecutor.cpp @@ -15,15 +15,15 @@ namespace nebula { namespace meta { -folly::Future DataBalanceJobExecutor::executeInternal() { +folly::Future DataBalanceJobExecutor::executeInternal() { if (plan_ == nullptr) { Status status = buildBalancePlan(); if (status != Status::OK()) { if (status == Status::Balanced()) { executorOnFinished_(meta::cpp2::JobStatus::FINISHED); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } - return status; + return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } } plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { @@ -35,7 +35,7 @@ folly::Future DataBalanceJobExecutor::executeInternal() { executorOnFinished_(status); }); plan_->invoke(); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } Status DataBalanceJobExecutor::buildBalancePlan() { diff --git a/src/meta/processors/job/DataBalanceJobExecutor.h b/src/meta/processors/job/DataBalanceJobExecutor.h index c2dbad2ed6e..0bc1810a162 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.h +++ b/src/meta/processors/job/DataBalanceJobExecutor.h @@ -49,7 +49,7 @@ class DataBalanceJobExecutor : public BalanceJobExecutor { * * @return */ - folly::Future executeInternal() override; + folly::Future executeInternal() override; /** * @brief Build a balance plan, which balance data in each zone diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 99928e99c42..df2f8956212 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -259,7 +259,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished( if (!optJobDesc.setStatus(jobStatus)) { // job already been set as finished, failed or stopped - return nebula::cpp2::ErrorCode::E_SAVE_JOB_FAILURE; + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; } // If the job is marked as FAILED, one of the following will be triggered diff --git a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp index 99e25f6a63e..386b4fcfab8 100644 --- a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp +++ b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp @@ -227,17 +227,14 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::finish(bool) { return nebula::cpp2::ErrorCode::SUCCEEDED; } -folly::Future LeaderBalanceJobExecutor::executeInternal() { - folly::Promise promise; +folly::Future LeaderBalanceJobExecutor::executeInternal() { + folly::Promise promise; auto future = promise.getFuture(); // Space ID, Replica Factor and Dependent On Group std::vector> spaces; auto ret = getAllSpaces(spaces); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - ret = nebula::cpp2::ErrorCode::E_STORE_FAILURE; - } - return Status::Error("Can't get spaces"); + return ret; } bool expected = false; @@ -246,7 +243,7 @@ folly::Future LeaderBalanceJobExecutor::executeInternal() { auto status = adminClient_->getLeaderDist(hostLeaderMap_.get()).get(); if (!status.ok() || hostLeaderMap_->empty()) { inLeaderBalance_ = false; - return Status::Error("Get leader distribution failed"); + return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } std::vector> futures; @@ -286,13 +283,13 @@ folly::Future LeaderBalanceJobExecutor::executeInternal() { inLeaderBalance_ = false; if (failed != 0) { - return Status::Error("partiton failed to transfer leader"); + LOG(INFO) << folly::stringPrintf("%d partitons failed to transfer leader", failed); } executorOnFinished_(meta::cpp2::JobStatus::FINISHED); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } executorOnFinished_(meta::cpp2::JobStatus::FINISHED); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } ErrorOr LeaderBalanceJobExecutor::buildLeaderBalancePlan( diff --git a/src/meta/processors/job/LeaderBalanceJobExecutor.h b/src/meta/processors/job/LeaderBalanceJobExecutor.h index 3bd42b2ee65..5d87fb551c2 100644 --- a/src/meta/processors/job/LeaderBalanceJobExecutor.h +++ b/src/meta/processors/job/LeaderBalanceJobExecutor.h @@ -42,7 +42,7 @@ class LeaderBalanceJobExecutor : public MetaJobExecutor { * * @return */ - folly::Future executeInternal() override; + folly::Future executeInternal() override; /** * @brief Build a plan to balance leader diff --git a/src/meta/processors/job/MetaJobExecutor.cpp b/src/meta/processors/job/MetaJobExecutor.cpp index e4b02f81f47..fd355f13a74 100644 --- a/src/meta/processors/job/MetaJobExecutor.cpp +++ b/src/meta/processors/job/MetaJobExecutor.cpp @@ -24,14 +24,13 @@ nebula::cpp2::ErrorCode MetaJobExecutor::prepare() { // The skeleton to run the job. // You should rewrite the executeInternal to trigger the calling. nebula::cpp2::ErrorCode MetaJobExecutor::execute() { - folly::SemiFuture future = executeInternal(); - auto rc = nebula::cpp2::ErrorCode::SUCCEEDED; + folly::SemiFuture future = executeInternal(); future.wait(); - if (!future.value().ok()) { - LOG(INFO) << future.value().toString(); - rc = nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE; + if (!future.hasValue()) { + LOG(INFO) << "Exception occur when execute job"; + return nebula::cpp2::ErrorCode::E_JOB_NOT_FINISHED; } - return rc; + return future.value(); } // Stop the job when the user cancel it. @@ -60,8 +59,8 @@ nebula::cpp2::ErrorCode MetaJobExecutor::saveSpecialTaskStatus(const cpp2::Repor return nebula::cpp2::ErrorCode::SUCCEEDED; } -folly::Future MetaJobExecutor::executeInternal() { - return Status::OK(); +folly::Future MetaJobExecutor::executeInternal() { + return nebula::cpp2::ErrorCode::SUCCEEDED; } } // namespace meta diff --git a/src/meta/processors/job/MetaJobExecutor.h b/src/meta/processors/job/MetaJobExecutor.h index 10e1bcc56c0..22a318fc341 100644 --- a/src/meta/processors/job/MetaJobExecutor.h +++ b/src/meta/processors/job/MetaJobExecutor.h @@ -71,7 +71,7 @@ class MetaJobExecutor : public JobExecutor { nebula::cpp2::ErrorCode saveSpecialTaskStatus(const cpp2::ReportTaskReq&) override; protected: - virtual folly::Future executeInternal(); + virtual folly::Future executeInternal(); protected: JobID jobId_{INT_MIN}; diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp index 9f44e510b2d..f0eb55ee2d8 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp @@ -45,20 +45,20 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::stop() { return nebula::cpp2::ErrorCode::SUCCEEDED; } -folly::Future ZoneBalanceJobExecutor::executeInternal() { +folly::Future ZoneBalanceJobExecutor::executeInternal() { if (plan_ == nullptr) { Status status = buildBalancePlan(); if (status != Status::OK()) { if (status == Status::Balanced()) { executorOnFinished_(meta::cpp2::JobStatus::FINISHED); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } - return status; + return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } } plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { executorOnFinished_(status); }); plan_->invoke(); - return Status::OK(); + return nebula::cpp2::ErrorCode::SUCCEEDED; } nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::updateMeta() { diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.h b/src/meta/processors/job/ZoneBalanceJobExecutor.h index 20658ea98d7..dbe8050f426 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.h +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.h @@ -33,7 +33,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor { nebula::cpp2::ErrorCode stop() override; protected: - folly::Future executeInternal() override; + folly::Future executeInternal() override; /** * @brief diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 07b4c08b2e5..006a71e5c22 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -779,7 +779,7 @@ TEST(BalanceTest, NormalZoneTest) { return nebula::cpp2::ErrorCode::SUCCEEDED; }); auto ret = balancer.executeInternal(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); balancer.finish(); balancer.lostZones_ = {"5", "6", "7", "8"}; baton.reset(); @@ -789,7 +789,7 @@ TEST(BalanceTest, NormalZoneTest) { }); ret = balancer.executeInternal(); baton.wait(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); verifyBalanceTask( kv, balancer.jobId_, BalanceTaskStatus::END, BalanceTaskResult::SUCCEEDED, partCount, 12); } @@ -811,7 +811,7 @@ TEST(BalanceTest, NormalDataTest) { DataBalanceJobExecutor balancer(jd, kv, &client, {}); balancer.spaceInfo_.loadInfo(space, kv); auto ret = balancer.executeInternal(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); balancer.finish(); balancer.lostHosts_ = {{"127.0.0.1", 1}, {"127.0.0.1", 8}}; folly::Baton baton; @@ -821,7 +821,7 @@ TEST(BalanceTest, NormalDataTest) { }); ret = balancer.executeInternal(); baton.wait(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); verifyBalanceTask( kv, balancer.jobId_, BalanceTaskStatus::END, BalanceTaskResult::SUCCEEDED, partCount, 6); } @@ -856,7 +856,7 @@ TEST(BalanceTest, RecoveryTest) { }); auto ret = balancer.executeInternal(); baton.wait(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); std::unordered_map partCount; verifyBalanceTask(kv, balancer.jobId_, @@ -923,7 +923,7 @@ TEST(BalanceTest, StopPlanTest) { return nebula::cpp2::ErrorCode::SUCCEEDED; }); auto ret = balancer.executeInternal(); - EXPECT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); auto stopRet = balancer.stop(); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, stopRet); baton.wait(); @@ -1241,7 +1241,7 @@ TEST(BalanceTest, LeaderBalanceTest) { LeaderBalanceJobExecutor balancer( space, testJobId.fetch_add(1, std::memory_order_relaxed), kv, &client, {}); auto ret = balancer.executeInternal(); - ASSERT_EQ(Status::OK(), ret.value()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret.value()); } TEST(BalanceTest, LeaderBalanceWithZoneTest) { diff --git a/tests/tck/job/Job.feature b/tests/tck/job/Job.feature index c3043713395..869ccc566f3 100644 --- a/tests/tck/job/Job.feature +++ b/tests/tck/job/Job.feature @@ -113,11 +113,12 @@ Feature: Submit job space requirements | Job Id(TaskId) | Command(Dest) | Status | Start Time | Stop Time | Error Code | | /\d+/ | "STATS" | "FINISHED" | /\w+/ | /\w+/ | "SUCCEEDED" | | /\d+/ | /\w+/ | "FINISHED" | /\w+/ | /\w+/ | "SUCCEEDED" | + | /\w+/ | /\w+/ | /\w+/ | /\w+/ | "" | "" | When executing query, fill replace holders with element index of 0 in job_id: """ STOP JOB {}; """ - Then an ExecutionError should be raised at runtime: Save job failure! + Then an ExecutionError should be raised at runtime: Finished job or failed job can not be stopped, please start another job instead Scenario: Submit and show jobs in other space Given create a space with following options: