Skip to content

Commit

Permalink
show job support errorcode
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep committed Mar 22, 2022
1 parent de57a13 commit b68d059
Show file tree
Hide file tree
Showing 14 changed files with 192 additions and 66 deletions.
34 changes: 24 additions & 10 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,8 @@ std::string MetaKeyUtils::jobVal(const meta::cpp2::JobType& type,
std::vector<std::string> paras,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime) {
int64_t stopTime,
nebula::cpp2::ErrorCode errCode) {
std::string val;
val.reserve(256);
val.append(reinterpret_cast<const char*>(&type), sizeof(meta::cpp2::JobType));
Expand All @@ -1412,11 +1413,17 @@ std::string MetaKeyUtils::jobVal(const meta::cpp2::JobType& type,
}
val.append(reinterpret_cast<const char*>(&jobStatus), sizeof(meta::cpp2::JobStatus))
.append(reinterpret_cast<const char*>(&startTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t));
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&errCode), sizeof(nebula::cpp2::ErrorCode));
return val;
}

std::tuple<meta::cpp2::JobType, std::vector<std::string>, meta::cpp2::JobStatus, int64_t, int64_t>
std::tuple<meta::cpp2::JobType,
std::vector<std::string>,
meta::cpp2::JobStatus,
int64_t,
int64_t,
nebula::cpp2::ErrorCode>
MetaKeyUtils::parseJobVal(folly::StringPiece rawVal) {
CHECK_GE(rawVal.size(),
sizeof(meta::cpp2::JobType) + sizeof(size_t) + sizeof(meta::cpp2::JobStatus) +
Expand All @@ -1439,7 +1446,9 @@ MetaKeyUtils::parseJobVal(folly::StringPiece rawVal) {
auto tStart = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
offset += sizeof(int64_t);
auto tStop = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
return std::make_tuple(type, paras, status, tStart, tStop);
offset += sizeof(int64_t);
auto errCode = *reinterpret_cast<const nebula::cpp2::ErrorCode*>(rawVal.data() + offset);
return std::make_tuple(type, paras, status, tStart, tStop, errCode);
}

std::pair<GraphSpaceID, JobID> MetaKeyUtils::parseJobKey(folly::StringPiece key) {
Expand Down Expand Up @@ -1473,20 +1482,23 @@ std::tuple<GraphSpaceID, JobID, TaskID> MetaKeyUtils::parseTaskKey(folly::String
std::string MetaKeyUtils::taskVal(HostAddr host,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime) {
int64_t stopTime,
nebula::cpp2::ErrorCode errCode) {
std::string val;
val.reserve(128);
val.append(MetaKeyUtils::serializeHostAddr(host))
.append(reinterpret_cast<const char*>(&jobStatus), sizeof(meta::cpp2::JobStatus))
.append(reinterpret_cast<const char*>(&startTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t));
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&errCode), sizeof(nebula::cpp2::ErrorCode));
return val;
}

std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t> MetaKeyUtils::parseTaskVal(
folly::StringPiece rawVal) {
std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t, nebula::cpp2::ErrorCode>
MetaKeyUtils::parseTaskVal(folly::StringPiece rawVal) {
CHECK_GE(rawVal.size(),
sizeof(size_t) + sizeof(Port) + sizeof(meta::cpp2::JobStatus) + sizeof(int64_t) * 2);
sizeof(size_t) + sizeof(Port) + sizeof(meta::cpp2::JobStatus) + sizeof(int64_t) * 2 +
sizeof(nebula::cpp2::ErrorCode));
size_t offset = 0;
HostAddr host = MetaKeyUtils::deserializeHostAddr(rawVal);
offset += sizeof(size_t);
Expand All @@ -1498,7 +1510,9 @@ std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t> MetaKeyUtils::pars
auto tStart = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
offset += sizeof(int64_t);
auto tStop = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
return std::make_tuple(host, status, tStart, tStop);
offset += sizeof(int64_t);
auto errCode = *reinterpret_cast<const nebula::cpp2::ErrorCode*>(rawVal.data() + offset);
return std::make_tuple(host, status, tStart, tStop, errCode);
}

} // namespace nebula
24 changes: 15 additions & 9 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,19 @@ class MetaKeyUtils final {
std::vector<std::string> paras,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime);
int64_t stopTime,
nebula::cpp2::ErrorCode errCode);
/**
* @brief Decode val from kvstore, return
* {jobType, paras, status, start time, stop time}
* {jobType, paras, status, start time, stop time, error code}
*/
static std::
tuple<meta::cpp2::JobType, std::vector<std::string>, meta::cpp2::JobStatus, int64_t, int64_t>
parseJobVal(folly::StringPiece rawVal);
static std::tuple<meta::cpp2::JobType,
std::vector<std::string>,
meta::cpp2::JobStatus,
int64_t,
int64_t,
nebula::cpp2::ErrorCode>
parseJobVal(folly::StringPiece rawVal);

static std::pair<GraphSpaceID, JobID> parseJobKey(folly::StringPiece key);

Expand All @@ -464,14 +469,15 @@ class MetaKeyUtils final {
static std::string taskVal(HostAddr host,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime);
int64_t stopTime,
nebula::cpp2::ErrorCode errCode);

/**
* @brief Decode task val,it should be
* {host, status, start time, stop time}
* {host, status, start time, stop time, error code}
*/
static std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t> parseTaskVal(
folly::StringPiece rawVal);
static std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t, nebula::cpp2::ErrorCode>
parseTaskVal(folly::StringPiece rawVal);
};

} // namespace nebula
Expand Down
19 changes: 14 additions & 5 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,12 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
const nebula::meta::cpp2::JobDesc &jd, const std::vector<nebula::meta::cpp2::TaskDesc> &td) {
if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE ||
jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) {
nebula::DataSet v(
{"Job Id(spaceId:partId)", "Command(src->dst)", "Status", "Start Time", "Stop Time"});
nebula::DataSet v({"Job Id(spaceId:partId)",
"Command(src->dst)",
"Status",
"Start Time",
"Stop Time",
"Error Code"});
const auto &paras = jd.get_paras();
size_t index = std::stoul(paras.back());
uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0,
Expand All @@ -122,7 +126,8 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()).toString(),
convertJobTimestampToDateTime(jd.get_stop_time()).toString()}));
convertJobTimestampToDateTime(jd.get_stop_time()).toString(),
apache::thrift::util::enumNameSafe(jd.get_code())}));
for (size_t i = index; i < paras.size() - 1; i++) {
meta::cpp2::BalanceTask tsk;
apache::thrift::CompactSerializer::deserialize(paras[i], tsk);
Expand All @@ -144,7 +149,8 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
std::move(tsk).get_command(),
apache::thrift::util::enumNameSafe(tsk.get_result()),
convertJobTimestampToDateTime(std::move(tsk).get_start_time()),
convertJobTimestampToDateTime(std::move(tsk).get_stop_time())}));
convertJobTimestampToDateTime(std::move(tsk).get_stop_time()),
apache::thrift::util::enumNameSafe(jd.get_code())}));
}
v.emplace_back(Row({folly::sformat("Total:{}", total),
folly::sformat("Succeeded:{}", succeeded),
Expand All @@ -153,13 +159,15 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
folly::sformat("Invalid:{}", invalid)}));
return v;
} else {
nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"});
nebula::DataSet v(
{"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time", "Error Code"});
v.emplace_back(nebula::Row({
jd.get_job_id(),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()),
convertJobTimestampToDateTime(jd.get_stop_time()),
apache::thrift::util::enumNameSafe(jd.get_code()),
}));
// tasks desc
for (const auto &taskDesc : td) {
Expand All @@ -169,6 +177,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(taskDesc.get_status()),
convertJobTimestampToDateTime(taskDesc.get_start_time()),
convertJobTimestampToDateTime(taskDesc.get_stop_time()),
apache::thrift::util::enumNameSafe(taskDesc.get_code()),
}));
}
return v;
Expand Down
2 changes: 2 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ struct JobDesc {
5: JobStatus status,
6: i64 start_time,
7: i64 stop_time,
8: common.ErrorCode code,
}

struct TaskDesc {
Expand All @@ -277,6 +278,7 @@ struct TaskDesc {
5: JobStatus status,
6: i64 start_time,
7: i64 stop_time,
8: common.ErrorCode code,
}

struct AdminJobResult {
Expand Down
18 changes: 14 additions & 4 deletions src/meta/processors/job/JobDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ JobDescription::JobDescription(GraphSpaceID space,
std::vector<std::string> paras,
Status status,
int64_t startTime,
int64_t stopTime)
int64_t stopTime,
nebula::cpp2::ErrorCode errCode)
: space_(space),
jobId_(jobId),
type_(type),
paras_(std::move(paras)),
status_(status),
startTime_(startTime),
stopTime_(stopTime) {}
stopTime_(stopTime),
errCode_(errCode) {}

ErrorOr<nebula::cpp2::ErrorCode, JobDescription> JobDescription::makeJobDescription(
folly::StringPiece rawkey, folly::StringPiece rawval) {
Expand All @@ -52,8 +54,15 @@ ErrorOr<nebula::cpp2::ErrorCode, JobDescription> JobDescription::makeJobDescript
auto status = std::get<2>(tup);
auto startTime = std::get<3>(tup);
auto stopTime = std::get<4>(tup);
return JobDescription(
spaceIdAndJob.first, spaceIdAndJob.second, type, paras, status, startTime, stopTime);
auto errCode = std::get<5>(tup);
return JobDescription(spaceIdAndJob.first,
spaceIdAndJob.second,
type,
paras,
status,
startTime,
stopTime,
errCode);
} catch (std::exception& ex) {
LOG(INFO) << ex.what();
}
Expand All @@ -69,6 +78,7 @@ cpp2::JobDesc JobDescription::toJobDesc() {
ret.status_ref() = status_;
ret.start_time_ref() = startTime_;
ret.stop_time_ref() = stopTime_;
ret.code_ref() = errCode_;
return ret;
}

Expand Down
12 changes: 11 additions & 1 deletion src/meta/processors/job/JobDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class JobDescription {
std::vector<std::string> paras = {},
Status status = Status::QUEUE,
int64_t startTime = 0,
int64_t stopTime = 0);
int64_t stopTime = 0,
nebula::cpp2::ErrorCode errCode = nebula::cpp2::ErrorCode::E_UNKNOWN);

/**
* @brief Return the JobDescription if both key & val is valid
Expand Down Expand Up @@ -119,6 +120,14 @@ class JobDescription {
return stopTime_;
}

void setErrorCode(nebula::cpp2::ErrorCode errCode) {
errCode_ = errCode;
}

nebula::cpp2::ErrorCode getErrorCode() {
return errCode_;
}

/**
* @brief
* Get a existed job from kvstore, return folly::none if there isn't
Expand Down Expand Up @@ -167,6 +176,7 @@ class JobDescription {
Status status_;
int64_t startTime_;
int64_t stopTime_;
nebula::cpp2::ErrorCode errCode_{nebula::cpp2::ErrorCode::E_UNKNOWN};
};

} // namespace meta
Expand Down
45 changes: 38 additions & 7 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() {
for (auto& jd : jds) {
jd.setStatus(cpp2::JobStatus::QUEUE, true);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(
jd.getJobType(), jd.getParas(), jd.getStatus(), jd.getStartTime(), jd.getStopTime());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
jd.getStatus(),
jd.getStartTime(),
jd.getStopTime(),
jd.getErrorCode());
save(jobKey, jobVal);
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -142,7 +146,8 @@ void JobManager::scheduleThread() {
jobDesc.getParas(),
jobDesc.getStatus(),
jobDesc.getStartTime(),
jobDesc.getStopTime());
jobDesc.getStopTime(),
jobDesc.getErrorCode());
save(jobKey, jobVal);
spaceRunningJobs_.insert_or_assign(spaceId, true);
if (!runJobInternal(jobDesc, jobOp)) {
Expand Down Expand Up @@ -243,13 +248,37 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,
return nebula::cpp2::ErrorCode::E_SAVE_JOB_FAILURE;
}

// Set the errorcode of the job
nebula::cpp2::ErrorCode jobErrCode = nebula::cpp2::ErrorCode::SUCCEEDED;
if (jobStatus != cpp2::JobStatus::FINISHED) {
// Traverse the tasks and find the first task errorcode unsuccessful
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
std::unique_ptr<kvstore::KVIterator> iter;
auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
for (; iter->valid(); iter->next()) {
if (MetaKeyUtils::isJobKey(iter->key())) {
continue;
}
auto tupTaskVal = MetaKeyUtils::parseTaskVal(iter->val());
jobErrCode = std::get<4>(tupTaskVal);
if (jobErrCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
}
}
optJobDesc.setErrorCode(jobErrCode);

spaceRunningJobs_.insert_or_assign(spaceId, false);
auto jobKey = MetaKeyUtils::jobKey(optJobDesc.getSpace(), optJobDesc.getJobId());
auto jobVal = MetaKeyUtils::jobVal(optJobDesc.getJobType(),
optJobDesc.getParas(),
optJobDesc.getStatus(),
optJobDesc.getStartTime(),
optJobDesc.getStopTime());
optJobDesc.getStopTime(),
optJobDesc.getErrorCode());
auto rc = save(jobKey, jobVal);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
Expand Down Expand Up @@ -280,6 +309,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
auto status = code == nebula::cpp2::ErrorCode::SUCCEEDED ? cpp2::JobStatus::FINISHED
: cpp2::JobStatus::FAILED;
td.setStatus(status);
td.setErrorCode(code);

auto spaceId = req.get_space_id();
auto jobId = req.get_job_id();
Expand All @@ -300,8 +330,8 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
}

auto taskKey = MetaKeyUtils::taskKey(td.getSpace(), td.getJobId(), td.getTaskId());
auto taskVal =
MetaKeyUtils::taskVal(td.getHost(), td.getStatus(), td.getStartTime(), td.getStopTime());
auto taskVal = MetaKeyUtils::taskVal(
td.getHost(), td.getStatus(), td.getStartTime(), td.getStopTime(), td.getErrorCode());
auto rcSave = save(taskKey, taskVal);
if (rcSave != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rcSave;
Expand Down Expand Up @@ -392,7 +422,8 @@ nebula::cpp2::ErrorCode JobManager::addJob(JobDescription& jobDesc, AdminClient*
jobDesc.getParas(),
jobDesc.getStatus(),
jobDesc.getStartTime(),
jobDesc.getStopTime());
jobDesc.getStopTime(),
jobDesc.getErrorCode());
auto rc = save(jobKey, jobVal);
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
enqueue(spaceId, jobId, JbOp::ADD, jobDesc.getJobType());
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/RebuildJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace nebula {
namespace meta {

nebula::cpp2::ErrorCode RebuildJobExecutor::prepare() {
// The last value of paras_ are index name
// The value of paras_ are index name
auto spaceRet = spaceExist();
if (spaceRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Can't find the space, spaceId " << space_;
Expand Down
7 changes: 5 additions & 2 deletions src/meta/processors/job/StorageJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ nebula::cpp2::ErrorCode StorageJobExecutor::execute() {
for (auto i = 0U; i != addresses.size(); ++i) {
TaskDescription task(space_, jobId_, i, addresses[i].first);
auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId());
auto taskVal = MetaKeyUtils::taskVal(
task.getHost(), task.getStatus(), task.getStartTime(), task.getStopTime());
auto taskVal = MetaKeyUtils::taskVal(task.getHost(),
task.getStatus(),
task.getStartTime(),
task.getStopTime(),
task.getErrorCode());
data.emplace_back(std::move(taskKey), std::move(taskVal));
}

Expand Down
Loading

0 comments on commit b68d059

Please sign in to comment.