From 9e71515cb34ff3580424286c19364ef97461ed41 Mon Sep 17 00:00:00 2001 From: Alex Xing <90179377+SuperYoko@users.noreply.github.com> Date: Fri, 6 Jan 2023 09:54:38 +0800 Subject: [PATCH] modify jobmanager ut (#5175) * modify jobmanager ut * add expired ut * avoid recover expired job * add ut * address review * move status Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/meta/processors/job/JobDescription.cpp | 6 + src/meta/processors/job/JobExecutor.h | 9 +- src/meta/processors/job/JobManager.cpp | 31 +- src/meta/processors/job/JobManager.h | 7 +- src/meta/processors/job/JobStatus.cpp | 4 + src/meta/processors/job/JobStatus.h | 2 + src/meta/test/JobManagerTest.cpp | 1207 +++++++++++++++++++- 7 files changed, 1210 insertions(+), 56 deletions(-) diff --git a/src/meta/processors/job/JobDescription.cpp b/src/meta/processors/job/JobDescription.cpp index 37beaa40aa0..1e64c890faf 100644 --- a/src/meta/processors/job/JobDescription.cpp +++ b/src/meta/processors/job/JobDescription.cpp @@ -11,6 +11,7 @@ #include #include "common/utils/MetaKeyUtils.h" +#include "interface/gen-cpp2/meta_types.h" #include "kvstore/KVIterator.h" #include "meta/processors/Common.h" @@ -83,6 +84,11 @@ cpp2::JobDesc JobDescription::toJobDesc() { } bool JobDescription::setStatus(Status newStatus, bool force) { + if (JobStatus::notSetable(status_)) { + // no need to change time. + return status_ == newStatus; + } + if (JobStatus::laterThan(status_, newStatus) && !force) { return false; } diff --git a/src/meta/processors/job/JobExecutor.h b/src/meta/processors/job/JobExecutor.h index b5cafb234fc..7be816f3598 100644 --- a/src/meta/processors/job/JobExecutor.h +++ b/src/meta/processors/job/JobExecutor.h @@ -19,7 +19,7 @@ namespace meta { class JobExecutor { public: JobExecutor() = default; - explicit JobExecutor(kvstore::KVStore* kv, GraphSpaceID space) : kvstore_(kv), space_(space) {} + JobExecutor(kvstore::KVStore* kv, GraphSpaceID space) : kvstore_(kv), space_(space) {} virtual ~JobExecutor() = default; /** @@ -99,9 +99,10 @@ class JobExecutor { class JobExecutorFactory { public: - static std::unique_ptr createJobExecutor(const JobDescription& jd, - kvstore::KVStore* store, - AdminClient* client); + virtual ~JobExecutorFactory() = default; + virtual std::unique_ptr createJobExecutor(const JobDescription& jd, + kvstore::KVStore* store, + AdminClient* client); }; } // namespace meta diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index a2119e0d39d..c9d31c417fb 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -12,6 +12,7 @@ #include #include +#include #include "common/stats/StatsManager.h" #include "common/time/WallClock.h" @@ -42,7 +43,10 @@ JobManager* JobManager::getInstance() { return &inst; } -bool JobManager::init(nebula::kvstore::KVStore* store, AdminClient* adminClient) { +bool JobManager::init(nebula::kvstore::KVStore* store, + AdminClient* adminClient, + std::shared_ptr factory) { + executorFactory_ = factory; adminClient_ = adminClient; if (store == nullptr) { return false; @@ -97,7 +101,7 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() { if (nebula::ok(optJobRet)) { auto optJob = nebula::value(optJobRet); std::unique_ptr je = - JobExecutorFactory::createJobExecutor(optJob, kvStore_, adminClient_); + executorFactory_->createJobExecutor(optJob, kvStore_, adminClient_); // Only balance would change if (optJob.getStatus() == cpp2::JobStatus::RUNNING && je->isMetaJob()) { jds.emplace_back(std::move(optJob)); @@ -235,7 +239,7 @@ folly::Future JobManager::runJobInternal(const JobDescr iter = this->muJobFinished_.emplace(spaceId, std::make_unique()).first; } std::lock_guard lk(*(iter->second)); - auto je = JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_); + auto je = executorFactory_->createJobExecutor(jobDesc, kvStore_, adminClient_); jobExec = je.get(); runningJobs_.emplace(jobDesc.getJobId(), std::move(je)); @@ -440,7 +444,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, } auto optJobDesc = nebula::value(optJobDescRet); - auto jobExec = JobExecutorFactory::createJobExecutor(optJobDesc, kvStore_, adminClient_); + auto jobExec = executorFactory_->createJobExecutor(optJobDesc, kvStore_, adminClient_); if (!jobExec) { LOG(INFO) << folly::sformat("createJobExecutor failed(), jobId={}", jobId); @@ -682,6 +686,11 @@ bool JobManager::isExpiredJob(JobDescription& jobDesc) { return false; } auto jobStart = jobDesc.getStartTime(); + if (jobStart == 0) { + // should not happend, but just in case keep this job + LOG(INFO) << "Job " << jobDesc.getJobId() << " start time is not set, keep it for now"; + return false; + } auto duration = std::difftime(nebula::time::WallClock::fastNowInSec(), jobStart); return duration > FLAGS_job_expired_secs; } @@ -848,7 +857,9 @@ ErrorOr JobManager::recoverJob( for (auto& [id, job] : allJobs) { auto status = job.getStatus(); if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::STOPPED) { - jobsMaybeRecover.emplace(id); + if (!isExpiredJob(job)) { + jobsMaybeRecover.emplace(id); + } } } std::set::reverse_iterator lastBalaceJobRecoverIt = jobsMaybeRecover.rend(); @@ -869,7 +880,8 @@ ErrorOr JobManager::recoverJob( JobID jid; bool jobExist = checkOnRunningJobExist(spaceId, job.getJobType(), job.getParas(), jid); if (!jobExist) { - job.setStatus(cpp2::JobStatus::QUEUE, true); + job.setStatus(cpp2::JobStatus::QUEUE, true); // which cause the job execute again + job.setErrorCode(nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); auto jobKey = MetaKeyUtils::jobKey(job.getSpace(), jobId); auto jobVal = MetaKeyUtils::jobVal(job.getJobType(), job.getParas(), @@ -919,9 +931,8 @@ ErrorOr JobManager::recoverJob( auto jobType = allJobs[*it].getJobType(); if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) { if (jobIdSet.empty() || jobIdSet.count(*it)) { - LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt - << " when there's a newer balance job " << *lastBalaceJobRecoverIt - << " stopped or failed"; + LOG(INFO) << "can't recover a balance job " << *it << " when there's a newer balance job " + << *lastBalaceJobRecoverIt << " stopped or failed"; } it = jobsMaybeRecover.erase(it); } else { @@ -931,7 +942,7 @@ ErrorOr JobManager::recoverJob( if (*lastBalaceJobRecoverIt < lastBalanceJobFinished) { if (jobIdSet.empty() || jobIdSet.count(*lastBalaceJobRecoverIt)) { LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt - << " that before a finished balance job " << lastBalanceJobFinished; + << " when there's a newer balance job " << lastBalanceJobFinished << " finished"; } jobsMaybeRecover.erase(*lastBalaceJobRecoverIt); } diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 1761bc303ae..a3080c92e8f 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -11,6 +11,7 @@ #include #include +#include #include "common/base/Base.h" #include "common/base/ErrorOr.h" @@ -18,6 +19,7 @@ #include "interface/gen-cpp2/meta_types.h" #include "kvstore/NebulaStore.h" #include "meta/processors/job/JobDescription.h" +#include "meta/processors/job/JobExecutor.h" #include "meta/processors/job/JobStatus.h" #include "meta/processors/job/StorageJobExecutor.h" #include "meta/processors/job/TaskDescription.h" @@ -80,7 +82,9 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { * @param adminClient * @return true if the init is successful */ - bool init(nebula::kvstore::KVStore* store, AdminClient* adminClient); + bool init(nebula::kvstore::KVStore* store, + AdminClient* adminClient, + std::shared_ptr factory = std::make_shared()); /** * @brief Called when receive a system signal @@ -331,6 +335,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { std::atomic status_ = JbmgrStatus::NOT_START; std::unique_ptr executor_; + std::shared_ptr executorFactory_; }; } // namespace meta diff --git a/src/meta/processors/job/JobStatus.cpp b/src/meta/processors/job/JobStatus.cpp index 67b90fe16b4..297273d2635 100644 --- a/src/meta/processors/job/JobStatus.cpp +++ b/src/meta/processors/job/JobStatus.cpp @@ -30,6 +30,10 @@ bool JobStatus::laterThan(Status lhs, Status rhs) { return phaseNumber(lhs) > phaseNumber(rhs); } +bool JobStatus::notSetable(Status st) { + return st == Status::FINISHED; +} + std::string JobStatus::toString(Status st) { switch (st) { case Status::QUEUE: diff --git a/src/meta/processors/job/JobStatus.h b/src/meta/processors/job/JobStatus.h index bb7a173f057..927a3573b1b 100644 --- a/src/meta/processors/job/JobStatus.h +++ b/src/meta/processors/job/JobStatus.h @@ -9,6 +9,7 @@ #include #include +#include "common/base/Status.h" #include "interface/gen-cpp2/meta_types.h" namespace nebula { @@ -20,6 +21,7 @@ class JobStatus { public: static std::string toString(Status st); static bool laterThan(Status lhs, Status rhs); + static bool notSetable(Status st); private: static int phaseNumber(Status st); diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index f1ed97d86ce..d85562df548 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -4,22 +4,37 @@ */ #include +#include #include #include +#include + +#include +#include #include "common/base/Base.h" #include "common/fs/TempDir.h" +#include "common/thrift/ThriftTypes.h" +#include "interface/gen-cpp2/common_types.h" +#include "interface/gen-cpp2/meta_types.h" #include "kvstore/Common.h" +#include "kvstore/KVStore.h" #include "meta/ActiveHostsMan.h" #include "meta/processors/job/DownloadJobExecutor.h" #include "meta/processors/job/IngestJobExecutor.h" +#include "meta/processors/job/JobDescription.h" +#include "meta/processors/job/JobExecutor.h" #include "meta/processors/job/JobManager.h" +#include "meta/processors/job/JobStatus.h" #include "meta/processors/job/TaskDescription.h" #include "meta/test/MockAdminClient.h" #include "meta/test/MockHdfsHelper.h" #include "meta/test/TestUtils.h" #include "webservice/WebService.h" +DECLARE_int32(job_check_intervals); +DECLARE_double(job_expired_secs); + namespace nebula { namespace meta { @@ -33,9 +48,145 @@ using ::testing::Return; using ::testing::SetArgPointee; using ::testing::StrictMock; +class MockExecutorFactory : public JobExecutorFactory { + public: + MockExecutorFactory() = default; + ~MockExecutorFactory() override = default; + + MOCK_METHOD(std::unique_ptr, + createJobExecutor, + (const JobDescription& jd, kvstore::KVStore* store, AdminClient* client), + (override)); +}; + +class DummyExecutor : public JobExecutor { + public: + DummyExecutor() = default; + ~DummyExecutor() override = default; + + explicit DummyExecutor(const JobDescription& desc, kvstore::KVStore* store) + : JobExecutor(store, desc.getSpace()), jobDesc_(desc) {} + + nebula::cpp2::ErrorCode check() override { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + nebula::cpp2::ErrorCode prepare() override { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + folly::Future execute() override { + folly::Promise promise; + promise.setValue(nebula::cpp2::ErrorCode::SUCCEEDED); + jobDesc_.setStatus(cpp2::JobStatus::FINISHED, true); + return promise.getFuture(); + } + + nebula::cpp2::ErrorCode stop() override { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + nebula::cpp2::ErrorCode finish(bool ret) override { + UNUSED(ret); + setJobDescStatus(cpp2::JobStatus::FINISHED); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + nebula::cpp2::ErrorCode recovery() override { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + bool isMetaJob() override = 0; + + JobDescription getJobDescription() override { + return jobDesc_; + } + + nebula::cpp2::ErrorCode saveSpecialTaskStatus(const cpp2::ReportTaskReq&) override { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + bool isRunning() override { + return isRunning_.load(); + } + + void resetRunningStatus() override { + isRunning_.store(false); + } + + public: + void setJobDescStatus(cpp2::JobStatus status) { + jobDesc_.setStatus(status, true); + } + + protected: + JobDescription jobDesc_; +}; + +class DummyMetaExecutor : public DummyExecutor { + public: + DummyMetaExecutor() = default; + ~DummyMetaExecutor() override = default; + + explicit DummyMetaExecutor(const JobDescription& desc, kvstore::KVStore* store) + : DummyExecutor(desc, store) {} + + bool isMetaJob() override { + return true; + } +}; + +class DummyStorageExecutor : public DummyExecutor { + public: + DummyStorageExecutor() = default; + ~DummyStorageExecutor() override = default; + + explicit DummyStorageExecutor(const JobDescription& desc, kvstore::KVStore* store) + : DummyExecutor(desc, store) {} + + bool isMetaJob() override { + return false; + } + + folly::Future execute() override { + folly::Promise promise; + promise.setValue(nebula::cpp2::ErrorCode::SUCCEEDED); + + TaskDescription task(space_, jobDesc_.getJobId(), 0, HostAddr("", 0)); + auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId()); + auto taskVal = MetaKeyUtils::taskVal(task.getHost(), + task.getStatus(), + task.getStartTime(), + task.getStopTime(), + task.getErrorCode()); + std::vector data; + data.emplace_back(std::move(taskKey), std::move(taskVal)); + folly::Baton baton; + auto rc = nebula::cpp2::ErrorCode::SUCCEEDED; + kvstore_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + rc = code; + baton.post(); + }); + baton.wait(); + return promise.getFuture(); + } +}; + +class UnstoppableDummyStorageExecutor : public DummyStorageExecutor { + public: + UnstoppableDummyStorageExecutor(const JobDescription& desc, kvstore::KVStore* store) + : DummyStorageExecutor(desc, store) {} + nebula::cpp2::ErrorCode stop() override { + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; + } +}; + class JobManagerTest : public ::testing::Test { protected: void SetUp() override { + job_interval = FLAGS_job_check_intervals; // save the original value + FLAGS_job_check_intervals = 50 * 1000; // 50ms for test to control schedule. rootPath_ = std::make_unique("/tmp/JobManager.XXXXXX"); mock::MockCluster cluster; kv_ = cluster.initMetaKV(rootPath_->path()); @@ -69,7 +220,7 @@ class JobManagerTest : public ::testing::Test { [] { return folly::Future>(true); }); } - std::unique_ptr> getJobManager() { + JobManager* initJobManager(std::shared_ptr factory = nullptr) { std::unique_ptr> jobMgr( new JobManager(), [](JobManager* p) { std::tuple opJobId; @@ -80,57 +231,1031 @@ class JobManagerTest : public ::testing::Test { }); jobMgr->status_ = JobManager::JbmgrStatus::NOT_START; jobMgr->kvStore_ = kv_.get(); - jobMgr->init(kv_.get(), adminClient_.get()); - return jobMgr; + if (factory == nullptr) { + jobMgr->init(kv_.get(), adminClient_.get()); + } else { + jobMgr->init(kv_.get(), adminClient_.get(), factory); + } + jobMgr_ = std::move(jobMgr); + return jobMgr_.get(); + } + + void disableSchedule(JobManager* jobMgr) { + jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; + jobMgr->bgThread_.join(); + } + + void enableSchedule(JobManager* jobMgr) { + jobMgr->status_.store(JobManager::JbmgrStatus::IDLE, std::memory_order_release); + jobMgr->bgThread_ = std::thread(&JobManager::scheduleThread, jobMgr); + } + + void reportTaskFinish(nebula::cpp2::ErrorCode code, + GraphSpaceID spaceId, + int32_t jobId, + int32_t taskId) { + cpp2::ReportTaskReq req; + req.code_ref() = code; + req.space_id_ref() = spaceId; + req.job_id_ref() = jobId; + req.task_id_ref() = taskId; + jobMgr_->reportTaskFinish(req); + } + + nebula::cpp2::ErrorCode save(const std::string& k, const std::string& v) { + return jobMgr_->save(k, v); } void TearDown() override { + FLAGS_job_check_intervals = job_interval; // restore the original value + if (jobMgr_) { + if (jobMgr_->status_ != JobManager::JbmgrStatus::STOPPED) { + jobMgr_->shutDown(); + } + jobMgr_.reset(); + } + kv_.reset(); rootPath_.reset(); } + int32_t job_interval{0}; std::unique_ptr rootPath_{nullptr}; std::unique_ptr kv_{nullptr}; std::unique_ptr adminClient_{nullptr}; + std::unique_ptr> jobMgr_{nullptr}; }; HostAddr toHost(std::string strIp) { return HostAddr(strIp, 0); } -TEST_F(JobManagerTest, AddJob) { - std::unique_ptr> jobMgr = getJobManager(); - // For preventing job schedule in JobManager - jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; - jobMgr->bgThread_.join(); +TEST_F(JobManagerTest, AddAndSchedule) { + auto factory = std::make_shared(); + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + auto jobMgr = initJobManager(factory); + + // Add a job GraphSpaceID spaceId = 1; - JobID jobId = 2; - JobDescription jobDesc(spaceId, jobId, cpp2::JobType::COMPACT); - auto rc = jobMgr->addJob(jobDesc); - ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); + JobID jobId = 1; + std::vector paras{"tag_index_name"}; - // If there is a failed data balance job, a new job cannot be added - JobID jobId3 = 3; - JobDescription jobDesc3(spaceId, jobId3, cpp2::JobType::DATA_BALANCE); - jobDesc3.setStatus(cpp2::JobStatus::FAILED); - auto jobKey = MetaKeyUtils::jobKey(jobDesc3.getSpace(), jobDesc3.getJobId()); - auto jobVal = MetaKeyUtils::jobVal(jobDesc3.getJobType(), - jobDesc3.getParas(), - jobDesc3.getStatus(), - jobDesc3.getStartTime(), - jobDesc3.getStopTime(), - jobDesc3.getErrorCode()); - jobMgr->save(std::move(jobKey), std::move(jobVal)); + JobDescription desc{spaceId, jobId, cpp2::JobType::REBUILD_TAG_INDEX, paras}; + disableSchedule(jobMgr); + auto ec = jobMgr->addJob(desc); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + { + // Add job with different paras is okay for job not data/zone balance. + JobID newJobId = 2; + JobDescription desc2{spaceId, newJobId, cpp2::JobType::REBUILD_TAG_INDEX}; + auto runningJobExist = + jobMgr->checkOnRunningJobExist(spaceId, desc2.getJobType(), desc2.getParas(), newJobId); + ASSERT_FALSE(runningJobExist); + } + + { + // Add job with same paras is not okay for job not data/zone balance. + JobID newJobId = 2; + JobDescription desc2{spaceId, newJobId, cpp2::JobType::REBUILD_TAG_INDEX, paras}; + auto runningJobExist = + jobMgr->checkOnRunningJobExist(spaceId, desc2.getJobType(), desc2.getParas(), newJobId); + ASSERT_TRUE(runningJobExist); + } + + { + // Job should not affect other space. + JobID otherSpaceJobId = 3; + GraphSpaceID otherSpaceId = 2; + JobDescription desc3{otherSpaceId, otherSpaceJobId, cpp2::JobType::REBUILD_TAG_INDEX}; + auto otherRunningJobExist = jobMgr->checkOnRunningJobExist( + otherSpaceId, desc3.getJobType(), desc3.getParas(), otherSpaceJobId); + ASSERT_FALSE(otherRunningJobExist); + } + + { + // Test show jobs when job is still in queue + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 1); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::QUEUE); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + + // Test job in queue can not be recovered. + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 0); + } + { + enableSchedule(jobMgr); + // Wait schedule thread to schedule the job + usleep(FLAGS_job_check_intervals * 2); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::RUNNING); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + + // Test job running can not be recovered. + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 0); + } + { + // report task finish. + reportTaskFinish(nebula::cpp2::ErrorCode::SUCCEEDED, spaceId, jobId, 0); + usleep(FLAGS_job_check_intervals * 2); + + // Check job is finished. + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FINISHED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::SUCCEEDED); + + // Test job finished can not be recovered. + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 0); + } +} + +class DummyFailedDataBalanceExecutor : public DummyMetaExecutor { + public: + explicit DummyFailedDataBalanceExecutor(const JobDescription& desc, kvstore::KVStore* store) + : DummyMetaExecutor(desc, store) {} + + folly::Future execute() override { + folly::Promise promise; + promise.setValue(nebula::cpp2::ErrorCode::E_BALANCER_FAILURE); + setJobDescStatus(cpp2::JobStatus::FAILED); + return promise.getFuture(); + } +}; + +TEST_F(JobManagerTest, FailedDataBalanceBlockFollowing) { + auto factory = std::make_shared(); + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + auto jobMgr = initJobManager(factory); + + // Add a failed data balance job + GraphSpaceID spaceId = 1; + JobID jobId = 1; + JobDescription desc{spaceId, jobId, cpp2::JobType::DATA_BALANCE}; + auto ec = jobMgr->addJob(desc); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + { // wait schedule thread to schedule the job + usleep(FLAGS_job_check_intervals * 2); + } + { + // make sure job is failed + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FAILED); + EXPECT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_BALANCER_FAILURE); + } + { + // Check new job should be blocked + JobID newJobId = 2; + JobDescription desc2{spaceId, newJobId, cpp2::JobType::DATA_BALANCE}; + auto rc = jobMgr->checkNeedRecoverJobExist(spaceId, desc2.getJobType()); + ASSERT_EQ(rc, nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER); + } +} + +TEST_F(JobManagerTest, StopAndRecoverQueueJob) { + auto factory = std::make_shared(); + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + auto jobMgr = initJobManager(factory); + + // Add a job + GraphSpaceID spaceId = 1; + JobID jobId = 1; + JobDescription desc{spaceId, jobId, cpp2::JobType::STATS}; + disableSchedule(jobMgr); + auto ec = jobMgr->addJob(desc); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + { + // Stop job and check result + ec = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 1); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::STOPPED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } + + { + // Note: there's 1 job in queue, but it should be skipped when schedule. + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 1); + + enableSchedule(jobMgr); + usleep(FLAGS_job_check_intervals * 2); + // job is skipped now. since it's stopped. + jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::STOPPED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } + + { + disableSchedule(jobMgr); + // recover job and check result + std::vector ids; + ids.emplace_back(jobId); + auto ret = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(ret)); + auto recoverNum = nebula::value(ret); + ASSERT_EQ(recoverNum, 1); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 1); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::QUEUE); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } + + { + enableSchedule(jobMgr); + usleep(FLAGS_job_check_intervals * 2); + // running recoveryed job and return success + reportTaskFinish(nebula::cpp2::ErrorCode::SUCCEEDED, spaceId, jobId, 0); + usleep(FLAGS_job_check_intervals * 2); + } + + { + // recovered job will succeed + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FINISHED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::SUCCEEDED); + } +} + +TEST_F(JobManagerTest, StopAndRecoverRunningJob) { + auto factory = std::make_shared(); + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + auto jobMgr = initJobManager(factory); + + // Add a job + GraphSpaceID spaceId = 1; + JobID jobId = 1; + JobDescription desc{spaceId, jobId, cpp2::JobType::STATS}; + + disableSchedule(jobMgr); + auto ec = jobMgr->addJob(desc); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + { + // Check job in queue + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 1); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::QUEUE); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } - rc = jobMgr->checkNeedRecoverJobExist(spaceId, jobDesc3.getJobType()); - ASSERT_EQ(rc, nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER); + { + enableSchedule(jobMgr); + // Wait job running and then stop + usleep(FLAGS_job_check_intervals * 2); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::RUNNING); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } + + { + // stop running job and check result + ec = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + // Wait job stopped and then recover + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::STOPPED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } + + { + // running job will report user cancel and finally failed. + reportTaskFinish(nebula::cpp2::ErrorCode::E_USER_CANCEL, spaceId, jobId, 0); + usleep(FLAGS_job_check_intervals * 2); + + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_USER_CANCEL); + } + + { + // recover failed job and check result + std::vector ids; + ids.emplace_back(jobId); + disableSchedule(jobMgr); + auto ret = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(ret)); + auto recoverNum = nebula::value(ret); + ASSERT_EQ(recoverNum, 1); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 1); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::QUEUE); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } + + { + enableSchedule(jobMgr); + // wait runnning and then report success + usleep(FLAGS_job_check_intervals * 2); + reportTaskFinish(nebula::cpp2::ErrorCode::SUCCEEDED, spaceId, jobId, 0); + usleep(FLAGS_job_check_intervals * 2); + // recover job will succeed + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FINISHED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::SUCCEEDED); + } +} + +TEST_F(JobManagerTest, StopAndRecoverUnstoppableJob) { + auto factory = std::make_shared(); + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + auto jobMgr = initJobManager(factory); + + // Add a job + GraphSpaceID spaceId = 1; + JobID jobId = 1; + JobDescription desc{spaceId, jobId, cpp2::JobType::COMPACT}; + disableSchedule(jobMgr); + auto ec = jobMgr->addJob(desc); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + { + // Check job is in queue + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 1); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::QUEUE); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } + + { + // Stop job will succeed if job is in queue + ec = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + // Check job is stopped + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 1); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::STOPPED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + } + { + // recover job will succeed + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 1); + + // Check job is in queue + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 2); // queue 1 add and 1 recover + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::QUEUE); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + // a recover job can not be stopped? + } + { + enableSchedule(jobMgr); + // wait job running and stop it + usleep(FLAGS_job_check_intervals * 2); + + auto jobSize = jobMgr->jobSize(); + EXPECT_EQ(jobSize, 1); // a running job and a queue job(recover) + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::RUNNING); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + + // stop a running unstoppable job will fail + ec = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE); + } + { + // wait job finish + reportTaskFinish(nebula::cpp2::ErrorCode::SUCCEEDED, spaceId, jobId, 0); + usleep(FLAGS_job_check_intervals * 2); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + // BUG: This job is finished once, and recover job is running again and skipped(fixed) + // BUG: The order is not correct. + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FINISHED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::SUCCEEDED); + } + { + ec = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::E_JOB_ALREADY_FINISH); + } +} + +TEST_F(JobManagerTest, RecoverRunningFailedJob) { + auto factory = std::make_shared(); + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + auto jobMgr = initJobManager(factory); + + // Add a job + GraphSpaceID spaceId = 1; + JobID jobId = 1; + JobDescription desc{spaceId, jobId, cpp2::JobType::STATS}; + auto ec = jobMgr->addJob(desc); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + { + // Check job running and failed + usleep(FLAGS_job_check_intervals * 2); + reportTaskFinish(nebula::cpp2::ErrorCode::E_TASK_EXECUTION_FAILED, spaceId, jobId, 0); + usleep(FLAGS_job_check_intervals * 2); + + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_TASK_EXECUTION_FAILED); + } + { + // recover job will succeed + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 1); + + usleep(FLAGS_job_check_intervals * 2); + reportTaskFinish(nebula::cpp2::ErrorCode::SUCCEEDED, spaceId, jobId, 0); + usleep(FLAGS_job_check_intervals * 2); + + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FINISHED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::SUCCEEDED); + } +} + +TEST_F(JobManagerTest, RecoverBalanceJob) { + auto factory = std::make_shared(); + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + auto jobMgr = initJobManager(factory); + + // Add a job + GraphSpaceID spaceId = 1; + JobID jobId = 1; + JobDescription desc{spaceId, jobId, cpp2::JobType::ZONE_BALANCE}; + auto ec = jobMgr->addJob(desc); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + { + // check job failed + usleep(FLAGS_job_check_intervals * 2); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_BALANCER_FAILURE); + } + + // add another job and show failed. + JobID newJobId = 2; + JobDescription newDesc{spaceId, newJobId, cpp2::JobType::ZONE_BALANCE}; + ec = jobMgr->addJob(newDesc); + ASSERT_EQ(ec, nebula::cpp2::ErrorCode::SUCCEEDED); + + { + // check job failed + usleep(FLAGS_job_check_intervals * 2); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 2); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::E_BALANCER_FAILURE); + ASSERT_EQ(showJobs[1].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(showJobs[1].get_code(), nebula::cpp2::ErrorCode::E_BALANCER_FAILURE); + } + // new job should succeed. + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + + { + // recover first job should failed. + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 0); + } + { + // recover new job should ok. + std::vector ids; + ids.emplace_back(newJobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 1); + } + { + // recover job should finish. + usleep(FLAGS_job_check_intervals * 2); + auto jobSize = jobMgr->jobSize(); + ASSERT_EQ(jobSize, 0); + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 2); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FINISHED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::SUCCEEDED); + ASSERT_EQ(showJobs[1].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(showJobs[1].get_code(), nebula::cpp2::ErrorCode::E_BALANCER_FAILURE); + } + { + // recover first job now should failed since a new finished balance job exists. + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 0); + } +} + +TEST_F(JobManagerTest, RecoverExpiredJobTest) { + auto factory = std::make_shared(); + EXPECT_CALL(*factory, createJobExecutor(_, _, _)) + .WillRepeatedly([](const JobDescription& jd, kvstore::KVStore* store, AdminClient* client) { + UNUSED(client); + return std::make_unique(jd, store); + }); + auto jobMgr = initJobManager(factory); + + auto nowTimeInSec = nebula::time::WallClock::fastNowInSec(); + auto expiredJobTime = std::difftime(nowTimeInSec, FLAGS_job_expired_secs + 1); + { + GraphSpaceID spaceId = 1; + JobID jobId = 1; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::FAILED, + expiredJobTime, + expiredJobTime + 2, + nebula::cpp2::ErrorCode::E_UNKNOWN); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + JobID jobId = 1; + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 0); + } + // recover job should finish. + { + GraphSpaceID spaceId = 1; + JobID jobId = 2; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::FAILED, + expiredJobTime + 10, + expiredJobTime + 11, + nebula::cpp2::ErrorCode::E_UNKNOWN); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + JobID jobId = 2; + std::vector ids; + ids.emplace_back(jobId); + auto recoverRet = jobMgr->recoverJob(spaceId, ids); + ASSERT_TRUE(nebula::ok(recoverRet)); + auto recoverNum = nebula::value(recoverRet); + ASSERT_EQ(recoverNum, 1); + + usleep(FLAGS_job_check_intervals * 2); + reportTaskFinish(nebula::cpp2::ErrorCode::SUCCEEDED, spaceId, jobId, 0); + usleep(FLAGS_job_check_intervals * 2); + + // should be ok + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FINISHED); + ASSERT_EQ(showJobs[0].get_code(), nebula::cpp2::ErrorCode::SUCCEEDED); + } +} + +TEST_F(JobManagerTest, ExpiredJobTest) { + auto jobMgr = initJobManager(); + // For preventing job schedule in JobManager + auto nowTimeInSec = nebula::time::WallClock::fastNowInSec(); + auto expiredJobTime = std::difftime(nowTimeInSec, FLAGS_job_expired_secs + 1); + + // 1. failed job should be expired. + { + GraphSpaceID spaceId = 1; + JobID jobId = 1; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::FAILED, + expiredJobTime, + expiredJobTime + 2, + nebula::cpp2::ErrorCode::E_UNKNOWN); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 0); + } + // 2. finished job should be expired. + { + GraphSpaceID spaceId = 1; + JobID jobId = 2; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::FINISHED, + expiredJobTime, + expiredJobTime + 2, + nebula::cpp2::ErrorCode::SUCCEEDED); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 0); + } + // 3. stopped job should expired. + { + GraphSpaceID spaceId = 1; + JobID jobId = 3; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::STOPPED, + expiredJobTime, + expiredJobTime + 2, + nebula::cpp2::ErrorCode::E_USER_CANCEL); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 0); + } + + // 4. failed job not expired. + { + GraphSpaceID spaceId = 1; + JobID jobId = 4; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::FAILED, + expiredJobTime + 10, + expiredJobTime + 11, + nebula::cpp2::ErrorCode::E_UNKNOWN); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 1); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(showJobs[0].get_job_id(), 4); + } + // 5. stop job not expired. + { + GraphSpaceID spaceId = 1; + JobID jobId = 5; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::STOPPED, + expiredJobTime + 10, + expiredJobTime + 11, + nebula::cpp2::ErrorCode::E_USER_CANCEL); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 2); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::STOPPED); + ASSERT_EQ(showJobs[0].get_job_id(), 5); + ASSERT_EQ(showJobs[1].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(showJobs[1].get_job_id(), 4); + } + // 6. finished job not expired. + { + GraphSpaceID spaceId = 1; + JobID jobId = 6; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::FINISHED, + expiredJobTime + 10, + expiredJobTime + 11, + nebula::cpp2::ErrorCode::SUCCEEDED); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 3); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::FINISHED); + ASSERT_EQ(showJobs[0].get_job_id(), 6); + } + + // 7. queue job should stay even timeout + { + GraphSpaceID spaceId = 1; + JobID jobId = 7; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::QUEUE, + expiredJobTime, + expiredJobTime + 2, + nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 4); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::QUEUE); + ASSERT_EQ(showJobs[0].get_job_id(), 7); + } + // 8. running job should stay even timeout + { + GraphSpaceID spaceId = 1; + JobID jobId = 8; + JobDescription jobDesc(spaceId, + jobId, + cpp2::JobType::STATS, + {}, + cpp2::JobStatus::RUNNING, + expiredJobTime, + expiredJobTime + 2, + nebula::cpp2::ErrorCode::E_JOB_SUBMITTED); + + auto jobKey = MetaKeyUtils::jobKey(jobDesc.getSpace(), jobDesc.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc.getJobType(), + jobDesc.getParas(), + jobDesc.getStatus(), + jobDesc.getStartTime(), + jobDesc.getStopTime(), + jobDesc.getErrorCode()); + save(std::move(jobKey), std::move(jobVal)); + } + { + GraphSpaceID spaceId = 1; + auto showRet = jobMgr->showJobs(spaceId); + ASSERT_TRUE(nebula::ok(showRet)); + auto showJobs = nebula::value(showRet); + ASSERT_EQ(showJobs.size(), 5); + ASSERT_EQ(showJobs[0].get_status(), cpp2::JobStatus::RUNNING); + ASSERT_EQ(showJobs[0].get_job_id(), 8); + } } TEST_F(JobManagerTest, AddRebuildTagIndexJob) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); // For preventing job schedule in JobManager - jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; - jobMgr->bgThread_.join(); + disableSchedule(jobMgr); std::vector paras{"tag_index_name"}; GraphSpaceID spaceId = 1; JobID jobId = 11; @@ -142,7 +1267,7 @@ TEST_F(JobManagerTest, AddRebuildTagIndexJob) { } TEST_F(JobManagerTest, AddRebuildEdgeIndexJob) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); // For preventing job schedule in JobManager jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); @@ -206,7 +1331,7 @@ TEST_F(JobManagerTest, IngestJob) { } TEST_F(JobManagerTest, StatsJob) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); // For preventing job schedule in JobManager jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); @@ -238,7 +1363,7 @@ TEST_F(JobManagerTest, StatsJob) { // Jobs are parallelized between spaces, and serialized by priority within spaces TEST_F(JobManagerTest, JobPriority) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); // For preventing job schedule in JobManager jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); @@ -298,7 +1423,7 @@ TEST_F(JobManagerTest, JobPriority) { } TEST_F(JobManagerTest, JobDeduplication) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); // For preventing job schedule in JobManager jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); @@ -354,7 +1479,7 @@ TEST_F(JobManagerTest, JobDeduplication) { } TEST_F(JobManagerTest, LoadJobDescription) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); GraphSpaceID spaceId = 1; JobID jobId1 = 1; JobDescription jobDesc1(spaceId, jobId1, cpp2::JobType::COMPACT); @@ -381,7 +1506,7 @@ TEST_F(JobManagerTest, LoadJobDescription) { } TEST_F(JobManagerTest, ShowJobs) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); GraphSpaceID spaceId = 1; JobID jobId1 = 1; JobDescription jobDesc1(spaceId, jobId1, cpp2::JobType::COMPACT); @@ -418,7 +1543,7 @@ TEST_F(JobManagerTest, ShowJobs) { } TEST_F(JobManagerTest, ShowJobsFromMultiSpace) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); GraphSpaceID spaceId1 = 1; JobID jobId1 = 1; JobDescription jd1(spaceId1, jobId1, cpp2::JobType::COMPACT); @@ -450,7 +1575,7 @@ TEST_F(JobManagerTest, ShowJobsFromMultiSpace) { } TEST_F(JobManagerTest, ShowJob) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); GraphSpaceID spaceId = 1; JobID jobId1 = 1; JobDescription jd(spaceId, jobId1, cpp2::JobType::COMPACT); @@ -516,7 +1641,7 @@ TEST_F(JobManagerTest, ShowJob) { } TEST_F(JobManagerTest, ShowJobInOtherSpace) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); GraphSpaceID spaceId1 = 1; JobID jobId1 = 1; JobDescription jd(spaceId1, jobId1, cpp2::JobType::COMPACT); @@ -555,7 +1680,7 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) { } TEST_F(JobManagerTest, RecoverJob) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); // set status to prevent running the job since AdminClient is a injector jobMgr->status_.store(JobManager::JbmgrStatus::STOPPED, std::memory_order_release); jobMgr->bgThread_.join(); @@ -733,7 +1858,7 @@ TEST_F(JobManagerTest, RecoverJob) { } TEST_F(JobManagerTest, NotStoppableJob) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); GraphSpaceID spaceId = 1; PartitionID partId = 1; JobID jobId = 1; @@ -811,7 +1936,7 @@ TEST_F(JobManagerTest, NotStoppableJob) { } TEST_F(JobManagerTest, StoppableJob) { - std::unique_ptr> jobMgr = getJobManager(); + auto jobMgr = initJobManager(); GraphSpaceID spaceId = 1; JobID jobId = 1;