Skip to content

Commit

Permalink
improve job manager UT
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep committed Mar 22, 2022
1 parent 206d16a commit de57a13
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
5 changes: 2 additions & 3 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,8 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
size_t jobSize() const;

/**
* @brief Traverse from priorityQueues_ according to spaceId,
* and find the priorityQueue of the space that is not executing the job.
* Then take a job from the queue according to the priority.
* @brief Traverse from priorityQueues_, and find the priorityQueue of the space
* that is not executing the job. Then take a job from the queue according to the priority.
*
* @param opJobId
* @return return true if the element is obtained, otherwise return false.
Expand Down
34 changes: 30 additions & 4 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ TEST_F(JobManagerTest, StatsJob) {
ASSERT_EQ(cpp2::JobStatus::FINISHED, job.getStatus());
}

// Jobs are parallelized between spaces, and serialized by priority within spaces
TEST_F(JobManagerTest, JobPriority) {
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// For preventing job schedule in JobManager
Expand All @@ -161,21 +162,46 @@ TEST_F(JobManagerTest, JobPriority) {
auto rc2 = jobMgr->addJob(jobDesc2, adminClient_.get());
ASSERT_EQ(rc2, nebula::cpp2::ErrorCode::SUCCEEDED);

ASSERT_EQ(2, jobMgr->jobSize());
GraphSpaceID spaceId2 = 2;
JobID jobId3 = 15;
JobDescription jobDesc3(spaceId2, jobId3, cpp2::JobType::STATS);
auto rc3 = jobMgr->addJob(jobDesc3, adminClient_.get());
ASSERT_EQ(rc3, nebula::cpp2::ErrorCode::SUCCEEDED);

ASSERT_EQ(3, jobMgr->jobSize());

std::tuple<JobManager::JbOp, JobID, GraphSpaceID> opJobId;
auto result = jobMgr->tryDequeue(opJobId);
ASSERT_TRUE(result);
ASSERT_EQ(14, std::get<1>(opJobId));
ASSERT_EQ(1, jobMgr->jobSize());
ASSERT_EQ(spaceId, std::get<2>(opJobId));
// Suppose job starts executing
jobMgr->spaceRunningJobs_.insert_or_assign(spaceId, true);

ASSERT_EQ(2, jobMgr->jobSize());

result = jobMgr->tryDequeue(opJobId);
ASSERT_TRUE(result);
ASSERT_EQ(13, std::get<1>(opJobId));
ASSERT_EQ(0, jobMgr->jobSize());
ASSERT_EQ(15, std::get<1>(opJobId));
ASSERT_EQ(spaceId2, std::get<2>(opJobId));
// Suppose job starts executing
jobMgr->spaceRunningJobs_.insert_or_assign(spaceId2, true);

ASSERT_EQ(1, jobMgr->jobSize());

result = jobMgr->tryDequeue(opJobId);
// Because all spaces are running jobs
ASSERT_FALSE(result);

// Suppose the job execution is complete
jobMgr->spaceRunningJobs_.insert_or_assign(spaceId, false);
jobMgr->spaceRunningJobs_.insert_or_assign(spaceId2, false);
ASSERT_EQ(1, jobMgr->jobSize());
result = jobMgr->tryDequeue(opJobId);
ASSERT_TRUE(result);
ASSERT_EQ(13, std::get<1>(opJobId));
ASSERT_EQ(spaceId, std::get<2>(opJobId));
ASSERT_EQ(0, jobMgr->jobSize());
}

TEST_F(JobManagerTest, JobDeduplication) {
Expand Down

0 comments on commit de57a13

Please sign in to comment.