Skip to content

Commit

Permalink
fix bug stop job before it start (#4061)
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul authored Mar 24, 2022
1 parent 456a28f commit 166e70c
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/meta/processors/job/JobDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ bool JobDescription::setStatus(Status newStatus, bool force) {
return false;
}
status_ = newStatus;
if (newStatus == Status::RUNNING) {
if (newStatus == Status::RUNNING || (newStatus == Status::STOPPED && startTime_ == 0)) {
startTime_ = std::time(nullptr);
}
if (JobStatus::laterThan(newStatus, Status::RUNNING)) {
Expand Down
9 changes: 5 additions & 4 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ void JobManager::scheduleThread() {
auto jobOp = std::get<0>(opJobId);
auto jodId = std::get<1>(opJobId);
auto spaceId = std::get<2>(opJobId);
std::lock_guard<std::recursive_mutex> lk(muJobFinished_[spaceId]);
auto jobDescRet = JobDescription::loadJobDescription(spaceId, jodId, kvStore_);
if (!nebula::ok(jobDescRet)) {
LOG(INFO) << "[JobManager] load an invalid job from space " << spaceId << " jodId " << jodId;
Expand All @@ -152,8 +153,6 @@ void JobManager::scheduleThread() {
}

bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
auto spaceId = jobDesc.getSpace();
std::lock_guard<std::recursive_mutex> lk(muJobFinished_[spaceId]);
std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
JobExecutor* jobExec = je.get();
Expand Down Expand Up @@ -257,8 +256,10 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,

auto it = runningJobs_.find(jobId);
if (it == runningJobs_.end()) {
LOG(INFO) << folly::sformat("Can't find jobExecutor, jobId={}", jobId);
return nebula::cpp2::ErrorCode::E_UNKNOWN;
// the job has not started yet
// TODO job not existing in runningJobs_ also means leader changed, we handle it later
cleanJob(jobId);
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
std::unique_ptr<JobExecutor>& jobExec = it->second;
if (jobStatus == cpp2::JobStatus::STOPPED) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
AdminClient* adminClient_{nullptr};

std::map<GraphSpaceID, std::mutex> muReportFinish_;
// Start & stop & finish a job need mutual exclusion
// The reason of using recursive_mutex is that, it's possible for a meta job try to get this lock
// in finish-callback in the same thread with runJobInternal
std::map<GraphSpaceID, std::recursive_mutex> muJobFinished_;
Expand Down

0 comments on commit 166e70c

Please sign in to comment.