From de57a130f83050bded84a9d2eced8cf5f217f81c Mon Sep 17 00:00:00 2001 From: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Date: Mon, 21 Mar 2022 15:08:09 +0800 Subject: [PATCH] improve job manager UT --- src/meta/processors/job/JobManager.h | 5 ++-- src/meta/test/JobManagerTest.cpp | 34 ++++++++++++++++++++++++---- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index b413b5b6094..1cd64f19f9b 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -164,9 +164,8 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { size_t jobSize() const; /** - * @brief Traverse from priorityQueues_ according to spaceId, - * and find the priorityQueue of the space that is not executing the job. - * Then take a job from the queue according to the priority. + * @brief Traverse from priorityQueues_, and find the priorityQueue of the space + * that is not executing the job. Then take a job from the queue according to the priority. * * @param opJobId * @return return true if the element is obtained, otherwise return false. diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 597de991cfb..ca40cba48b8 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -143,6 +143,7 @@ TEST_F(JobManagerTest, StatsJob) { ASSERT_EQ(cpp2::JobStatus::FINISHED, job.getStatus()); } +// Jobs are parallelized between spaces, and serialized by priority within spaces TEST_F(JobManagerTest, JobPriority) { std::unique_ptr> jobMgr = getJobManager(); // For preventing job schedule in JobManager @@ -161,21 +162,46 @@ TEST_F(JobManagerTest, JobPriority) { auto rc2 = jobMgr->addJob(jobDesc2, adminClient_.get()); ASSERT_EQ(rc2, nebula::cpp2::ErrorCode::SUCCEEDED); - ASSERT_EQ(2, jobMgr->jobSize()); + GraphSpaceID spaceId2 = 2; + JobID jobId3 = 15; + JobDescription jobDesc3(spaceId2, jobId3, cpp2::JobType::STATS); + auto rc3 = jobMgr->addJob(jobDesc3, adminClient_.get()); + ASSERT_EQ(rc3, nebula::cpp2::ErrorCode::SUCCEEDED); + + ASSERT_EQ(3, jobMgr->jobSize()); std::tuple opJobId; auto result = jobMgr->tryDequeue(opJobId); ASSERT_TRUE(result); ASSERT_EQ(14, std::get<1>(opJobId)); - ASSERT_EQ(1, jobMgr->jobSize()); + ASSERT_EQ(spaceId, std::get<2>(opJobId)); + // Suppose job starts executing + jobMgr->spaceRunningJobs_.insert_or_assign(spaceId, true); + + ASSERT_EQ(2, jobMgr->jobSize()); result = jobMgr->tryDequeue(opJobId); ASSERT_TRUE(result); - ASSERT_EQ(13, std::get<1>(opJobId)); - ASSERT_EQ(0, jobMgr->jobSize()); + ASSERT_EQ(15, std::get<1>(opJobId)); + ASSERT_EQ(spaceId2, std::get<2>(opJobId)); + // Suppose job starts executing + jobMgr->spaceRunningJobs_.insert_or_assign(spaceId2, true); + + ASSERT_EQ(1, jobMgr->jobSize()); result = jobMgr->tryDequeue(opJobId); + // Because all spaces are running jobs ASSERT_FALSE(result); + + // Suppose the job execution is complete + jobMgr->spaceRunningJobs_.insert_or_assign(spaceId, false); + jobMgr->spaceRunningJobs_.insert_or_assign(spaceId2, false); + ASSERT_EQ(1, jobMgr->jobSize()); + result = jobMgr->tryDequeue(opJobId); + ASSERT_TRUE(result); + ASSERT_EQ(13, std::get<1>(opJobId)); + ASSERT_EQ(spaceId, std::get<2>(opJobId)); + ASSERT_EQ(0, jobMgr->jobSize()); } TEST_F(JobManagerTest, JobDeduplication) {