Skip to content

Commit

Permalink
fix bug stop job before it start
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Mar 24, 2022
1 parent dd270fb commit d6ab7a8
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ void JobManager::scheduleThread() {
auto jobOp = std::get<0>(opJobId);
auto jodId = std::get<1>(opJobId);
auto spaceId = std::get<2>(opJobId);
std::lock_guard<std::recursive_mutex> lk(muJobFinished_[spaceId]);
auto jobDescRet = JobDescription::loadJobDescription(spaceId, jodId, kvStore_);
if (!nebula::ok(jobDescRet)) {
LOG(INFO) << "[JobManager] load an invalid job from space " << spaceId << " jodId " << jodId;
Expand All @@ -152,8 +153,6 @@ void JobManager::scheduleThread() {
}

bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
auto spaceId = jobDesc.getSpace();
std::lock_guard<std::recursive_mutex> lk(muJobFinished_[spaceId]);
std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
JobExecutor* jobExec = je.get();
Expand Down Expand Up @@ -257,8 +256,10 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,

auto it = runningJobs_.find(jobId);
if (it == runningJobs_.end()) {
LOG(INFO) << folly::sformat("Can't find jobExecutor, jobId={}", jobId);
return nebula::cpp2::ErrorCode::E_UNKNOWN;
// the job has not started yet
// TODO job not existing in runningJobs_ also means leader changed, we handle it later
cleanJob(jobId);
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
std::unique_ptr<JobExecutor>& jobExec = it->second;
if (jobStatus == cpp2::JobStatus::STOPPED) {
Expand Down Expand Up @@ -513,10 +514,10 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> JobManager::showJob

bool JobManager::isExpiredJob(JobDescription& jobDesc) {
auto status = jobDesc.getStatus();
if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING) {
auto jobStart = jobDesc.getStartTime();
if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING || jobStart == 0) {
return false;
}
auto jobStart = jobDesc.getStartTime();
auto duration = std::difftime(nebula::time::WallClock::fastNowInSec(), jobStart);
return duration > FLAGS_job_expired_secs;
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
AdminClient* adminClient_{nullptr};

std::map<GraphSpaceID, std::mutex> muReportFinish_;
// Start & stop & finish a job need mutual exclusion
// The reason of using recursive_mutex is that, it's possible for a meta job try to get this lock
// in finish-callback in the same thread with runJobInternal
std::map<GraphSpaceID, std::recursive_mutex> muJobFinished_;
Expand Down

0 comments on commit d6ab7a8

Please sign in to comment.