From 0a249e3a794980bc63b17c62734dbf9653ab57b5 Mon Sep 17 00:00:00 2001 From: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Date: Thu, 3 Mar 2022 15:06:04 +0800 Subject: [PATCH] show job support errorcode --- src/common/utils/MetaKeyUtils.cpp | 34 ++++++++---- src/common/utils/MetaKeyUtils.h | 24 +++++--- .../executor/admin/SubmitJobExecutor.cpp | 19 +++++-- src/interface/meta.thrift | 4 +- src/meta/processors/job/JobDescription.cpp | 18 ++++-- src/meta/processors/job/JobDescription.h | 12 +++- src/meta/processors/job/JobManager.cpp | 45 ++++++++++++--- .../processors/job/RebuildJobExecutor.cpp | 2 +- .../processors/job/StorageJobExecutor.cpp | 7 ++- src/meta/processors/job/TaskDescription.cpp | 2 + src/meta/processors/job/TaskDescription.h | 12 +++- src/meta/test/BalancerTest.cpp | 8 ++- src/meta/test/GetStatsTest.cpp | 18 ++++-- src/meta/test/JobManagerTest.cpp | 55 +++++++++++++------ tests/tck/job/Job.feature | 6 +- 15 files changed, 196 insertions(+), 70 deletions(-) diff --git a/src/common/utils/MetaKeyUtils.cpp b/src/common/utils/MetaKeyUtils.cpp index 95ed030f2d2..7d9882c9acf 100644 --- a/src/common/utils/MetaKeyUtils.cpp +++ b/src/common/utils/MetaKeyUtils.cpp @@ -1399,7 +1399,8 @@ std::string MetaKeyUtils::jobVal(const meta::cpp2::JobType& type, std::vector paras, meta::cpp2::JobStatus jobStatus, int64_t startTime, - int64_t stopTime) { + int64_t stopTime, + nebula::cpp2::ErrorCode errCode) { std::string val; val.reserve(256); val.append(reinterpret_cast(&type), sizeof(meta::cpp2::JobType)); @@ -1412,11 +1413,17 @@ std::string MetaKeyUtils::jobVal(const meta::cpp2::JobType& type, } val.append(reinterpret_cast(&jobStatus), sizeof(meta::cpp2::JobStatus)) .append(reinterpret_cast(&startTime), sizeof(int64_t)) - .append(reinterpret_cast(&stopTime), sizeof(int64_t)); + .append(reinterpret_cast(&stopTime), sizeof(int64_t)) + .append(reinterpret_cast(&errCode), sizeof(nebula::cpp2::ErrorCode)); return val; } -std::tuple, meta::cpp2::JobStatus, int64_t, int64_t> +std::tuple, + meta::cpp2::JobStatus, + int64_t, + int64_t, + nebula::cpp2::ErrorCode> MetaKeyUtils::parseJobVal(folly::StringPiece rawVal) { CHECK_GE(rawVal.size(), sizeof(meta::cpp2::JobType) + sizeof(size_t) + sizeof(meta::cpp2::JobStatus) + @@ -1439,7 +1446,9 @@ MetaKeyUtils::parseJobVal(folly::StringPiece rawVal) { auto tStart = *reinterpret_cast(rawVal.data() + offset); offset += sizeof(int64_t); auto tStop = *reinterpret_cast(rawVal.data() + offset); - return std::make_tuple(type, paras, status, tStart, tStop); + offset += sizeof(int64_t); + auto errCode = *reinterpret_cast(rawVal.data() + offset); + return std::make_tuple(type, paras, status, tStart, tStop, errCode); } std::pair MetaKeyUtils::parseJobKey(folly::StringPiece key) { @@ -1473,20 +1482,23 @@ std::tuple MetaKeyUtils::parseTaskKey(folly::String std::string MetaKeyUtils::taskVal(HostAddr host, meta::cpp2::JobStatus jobStatus, int64_t startTime, - int64_t stopTime) { + int64_t stopTime, + nebula::cpp2::ErrorCode errCode) { std::string val; val.reserve(128); val.append(MetaKeyUtils::serializeHostAddr(host)) .append(reinterpret_cast(&jobStatus), sizeof(meta::cpp2::JobStatus)) .append(reinterpret_cast(&startTime), sizeof(int64_t)) - .append(reinterpret_cast(&stopTime), sizeof(int64_t)); + .append(reinterpret_cast(&stopTime), sizeof(int64_t)) + .append(reinterpret_cast(&errCode), sizeof(nebula::cpp2::ErrorCode)); return val; } -std::tuple MetaKeyUtils::parseTaskVal( - folly::StringPiece rawVal) { +std::tuple +MetaKeyUtils::parseTaskVal(folly::StringPiece rawVal) { CHECK_GE(rawVal.size(), - sizeof(size_t) + sizeof(Port) + sizeof(meta::cpp2::JobStatus) + sizeof(int64_t) * 2); + sizeof(size_t) + sizeof(Port) + sizeof(meta::cpp2::JobStatus) + sizeof(int64_t) * 2 + + sizeof(nebula::cpp2::ErrorCode)); size_t offset = 0; HostAddr host = MetaKeyUtils::deserializeHostAddr(rawVal); offset += sizeof(size_t); @@ -1498,7 +1510,9 @@ std::tuple MetaKeyUtils::pars auto tStart = *reinterpret_cast(rawVal.data() + offset); offset += sizeof(int64_t); auto tStop = *reinterpret_cast(rawVal.data() + offset); - return std::make_tuple(host, status, tStart, tStop); + offset += sizeof(int64_t); + auto errCode = *reinterpret_cast(rawVal.data() + offset); + return std::make_tuple(host, status, tStart, tStop, errCode); } } // namespace nebula diff --git a/src/common/utils/MetaKeyUtils.h b/src/common/utils/MetaKeyUtils.h index c1dde6ae5fc..7ad7c2c4530 100644 --- a/src/common/utils/MetaKeyUtils.h +++ b/src/common/utils/MetaKeyUtils.h @@ -442,14 +442,19 @@ class MetaKeyUtils final { std::vector paras, meta::cpp2::JobStatus jobStatus, int64_t startTime, - int64_t stopTime); + int64_t stopTime, + nebula::cpp2::ErrorCode errCode); /** * @brief Decode val from kvstore, return - * {jobType, paras, status, start time, stop time} + * {jobType, paras, status, start time, stop time, error code} */ - static std:: - tuple, meta::cpp2::JobStatus, int64_t, int64_t> - parseJobVal(folly::StringPiece rawVal); + static std::tuple, + meta::cpp2::JobStatus, + int64_t, + int64_t, + nebula::cpp2::ErrorCode> + parseJobVal(folly::StringPiece rawVal); static std::pair parseJobKey(folly::StringPiece key); @@ -464,14 +469,15 @@ class MetaKeyUtils final { static std::string taskVal(HostAddr host, meta::cpp2::JobStatus jobStatus, int64_t startTime, - int64_t stopTime); + int64_t stopTime, + nebula::cpp2::ErrorCode errCode); /** * @brief Decode task val,it should be - * {host, status, start time, stop time} + * {host, status, start time, stop time, error code} */ - static std::tuple parseTaskVal( - folly::StringPiece rawVal); + static std::tuple + parseTaskVal(folly::StringPiece rawVal); }; } // namespace nebula diff --git a/src/graph/executor/admin/SubmitJobExecutor.cpp b/src/graph/executor/admin/SubmitJobExecutor.cpp index f61ce10e58c..046c1d7096a 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.cpp +++ b/src/graph/executor/admin/SubmitJobExecutor.cpp @@ -112,8 +112,12 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( const nebula::meta::cpp2::JobDesc &jd, const std::vector &td) { if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE || jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) { - nebula::DataSet v( - {"Job Id(spaceId:partId)", "Command(src->dst)", "Status", "Start Time", "Stop Time"}); + nebula::DataSet v({"Job Id(spaceId:partId)", + "Command(src->dst)", + "Status", + "Start Time", + "Stop Time", + "Error Code"}); const auto ¶s = jd.get_paras(); size_t index = std::stoul(paras.back()); uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0, @@ -122,7 +126,8 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( apache::thrift::util::enumNameSafe(jd.get_type()), apache::thrift::util::enumNameSafe(jd.get_status()), convertJobTimestampToDateTime(jd.get_start_time()).toString(), - convertJobTimestampToDateTime(jd.get_stop_time()).toString()})); + convertJobTimestampToDateTime(jd.get_stop_time()).toString(), + apache::thrift::util::enumNameSafe(jd.get_code())})); for (size_t i = index; i < paras.size() - 1; i++) { meta::cpp2::BalanceTask tsk; apache::thrift::CompactSerializer::deserialize(paras[i], tsk); @@ -144,7 +149,8 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( std::move(tsk).get_command(), apache::thrift::util::enumNameSafe(tsk.get_result()), convertJobTimestampToDateTime(std::move(tsk).get_start_time()), - convertJobTimestampToDateTime(std::move(tsk).get_stop_time())})); + convertJobTimestampToDateTime(std::move(tsk).get_stop_time()), + apache::thrift::util::enumNameSafe(jd.get_code())})); } v.emplace_back(Row({folly::sformat("Total:{}", total), folly::sformat("Succeeded:{}", succeeded), @@ -153,13 +159,15 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( folly::sformat("Invalid:{}", invalid)})); return v; } else { - nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"}); + nebula::DataSet v( + {"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time", "Error Code"}); v.emplace_back(nebula::Row({ jd.get_job_id(), apache::thrift::util::enumNameSafe(jd.get_type()), apache::thrift::util::enumNameSafe(jd.get_status()), convertJobTimestampToDateTime(jd.get_start_time()), convertJobTimestampToDateTime(jd.get_stop_time()), + apache::thrift::util::enumNameSafe(jd.get_code()), })); // tasks desc for (const auto &taskDesc : td) { @@ -169,6 +177,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( apache::thrift::util::enumNameSafe(taskDesc.get_status()), convertJobTimestampToDateTime(taskDesc.get_start_time()), convertJobTimestampToDateTime(taskDesc.get_stop_time()), + apache::thrift::util::enumNameSafe(taskDesc.get_code()), })); } return v; diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 8a45688bde4..08f5f6c40c8 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -267,6 +267,7 @@ struct JobDesc { 5: JobStatus status, 6: i64 start_time, 7: i64 stop_time, + 8: common.ErrorCode code, } struct TaskDesc { @@ -277,6 +278,7 @@ struct TaskDesc { 5: JobStatus status, 6: i64 start_time, 7: i64 stop_time, + 8: common.ErrorCode code, } struct AdminJobResult { @@ -517,7 +519,7 @@ struct GetPartsAllocResp { // get workerid for snowflake struct GetWorkerIdReq { - 1: binary host, + 1: binary host, } struct GetWorkerIdResp { diff --git a/src/meta/processors/job/JobDescription.cpp b/src/meta/processors/job/JobDescription.cpp index 4e9d2cedd67..01de0a2de28 100644 --- a/src/meta/processors/job/JobDescription.cpp +++ b/src/meta/processors/job/JobDescription.cpp @@ -26,14 +26,16 @@ JobDescription::JobDescription(GraphSpaceID space, std::vector paras, Status status, int64_t startTime, - int64_t stopTime) + int64_t stopTime, + nebula::cpp2::ErrorCode errCode) : space_(space), jobId_(jobId), type_(type), paras_(std::move(paras)), status_(status), startTime_(startTime), - stopTime_(stopTime) {} + stopTime_(stopTime), + errCode_(errCode) {} ErrorOr JobDescription::makeJobDescription( folly::StringPiece rawkey, folly::StringPiece rawval) { @@ -52,8 +54,15 @@ ErrorOr JobDescription::makeJobDescript auto status = std::get<2>(tup); auto startTime = std::get<3>(tup); auto stopTime = std::get<4>(tup); - return JobDescription( - spaceIdAndJob.first, spaceIdAndJob.second, type, paras, status, startTime, stopTime); + auto errCode = std::get<5>(tup); + return JobDescription(spaceIdAndJob.first, + spaceIdAndJob.second, + type, + paras, + status, + startTime, + stopTime, + errCode); } catch (std::exception& ex) { LOG(INFO) << ex.what(); } @@ -69,6 +78,7 @@ cpp2::JobDesc JobDescription::toJobDesc() { ret.status_ref() = status_; ret.start_time_ref() = startTime_; ret.stop_time_ref() = stopTime_; + ret.code_ref() = errCode_; return ret; } diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index 80bccc76ea4..3ae3ffa786c 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -29,7 +29,8 @@ class JobDescription { std::vector paras = {}, Status status = Status::QUEUE, int64_t startTime = 0, - int64_t stopTime = 0); + int64_t stopTime = 0, + nebula::cpp2::ErrorCode errCode = nebula::cpp2::ErrorCode::E_UNKNOWN); /** * @brief Return the JobDescription if both key & val is valid @@ -119,6 +120,14 @@ class JobDescription { return stopTime_; } + void setErrorCode(nebula::cpp2::ErrorCode errCode) { + errCode_ = errCode; + } + + nebula::cpp2::ErrorCode getErrorCode() { + return errCode_; + } + /** * @brief * Get a existed job from kvstore, return folly::none if there isn't @@ -167,6 +176,7 @@ class JobDescription { Status status_; int64_t startTime_; int64_t stopTime_; + nebula::cpp2::ErrorCode errCode_{nebula::cpp2::ErrorCode::E_UNKNOWN}; }; } // namespace meta diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 3515e6db5ad..7aeaac4d4b3 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -92,8 +92,12 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() { for (auto& jd : jds) { jd.setStatus(cpp2::JobStatus::QUEUE, true); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); - auto jobVal = MetaKeyUtils::jobVal( - jd.getJobType(), jd.getParas(), jd.getStatus(), jd.getStartTime(), jd.getStopTime()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); save(jobKey, jobVal); } return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -142,7 +146,8 @@ void JobManager::scheduleThread() { jobDesc.getParas(), jobDesc.getStatus(), jobDesc.getStartTime(), - jobDesc.getStopTime()); + jobDesc.getStopTime(), + jobDesc.getErrorCode()); save(jobKey, jobVal); spaceRunningJobs_.insert_or_assign(spaceId, true); if (!runJobInternal(jobDesc, jobOp)) { @@ -243,13 +248,37 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId, return nebula::cpp2::ErrorCode::E_SAVE_JOB_FAILURE; } + // Set the errorcode of the job + nebula::cpp2::ErrorCode jobErrCode = nebula::cpp2::ErrorCode::SUCCEEDED; + if (jobStatus != cpp2::JobStatus::FINISHED) { + // Traverse the tasks and find the first task errorcode unsuccessful + auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId); + std::unique_ptr iter; + auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter); + if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { + return rc; + } + for (; iter->valid(); iter->next()) { + if (MetaKeyUtils::isJobKey(iter->key())) { + continue; + } + auto tupTaskVal = MetaKeyUtils::parseTaskVal(iter->val()); + jobErrCode = std::get<4>(tupTaskVal); + if (jobErrCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + break; + } + } + } + optJobDesc.setErrorCode(jobErrCode); + spaceRunningJobs_.insert_or_assign(spaceId, false); auto jobKey = MetaKeyUtils::jobKey(optJobDesc.getSpace(), optJobDesc.getJobId()); auto jobVal = MetaKeyUtils::jobVal(optJobDesc.getJobType(), optJobDesc.getParas(), optJobDesc.getStatus(), optJobDesc.getStartTime(), - optJobDesc.getStopTime()); + optJobDesc.getStopTime(), + optJobDesc.getErrorCode()); auto rc = save(jobKey, jobVal); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { return rc; @@ -280,6 +309,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, auto status = code == nebula::cpp2::ErrorCode::SUCCEEDED ? cpp2::JobStatus::FINISHED : cpp2::JobStatus::FAILED; td.setStatus(status); + td.setErrorCode(code); auto spaceId = req.get_space_id(); auto jobId = req.get_job_id(); @@ -300,8 +330,8 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, } auto taskKey = MetaKeyUtils::taskKey(td.getSpace(), td.getJobId(), td.getTaskId()); - auto taskVal = - MetaKeyUtils::taskVal(td.getHost(), td.getStatus(), td.getStartTime(), td.getStopTime()); + auto taskVal = MetaKeyUtils::taskVal( + td.getHost(), td.getStatus(), td.getStartTime(), td.getStopTime(), td.getErrorCode()); auto rcSave = save(taskKey, taskVal); if (rcSave != nebula::cpp2::ErrorCode::SUCCEEDED) { return rcSave; @@ -392,7 +422,8 @@ nebula::cpp2::ErrorCode JobManager::addJob(JobDescription& jobDesc, AdminClient* jobDesc.getParas(), jobDesc.getStatus(), jobDesc.getStartTime(), - jobDesc.getStopTime()); + jobDesc.getStopTime(), + jobDesc.getErrorCode()); auto rc = save(jobKey, jobVal); if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { enqueue(spaceId, jobId, JbOp::ADD, jobDesc.getJobType()); diff --git a/src/meta/processors/job/RebuildJobExecutor.cpp b/src/meta/processors/job/RebuildJobExecutor.cpp index 2fa380af7b0..0627416b820 100644 --- a/src/meta/processors/job/RebuildJobExecutor.cpp +++ b/src/meta/processors/job/RebuildJobExecutor.cpp @@ -17,7 +17,7 @@ namespace nebula { namespace meta { nebula::cpp2::ErrorCode RebuildJobExecutor::prepare() { - // The last value of paras_ are index name + // The value of paras_ are index name auto spaceRet = spaceExist(); if (spaceRet != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Can't find the space, spaceId " << space_; diff --git a/src/meta/processors/job/StorageJobExecutor.cpp b/src/meta/processors/job/StorageJobExecutor.cpp index 013a555c3f2..adcb0485356 100644 --- a/src/meta/processors/job/StorageJobExecutor.cpp +++ b/src/meta/processors/job/StorageJobExecutor.cpp @@ -157,8 +157,11 @@ nebula::cpp2::ErrorCode StorageJobExecutor::execute() { for (auto i = 0U; i != addresses.size(); ++i) { TaskDescription task(space_, jobId_, i, addresses[i].first); auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId()); - auto taskVal = MetaKeyUtils::taskVal( - task.getHost(), task.getStatus(), task.getStartTime(), task.getStopTime()); + auto taskVal = MetaKeyUtils::taskVal(task.getHost(), + task.getStatus(), + task.getStartTime(), + task.getStopTime(), + task.getErrorCode()); data.emplace_back(std::move(taskKey), std::move(taskVal)); } diff --git a/src/meta/processors/job/TaskDescription.cpp b/src/meta/processors/job/TaskDescription.cpp index c7600bb4e31..22e177ce473 100644 --- a/src/meta/processors/job/TaskDescription.cpp +++ b/src/meta/processors/job/TaskDescription.cpp @@ -53,6 +53,7 @@ TaskDescription::TaskDescription(const folly::StringPiece& key, const folly::Str status_ = std::get<1>(tupVal); startTime_ = std::get<2>(tupVal); stopTime_ = std::get<3>(tupVal); + errCode_ = std::get<4>(tupVal); } /* @@ -72,6 +73,7 @@ cpp2::TaskDesc TaskDescription::toTaskDesc() { ret.status_ref() = status_; ret.start_time_ref() = startTime_; ret.stop_time_ref() = stopTime_; + ret.code_ref() = errCode_; return ret; } diff --git a/src/meta/processors/job/TaskDescription.h b/src/meta/processors/job/TaskDescription.h index 9e2bd555d8a..6b5f016c170 100644 --- a/src/meta/processors/job/TaskDescription.h +++ b/src/meta/processors/job/TaskDescription.h @@ -62,8 +62,7 @@ class TaskDescription { cpp2::TaskDesc toTaskDesc(); /** - * @brief - * Set the internal status + * @brief Set the internal status * Will check if newStatus is later than curr Status * e.g. set running to a finished job is forbidden * @@ -79,6 +78,14 @@ class TaskDescription { return status_; } + void setErrorCode(nebula::cpp2::ErrorCode errCode) { + errCode_ = errCode; + } + + nebula::cpp2::ErrorCode getErrorCode() { + return errCode_; + } + GraphSpaceID getSpace() { return space_; } @@ -111,6 +118,7 @@ class TaskDescription { cpp2::JobStatus status_; int64_t startTime_; int64_t stopTime_; + nebula::cpp2::ErrorCode errCode_{nebula::cpp2::ErrorCode::E_UNKNOWN}; }; } // namespace meta diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index d84ef1fbd9f..362e1f50db3 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -742,8 +742,12 @@ JobDescription makeJobDescription(GraphSpaceID space, kvstore::KVStore* kv, cpp2 JobDescription jd(space, testJobId.fetch_add(1, std::memory_order_relaxed), jobType, {}); std::vector data; auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); - auto jobVal = MetaKeyUtils::jobVal( - jd.getJobType(), jd.getParas(), jd.getStatus(), jd.getStartTime(), jd.getStopTime()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); data.emplace_back(jobKey, jobVal); folly::Baton baton; kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { diff --git a/src/meta/test/GetStatsTest.cpp b/src/meta/test/GetStatsTest.cpp index c8978be803f..0d7f2abcdd0 100644 --- a/src/meta/test/GetStatsTest.cpp +++ b/src/meta/test/GetStatsTest.cpp @@ -135,7 +135,8 @@ TEST_F(GetStatsTest, StatsJob) { statsJob.getParas(), statsJob.getStatus(), statsJob.getStartTime(), - statsJob.getStopTime()); + statsJob.getStopTime(), + statsJob.getErrorCode()); auto rc = jobMgr->save(std::move(jobKey1), std::move(jobVal1)); ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); @@ -170,7 +171,8 @@ TEST_F(GetStatsTest, StatsJob) { job11.getParas(), job11.getStatus(), job11.getStartTime(), - job11.getStopTime()); + job11.getStopTime(), + job11.getErrorCode()); auto retsav = jobMgr->save(std::move(jobKey2), std::move(jobVal2)); ASSERT_EQ(retsav, nebula::cpp2::ErrorCode::SUCCEEDED); } @@ -189,7 +191,8 @@ TEST_F(GetStatsTest, StatsJob) { statsJob.getParas(), statsJob.getStatus(), statsJob.getStartTime(), - statsJob.getStopTime()); + statsJob.getStopTime(), + statsJob.getErrorCode()); jobMgr->save(std::move(jobKey3), std::move(jobVal3)); auto statsKey = MetaKeyUtils::statsKey(spaceId); @@ -245,7 +248,8 @@ TEST_F(GetStatsTest, StatsJob) { statsJob2.getParas(), statsJob2.getStatus(), statsJob2.getStartTime(), - statsJob2.getStopTime()); + statsJob2.getStopTime(), + statsJob2.getErrorCode()); auto rc2 = jobMgr->save(std::move(jobKey4), std::move(jobVal4)); ASSERT_EQ(rc2, nebula::cpp2::ErrorCode::SUCCEEDED); { @@ -295,7 +299,8 @@ TEST_F(GetStatsTest, StatsJob) { job21.getParas(), job21.getStatus(), job21.getStartTime(), - job21.getStopTime()); + job21.getStopTime(), + job21.getErrorCode()); auto retsav = jobMgr->save(std::move(jobKey5), std::move(jobVal5)); ASSERT_EQ(retsav, nebula::cpp2::ErrorCode::SUCCEEDED); } @@ -350,7 +355,8 @@ TEST_F(GetStatsTest, StatsJob) { statsJob2.getParas(), statsJob2.getStatus(), statsJob2.getStartTime(), - statsJob2.getStopTime()); + statsJob2.getStopTime(), + statsJob2.getErrorCode()); jobMgr->save(std::move(jobKey6), std::move(jobVal6)); { diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index ca40cba48b8..b9cc94d1bf3 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -132,7 +132,8 @@ TEST_F(JobManagerTest, StatsJob) { jobDesc.getParas(), jobDesc.getStatus(), jobDesc.getStartTime(), - jobDesc.getStopTime()); + jobDesc.getStopTime(), + jobDesc.getErrorCode()); jobMgr->save(std::move(jobKey), std::move(jobVal)); auto job1Ret = @@ -374,9 +375,10 @@ TEST_F(JobManagerTest, ShowJob) { TaskDescription td1(spaceId, jobId2, task1, host1); td1.setStatus(cpp2::JobStatus::RUNNING); td1.setStatus(cpp2::JobStatus::FINISHED); + td1.setErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); auto taskKey1 = MetaKeyUtils::taskKey(td1.getSpace(), td1.getJobId(), td1.getTaskId()); - auto taskVal1 = - MetaKeyUtils::taskVal(td1.getHost(), td1.getStatus(), td1.getStartTime(), td1.getStopTime()); + auto taskVal1 = MetaKeyUtils::taskVal( + td1.getHost(), td1.getStatus(), td1.getStartTime(), td1.getStopTime(), td1.getErrorCode()); jobMgr->save(taskKey1, taskVal1); int32_t task2 = 1; @@ -385,8 +387,8 @@ TEST_F(JobManagerTest, ShowJob) { td2.setStatus(cpp2::JobStatus::RUNNING); td2.setStatus(cpp2::JobStatus::FAILED); auto taskKey2 = MetaKeyUtils::taskKey(td2.getSpace(), td2.getJobId(), td2.getTaskId()); - auto taskVal2 = - MetaKeyUtils::taskVal(td2.getHost(), td2.getStatus(), td2.getStartTime(), td2.getStopTime()); + auto taskVal2 = MetaKeyUtils::taskVal( + td2.getHost(), td2.getStatus(), td2.getStartTime(), td2.getStopTime(), td2.getErrorCode()); jobMgr->save(taskKey2, taskVal2); LOG(INFO) << "before jobMgr->showJob"; @@ -411,6 +413,7 @@ TEST_F(JobManagerTest, ShowJob) { ASSERT_EQ(tasks[0].get_status(), cpp2::JobStatus::FINISHED); ASSERT_EQ(tasks[0].get_start_time(), td1.getStartTime()); ASSERT_EQ(tasks[0].get_stop_time(), td1.getStopTime()); + ASSERT_EQ(tasks[0].get_code(), td1.getErrorCode()); ASSERT_EQ(tasks[1].get_space_id(), spaceId); ASSERT_EQ(tasks[1].get_job_id(), jobId1); @@ -419,6 +422,7 @@ TEST_F(JobManagerTest, ShowJob) { ASSERT_EQ(tasks[1].get_status(), cpp2::JobStatus::FAILED); ASSERT_EQ(tasks[1].get_start_time(), td2.getStartTime()); ASSERT_EQ(tasks[1].get_stop_time(), td2.getStopTime()); + ASSERT_EQ(tasks[1].get_code(), td2.getErrorCode()); } TEST_F(JobManagerTest, ShowJobInOtherSpace) { @@ -437,9 +441,10 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) { TaskDescription td1(spaceId1, jobId2, task1, host1); td1.setStatus(cpp2::JobStatus::RUNNING); td1.setStatus(cpp2::JobStatus::FINISHED); + td1.setErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); auto taskKey1 = MetaKeyUtils::taskKey(td1.getSpace(), td1.getJobId(), td1.getTaskId()); - auto taskVal1 = - MetaKeyUtils::taskVal(td1.getHost(), td1.getStatus(), td1.getStartTime(), td1.getStopTime()); + auto taskVal1 = MetaKeyUtils::taskVal( + td1.getHost(), td1.getStatus(), td1.getStartTime(), td1.getStopTime(), td1.getErrorCode()); jobMgr->save(taskKey1, taskVal1); int32_t task2 = 1; @@ -448,8 +453,8 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) { td2.setStatus(cpp2::JobStatus::RUNNING); td2.setStatus(cpp2::JobStatus::FAILED); auto taskKey2 = MetaKeyUtils::taskKey(td2.getSpace(), td2.getJobId(), td2.getTaskId()); - auto taskVal2 = - MetaKeyUtils::taskVal(td2.getHost(), td2.getStatus(), td2.getStartTime(), td2.getStopTime()); + auto taskVal2 = MetaKeyUtils::taskVal( + td2.getHost(), td2.getStatus(), td2.getStartTime(), td2.getStopTime(), td2.getErrorCode()); jobMgr->save(taskKey2, taskVal2); LOG(INFO) << "before jobMgr->showJob"; @@ -470,8 +475,12 @@ TEST_F(JobManagerTest, RecoverJob) { for (auto jobId = 0; jobId < nJob; ++jobId) { JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); - auto jobVal = MetaKeyUtils::jobVal( - jd.getJobType(), jd.getParas(), jd.getStatus(), jd.getStartTime(), jd.getStopTime()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); jobMgr->save(jobKey, jobVal); } @@ -497,8 +506,12 @@ TEST(JobDescriptionTest, Ctor2) { LOG(INFO) << "jd1 ctored"; auto jobKey = MetaKeyUtils::jobKey(jd1.getSpace(), jd1.getJobId()); - auto jobVal = MetaKeyUtils::jobVal( - jd1.getJobType(), jd1.getParas(), jd1.getStatus(), jd1.getStartTime(), jd1.getStopTime()); + auto jobVal = MetaKeyUtils::jobVal(jd1.getJobType(), + jd1.getParas(), + jd1.getStatus(), + jd1.getStartTime(), + jd1.getStopTime(), + jd1.getErrorCode()); auto optJobRet = JobDescription::makeJobDescription(jobKey, jobVal); ASSERT_TRUE(nebula::ok(optJobRet)); } @@ -523,11 +536,16 @@ TEST(JobDescriptionTest, ParseVal) { auto status = cpp2::JobStatus::FINISHED; jd.setStatus(cpp2::JobStatus::RUNNING); jd.setStatus(status); + jd.setErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); auto startTime = jd.getStartTime(); auto stopTime = jd.getStopTime(); - auto jobVal = MetaKeyUtils::jobVal( - jd.getJobType(), jd.getParas(), jd.getStatus(), jd.getStartTime(), jd.getStopTime()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); auto parsedVal = MetaKeyUtils::parseJobVal(jobVal); ASSERT_EQ(cpp2::JobType::FLUSH, std::get<0>(parsedVal)); auto paras = std::get<1>(parsedVal); @@ -535,6 +553,7 @@ TEST(JobDescriptionTest, ParseVal) { ASSERT_EQ(status, std::get<2>(parsedVal)); ASSERT_EQ(startTime, std::get<3>(parsedVal)); ASSERT_EQ(stopTime, std::get<4>(parsedVal)); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, std::get<5>(parsedVal)); } TEST(TaskDescriptionTest, Ctor) { @@ -578,15 +597,17 @@ TEST(TaskDescriptionTest, ParseVal) { td.setStatus(cpp2::JobStatus::RUNNING); auto status = cpp2::JobStatus::FINISHED; td.setStatus(status); + td.setErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); - auto strVal = - MetaKeyUtils::taskVal(td.getHost(), td.getStatus(), td.getStartTime(), td.getStopTime()); + auto strVal = MetaKeyUtils::taskVal( + td.getHost(), td.getStatus(), td.getStartTime(), td.getStopTime(), td.getErrorCode()); auto parsedVal = MetaKeyUtils::parseTaskVal(strVal); ASSERT_EQ(td.getHost(), std::get<0>(parsedVal)); ASSERT_EQ(td.getStatus(), std::get<1>(parsedVal)); ASSERT_EQ(td.getStartTime(), std::get<2>(parsedVal)); ASSERT_EQ(td.getStopTime(), std::get<3>(parsedVal)); + ASSERT_EQ(td.getErrorCode(), std::get<4>(parsedVal)); } } // namespace meta diff --git a/tests/tck/job/Job.feature b/tests/tck/job/Job.feature index 20c4d3bfa20..c3043713395 100644 --- a/tests/tck/job/Job.feature +++ b/tests/tck/job/Job.feature @@ -110,9 +110,9 @@ Feature: Submit job space requirements SHOW JOB {}; """ Then the result should be, in order: - | Job Id(TaskId) | Command(Dest) | Status | Start Time | Stop Time | - | /\d+/ | "STATS" | "FINISHED" | /\w+/ | /\w+/ | - | /\d+/ | /\w+/ | "FINISHED" | /\w+/ | /\w+/ | + | Job Id(TaskId) | Command(Dest) | Status | Start Time | Stop Time | Error Code | + | /\d+/ | "STATS" | "FINISHED" | /\w+/ | /\w+/ | "SUCCEEDED" | + | /\d+/ | /\w+/ | "FINISHED" | /\w+/ | /\w+/ | "SUCCEEDED" | When executing query, fill replace holders with element index of 0 in job_id: """ STOP JOB {};