From 21ee515dc370dcf8b9cf64e61f87ccca47757015 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 1 Aug 2022 17:07:49 +0800 Subject: [PATCH] make 'stop job' work as expected (#4460) --- .../processors/job/BalanceJobExecutor.cpp | 5 - src/meta/processors/job/BalanceJobExecutor.h | 7 - .../processors/job/DownloadJobExecutor.cpp | 4 - src/meta/processors/job/DownloadJobExecutor.h | 2 - src/meta/processors/job/JobDescription.h | 1 + src/meta/processors/job/JobManager.cpp | 20 ++- src/meta/processors/job/JobManager.h | 1 + src/meta/processors/job/MetaJobExecutor.cpp | 3 +- .../processors/job/RebuildFTJobExecutor.h | 5 + .../job/SimpleConcurrentJobExecutor.cpp | 4 - .../job/SimpleConcurrentJobExecutor.h | 2 - src/meta/processors/job/StorageJobExecutor.h | 3 +- src/meta/test/JobManagerTest.cpp | 127 ++++++++++++++++-- 13 files changed, 142 insertions(+), 42 deletions(-) diff --git a/src/meta/processors/job/BalanceJobExecutor.cpp b/src/meta/processors/job/BalanceJobExecutor.cpp index bba41615d32..43813dab555 100644 --- a/src/meta/processors/job/BalanceJobExecutor.cpp +++ b/src/meta/processors/job/BalanceJobExecutor.cpp @@ -27,11 +27,6 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::prepare() { return nebula::cpp2::ErrorCode::SUCCEEDED; } -nebula::cpp2::ErrorCode BalanceJobExecutor::stop() { - stopped_ = true; - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() { if (kvstore_ == nullptr) { return nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/meta/processors/job/BalanceJobExecutor.h b/src/meta/processors/job/BalanceJobExecutor.h index e444fff7672..321bc31286e 100644 --- a/src/meta/processors/job/BalanceJobExecutor.h +++ b/src/meta/processors/job/BalanceJobExecutor.h @@ -107,13 +107,6 @@ class BalanceJobExecutor : public MetaJobExecutor { */ nebula::cpp2::ErrorCode prepare() override; - /** - * @brief Stop this job - * - * @return - */ - nebula::cpp2::ErrorCode stop() 
override; - /** * @brief Finish this job * diff --git a/src/meta/processors/job/DownloadJobExecutor.cpp b/src/meta/processors/job/DownloadJobExecutor.cpp index d064d64f543..996c75c5f23 100644 --- a/src/meta/processors/job/DownloadJobExecutor.cpp +++ b/src/meta/processors/job/DownloadJobExecutor.cpp @@ -111,9 +111,5 @@ folly::Future DownloadJobExecutor::executeInternal(HostAddr&& address, return f; } -nebula::cpp2::ErrorCode DownloadJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/job/DownloadJobExecutor.h b/src/meta/processors/job/DownloadJobExecutor.h index cc77d551f03..61c984542ba 100644 --- a/src/meta/processors/job/DownloadJobExecutor.h +++ b/src/meta/processors/job/DownloadJobExecutor.h @@ -30,8 +30,6 @@ class DownloadJobExecutor : public SimpleConcurrentJobExecutor { nebula::cpp2::ErrorCode prepare() override; - nebula::cpp2::ErrorCode stop() override; - protected: folly::Future executeInternal(HostAddr&& address, std::vector&& parts) override; diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index 16c505d61a6..9be7c29cb85 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -19,6 +19,7 @@ namespace meta { class JobDescription { using Status = cpp2::JobStatus; + FRIEND_TEST(JobManagerTest, StopJob); public: JobDescription() = default; diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index df2f8956212..d28684a908a 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -307,24 +307,30 @@ nebula::cpp2::ErrorCode JobManager::jobFinished( } auto it = runningJobs_.find(jobId); + // Job has not started yet if (it == runningJobs_.end()) { - // the job has not started yet // TODO job not existing in runningJobs_ also means leader changed, we handle it later cleanJob(jobId); return 
nebula::cpp2::ErrorCode::SUCCEEDED; } + // Job has been started auto jobExec = it->second.get(); if (jobStatus == cpp2::JobStatus::STOPPED) { - jobExec->stop(); - if (!jobExec->isMetaJob()) { - cleanJob(jobId); + auto code = jobExec->stop(); + if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { + // A meta job is triggered by metad and runs asynchronously, so we can't clean the job executor here. + // cleanJob will be called in the callback of the job executor set by setFinishCallBack. + if (!jobExec->isMetaJob()) { + cleanJob(jobId); + } } + return code; } else { - jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED); + // If the job has failed or finished, clean it and call finish. We clean the job first, no + // matter whether `finish` returns SUCCEEDED or not, because the job has already come to the end. cleanJob(jobId); + return jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED); } - - return nebula::cpp2::ErrorCode::SUCCEEDED; } nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 84ca8119949..96994d35340 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -43,6 +43,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob); FRIEND_TEST(JobManagerTest, DownloadJob); FRIEND_TEST(JobManagerTest, IngestJob); + FRIEND_TEST(JobManagerTest, StopJob); FRIEND_TEST(GetStatsTest, StatsJob); FRIEND_TEST(GetStatsTest, MockSingleMachineTest); FRIEND_TEST(GetStatsTest, MockMultiMachineTest); diff --git a/src/meta/processors/job/MetaJobExecutor.cpp b/src/meta/processors/job/MetaJobExecutor.cpp index fd355f13a74..28c92043588 100644 --- a/src/meta/processors/job/MetaJobExecutor.cpp +++ b/src/meta/processors/job/MetaJobExecutor.cpp @@ -35,7 +35,8 @@ nebula::cpp2::ErrorCode MetaJobExecutor::execute() { // Stop the job when the user cancel it. 
nebula::cpp2::ErrorCode MetaJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; + // By default we return not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; } nebula::cpp2::ErrorCode MetaJobExecutor::finish(bool) { diff --git a/src/meta/processors/job/RebuildFTJobExecutor.h b/src/meta/processors/job/RebuildFTJobExecutor.h index b23990a0805..36b3a94b227 100644 --- a/src/meta/processors/job/RebuildFTJobExecutor.h +++ b/src/meta/processors/job/RebuildFTJobExecutor.h @@ -22,6 +22,11 @@ class RebuildFTJobExecutor : public RebuildJobExecutor { toHost_ = TargetHosts::LISTENER; } + nebula::cpp2::ErrorCode stop() override { + // Unlike rebuild tag/edge index, rebuild full text job is not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; + } + protected: folly::Future executeInternal(HostAddr&& address, std::vector&& parts) override; diff --git a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp index 1b87aa113e9..c6ab75a2ea4 100644 --- a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp +++ b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp @@ -39,9 +39,5 @@ nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() { return nebula::cpp2::ErrorCode::SUCCEEDED; } -nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/job/SimpleConcurrentJobExecutor.h b/src/meta/processors/job/SimpleConcurrentJobExecutor.h index 9d46533f76b..3ac7aa2b5a4 100644 --- a/src/meta/processors/job/SimpleConcurrentJobExecutor.h +++ b/src/meta/processors/job/SimpleConcurrentJobExecutor.h @@ -23,8 +23,6 @@ class SimpleConcurrentJobExecutor : public StorageJobExecutor { nebula::cpp2::ErrorCode check() override; nebula::cpp2::ErrorCode prepare() override; - - nebula::cpp2::ErrorCode stop() override; }; } // namespace meta diff --git 
a/src/meta/processors/job/StorageJobExecutor.h b/src/meta/processors/job/StorageJobExecutor.h index 42cd50a475c..483ef6e86d2 100644 --- a/src/meta/processors/job/StorageJobExecutor.h +++ b/src/meta/processors/job/StorageJobExecutor.h @@ -67,7 +67,8 @@ class StorageJobExecutor : public JobExecutor { * @return */ nebula::cpp2::ErrorCode stop() override { - return nebula::cpp2::ErrorCode::SUCCEEDED; + // By default we return not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; } nebula::cpp2::ErrorCode finish(bool) override { diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 714e1268f2c..ecf1d47d7d7 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -53,6 +53,8 @@ class JobManagerTest : public ::testing::Test { adminClient_ = std::make_unique>(); DefaultValue>::SetFactory( [] { return folly::Future(Status::OK()); }); + DefaultValue>>::SetFactory( + [] { return folly::Future>(true); }); } std::unique_ptr> getJobManager() { @@ -77,7 +79,7 @@ class JobManagerTest : public ::testing::Test { std::unique_ptr rootPath_{nullptr}; std::unique_ptr kv_{nullptr}; - std::unique_ptr adminClient_{nullptr}; + std::unique_ptr adminClient_{nullptr}; }; TEST_F(JobManagerTest, AddJob) { @@ -149,12 +151,11 @@ TEST_F(JobManagerTest, DownloadJob) { JobID jobId = 11; JobDescription job(space, jobId, cpp2::JobType::DOWNLOAD, paras); - MockAdminClient adminClient; - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) - .WillOnce(Return(ByMove(folly::makeFuture(Status::OK())))); + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture>(true)))); auto executor = std::make_unique( - space, job.getJobId(), kv.get(), &adminClient, job.getParas()); + space, job.getJobId(), kv.get(), adminClient_.get(), job.getParas()); executor->helper_ = std::make_unique(); ASSERT_EQ(executor->check(), nebula::cpp2::ErrorCode::SUCCEEDED); @@ -175,11 +176,11 @@ 
TEST_F(JobManagerTest, IngestJob) { JobID jobId = 11; JobDescription job(space, jobId, cpp2::JobType::INGEST, paras); - MockAdminClient adminClient; - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) - .WillOnce(Return(ByMove(folly::makeFuture(Status::OK())))); + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture>(true)))); + auto executor = std::make_unique( - space, job.getJobId(), kv.get(), &adminClient, job.getParas()); + space, job.getJobId(), kv.get(), adminClient_.get(), job.getParas()); ASSERT_EQ(executor->check(), nebula::cpp2::ErrorCode::SUCCEEDED); auto code = executor->prepare(); @@ -639,6 +640,114 @@ TEST_F(JobManagerTest, RecoverJob) { } } +TEST_F(JobManagerTest, NotStoppableJob) { + std::unique_ptr> jobMgr = getJobManager(); + GraphSpaceID spaceId = 1; + PartitionID partId = 1; + JobID jobId = 1; + HostAddr listener("listener_host", 0); + + // Write a listener info into meta, rebuild fulltext will use it + auto initListener = [&] { + std::vector kvs; + kvs.emplace_back( + MetaKeyUtils::listenerKey(spaceId, partId, meta::cpp2::ListenerType::ELASTICSEARCH), + MetaKeyUtils::serializeHostAddr(listener)); + folly::Baton baton; + kv_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(kvs), [&](auto) { baton.post(); }); + baton.wait(); + }; + + initListener(); + TestUtils::setupHB(kv_.get(), {listener}, cpp2::HostRole::LISTENER, "sha"); + + std::vector notStoppableJob{ + cpp2::JobType::COMPACT, + cpp2::JobType::FLUSH, + cpp2::JobType::REBUILD_FULLTEXT_INDEX, + // cpp2::JobType::DOWNLOAD, // download need hdfs command, it is unstoppable as well + cpp2::JobType::INGEST, + cpp2::JobType::LEADER_BALANCE}; + for (const auto& type : notStoppableJob) { + if (type != cpp2::JobType::LEADER_BALANCE) { + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return( + ByMove(folly::makeFuture>(true).delayed(std::chrono::seconds(1))))); + } else { + HostLeaderMap dist; + dist[HostAddr("0", 
0)][1] = {1, 2, 3, 4, 5}; + EXPECT_CALL(*adminClient_, getLeaderDist(_)) + .WillOnce(testing::DoAll(SetArgPointee<0>(dist), + Return(ByMove(folly::Future(Status::OK()))))); + } + + JobDescription jobDesc(spaceId, jobId, type); + auto code = jobMgr->addJob(jobDesc, adminClient_.get()); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + // sleep a while to make sure the task has begun + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + code = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE); + jobId++; + } +} + +TEST_F(JobManagerTest, StoppableJob) { + std::unique_ptr> jobMgr = getJobManager(); + GraphSpaceID spaceId = 1; + JobID jobId = 1; + + // Write leader dist into meta + auto initLeader = [&] { + using AllLeaders = std::unordered_map>; + auto now = time::WallClock::fastNowInMilliSec(); + cpp2::LeaderInfo info; + info.part_id_ref() = 1; + info.term_ref() = 1; + std::vector kvs; + AllLeaders leaderMap{{spaceId, {info}}}; + ActiveHostsMan::updateHostInfo(kv_.get(), + HostAddr("localhost", 0), + meta::HostInfo(now, cpp2::HostRole::STORAGE, "sha"), + kvs, + &leaderMap); + folly::Baton baton; + kv_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(kvs), [&](auto) { baton.post(); }); + baton.wait(); + }; + + initLeader(); + std::vector stoppableJob{ + cpp2::JobType::REBUILD_TAG_INDEX, cpp2::JobType::REBUILD_EDGE_INDEX, cpp2::JobType::STATS, + // balance is stoppable, need to mock part distribution, which has been test in BalancerTest + // cpp2::JobType::DATA_BALANCE, + // cpp2::JobType::ZONE_BALANCE + }; + for (const auto& type : stoppableJob) { + if (type != cpp2::JobType::DATA_BALANCE && type != cpp2::JobType::ZONE_BALANCE) { + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return( + ByMove(folly::makeFuture>(true).delayed(std::chrono::seconds(1))))); + EXPECT_CALL(*adminClient_, stopTask(_, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture>(true)))); + } + + 
JobDescription jobDesc(spaceId, jobId, type); + auto code = jobMgr->addJob(jobDesc, adminClient_.get()); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + // sleep a while to make sure the task has begun + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + code = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + jobId++; + } +} + TEST_F(JobManagerTest, ConcurrentHashMapTest) { folly::ConcurrentHashMap> m;