From d456abd36288f044e3f73726f926f685c71261dc Mon Sep 17 00:00:00 2001 From: Zhong TIAN <12692414+tian-zhong@users.noreply.github.com> Date: Fri, 29 Jul 2022 18:37:55 +0800 Subject: [PATCH 1/2] Update CODE_OF_CONDUCT.md (#4468) * Update CODE_OF_CONDUCT.md A Code of Conduct typically describes how contributors should behave in this community. The current contents here are a helpful coding style guide. It is also best to rename the file to "CODE_STYLE_GUIDE.md" * Rename CODE_OF_CONDUCT.md to Coding_Style_Guide.md as suggested --- CODE_OF_CONDUCT.md => Coding_Style_Guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename CODE_OF_CONDUCT.md => Coding_Style_Guide.md (91%) diff --git a/CODE_OF_CONDUCT.md b/Coding_Style_Guide.md similarity index 91% rename from CODE_OF_CONDUCT.md rename to Coding_Style_Guide.md index 502cf796c85..bb2b48be25e 100644 --- a/CODE_OF_CONDUCT.md +++ b/Coding_Style_Guide.md @@ -1,4 +1,4 @@ -# Contributor Covenant Code of Conduct +# Contributor Covenant Coding Style Guide Currently we use clang-format to format the code. We recommend you to configure the clang-format according to the IDE / editor you use. The following link is how to configure vim / emacs / vscode to use clang-format. 
From 21ee515dc370dcf8b9cf64e61f87ccca47757015 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 1 Aug 2022 17:07:49 +0800 Subject: [PATCH 2/2] make 'stop job' work as expected (#4460) --- .../processors/job/BalanceJobExecutor.cpp | 5 - src/meta/processors/job/BalanceJobExecutor.h | 7 - .../processors/job/DownloadJobExecutor.cpp | 4 - src/meta/processors/job/DownloadJobExecutor.h | 2 - src/meta/processors/job/JobDescription.h | 1 + src/meta/processors/job/JobManager.cpp | 20 ++- src/meta/processors/job/JobManager.h | 1 + src/meta/processors/job/MetaJobExecutor.cpp | 3 +- .../processors/job/RebuildFTJobExecutor.h | 5 + .../job/SimpleConcurrentJobExecutor.cpp | 4 - .../job/SimpleConcurrentJobExecutor.h | 2 - src/meta/processors/job/StorageJobExecutor.h | 3 +- src/meta/test/JobManagerTest.cpp | 127 ++++++++++++++++-- 13 files changed, 142 insertions(+), 42 deletions(-) diff --git a/src/meta/processors/job/BalanceJobExecutor.cpp b/src/meta/processors/job/BalanceJobExecutor.cpp index bba41615d32..43813dab555 100644 --- a/src/meta/processors/job/BalanceJobExecutor.cpp +++ b/src/meta/processors/job/BalanceJobExecutor.cpp @@ -27,11 +27,6 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::prepare() { return nebula::cpp2::ErrorCode::SUCCEEDED; } -nebula::cpp2::ErrorCode BalanceJobExecutor::stop() { - stopped_ = true; - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() { if (kvstore_ == nullptr) { return nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/meta/processors/job/BalanceJobExecutor.h b/src/meta/processors/job/BalanceJobExecutor.h index e444fff7672..321bc31286e 100644 --- a/src/meta/processors/job/BalanceJobExecutor.h +++ b/src/meta/processors/job/BalanceJobExecutor.h @@ -107,13 +107,6 @@ class BalanceJobExecutor : public MetaJobExecutor { */ nebula::cpp2::ErrorCode prepare() override; - /** - * @brief Stop this job - * - * @return - */ - nebula::cpp2::ErrorCode stop() 
override; - /** * @brief Finish this job * diff --git a/src/meta/processors/job/DownloadJobExecutor.cpp b/src/meta/processors/job/DownloadJobExecutor.cpp index d064d64f543..996c75c5f23 100644 --- a/src/meta/processors/job/DownloadJobExecutor.cpp +++ b/src/meta/processors/job/DownloadJobExecutor.cpp @@ -111,9 +111,5 @@ folly::Future DownloadJobExecutor::executeInternal(HostAddr&& address, return f; } -nebula::cpp2::ErrorCode DownloadJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/job/DownloadJobExecutor.h b/src/meta/processors/job/DownloadJobExecutor.h index cc77d551f03..61c984542ba 100644 --- a/src/meta/processors/job/DownloadJobExecutor.h +++ b/src/meta/processors/job/DownloadJobExecutor.h @@ -30,8 +30,6 @@ class DownloadJobExecutor : public SimpleConcurrentJobExecutor { nebula::cpp2::ErrorCode prepare() override; - nebula::cpp2::ErrorCode stop() override; - protected: folly::Future executeInternal(HostAddr&& address, std::vector&& parts) override; diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index 16c505d61a6..9be7c29cb85 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -19,6 +19,7 @@ namespace meta { class JobDescription { using Status = cpp2::JobStatus; + FRIEND_TEST(JobManagerTest, StopJob); public: JobDescription() = default; diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index df2f8956212..d28684a908a 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -307,24 +307,30 @@ nebula::cpp2::ErrorCode JobManager::jobFinished( } auto it = runningJobs_.find(jobId); + // Job has not started yet if (it == runningJobs_.end()) { - // the job has not started yet // TODO job not existing in runningJobs_ also means leader changed, we handle it later cleanJob(jobId); return 
nebula::cpp2::ErrorCode::SUCCEEDED; } + // Job has been started auto jobExec = it->second.get(); if (jobStatus == cpp2::JobStatus::STOPPED) { - jobExec->stop(); - if (!jobExec->isMetaJob()) { - cleanJob(jobId); + auto code = jobExec->stop(); + if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { + // meta job is trigger by metad, which runs in async. So we can't clean the job executor here. + // The cleanJob will be called in the callback of job executor set by setFinishCallBack. + if (!jobExec->isMetaJob()) { + cleanJob(jobId); + } } + return code; } else { - jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED); + // If the job is failed or finished, clean and call finish. We clean the job at first, no + // matter `finish` return SUCCEEDED or not. Because the job has already come to the end. cleanJob(jobId); + return jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED); } - - return nebula::cpp2::ErrorCode::SUCCEEDED; } nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 84ca8119949..96994d35340 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -43,6 +43,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob); FRIEND_TEST(JobManagerTest, DownloadJob); FRIEND_TEST(JobManagerTest, IngestJob); + FRIEND_TEST(JobManagerTest, StopJob); FRIEND_TEST(GetStatsTest, StatsJob); FRIEND_TEST(GetStatsTest, MockSingleMachineTest); FRIEND_TEST(GetStatsTest, MockMultiMachineTest); diff --git a/src/meta/processors/job/MetaJobExecutor.cpp b/src/meta/processors/job/MetaJobExecutor.cpp index fd355f13a74..28c92043588 100644 --- a/src/meta/processors/job/MetaJobExecutor.cpp +++ b/src/meta/processors/job/MetaJobExecutor.cpp @@ -35,7 +35,8 @@ nebula::cpp2::ErrorCode MetaJobExecutor::execute() { // Stop the job when the user cancel it. 
nebula::cpp2::ErrorCode MetaJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; + // By default we return not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; } nebula::cpp2::ErrorCode MetaJobExecutor::finish(bool) { diff --git a/src/meta/processors/job/RebuildFTJobExecutor.h b/src/meta/processors/job/RebuildFTJobExecutor.h index b23990a0805..36b3a94b227 100644 --- a/src/meta/processors/job/RebuildFTJobExecutor.h +++ b/src/meta/processors/job/RebuildFTJobExecutor.h @@ -22,6 +22,11 @@ class RebuildFTJobExecutor : public RebuildJobExecutor { toHost_ = TargetHosts::LISTENER; } + nebula::cpp2::ErrorCode stop() override { + // Unlike rebuild tag/edge idnex, rebuild full text job is not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; + } + protected: folly::Future executeInternal(HostAddr&& address, std::vector&& parts) override; diff --git a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp index 1b87aa113e9..c6ab75a2ea4 100644 --- a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp +++ b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp @@ -39,9 +39,5 @@ nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() { return nebula::cpp2::ErrorCode::SUCCEEDED; } -nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/job/SimpleConcurrentJobExecutor.h b/src/meta/processors/job/SimpleConcurrentJobExecutor.h index 9d46533f76b..3ac7aa2b5a4 100644 --- a/src/meta/processors/job/SimpleConcurrentJobExecutor.h +++ b/src/meta/processors/job/SimpleConcurrentJobExecutor.h @@ -23,8 +23,6 @@ class SimpleConcurrentJobExecutor : public StorageJobExecutor { nebula::cpp2::ErrorCode check() override; nebula::cpp2::ErrorCode prepare() override; - - nebula::cpp2::ErrorCode stop() override; }; } // namespace meta diff --git 
a/src/meta/processors/job/StorageJobExecutor.h b/src/meta/processors/job/StorageJobExecutor.h index 42cd50a475c..483ef6e86d2 100644 --- a/src/meta/processors/job/StorageJobExecutor.h +++ b/src/meta/processors/job/StorageJobExecutor.h @@ -67,7 +67,8 @@ class StorageJobExecutor : public JobExecutor { * @return */ nebula::cpp2::ErrorCode stop() override { - return nebula::cpp2::ErrorCode::SUCCEEDED; + // By default we return not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; } nebula::cpp2::ErrorCode finish(bool) override { diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 714e1268f2c..ecf1d47d7d7 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -53,6 +53,8 @@ class JobManagerTest : public ::testing::Test { adminClient_ = std::make_unique>(); DefaultValue>::SetFactory( [] { return folly::Future(Status::OK()); }); + DefaultValue>>::SetFactory( + [] { return folly::Future>(true); }); } std::unique_ptr> getJobManager() { @@ -77,7 +79,7 @@ class JobManagerTest : public ::testing::Test { std::unique_ptr rootPath_{nullptr}; std::unique_ptr kv_{nullptr}; - std::unique_ptr adminClient_{nullptr}; + std::unique_ptr adminClient_{nullptr}; }; TEST_F(JobManagerTest, AddJob) { @@ -149,12 +151,11 @@ TEST_F(JobManagerTest, DownloadJob) { JobID jobId = 11; JobDescription job(space, jobId, cpp2::JobType::DOWNLOAD, paras); - MockAdminClient adminClient; - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) - .WillOnce(Return(ByMove(folly::makeFuture(Status::OK())))); + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture>(true)))); auto executor = std::make_unique( - space, job.getJobId(), kv.get(), &adminClient, job.getParas()); + space, job.getJobId(), kv.get(), adminClient_.get(), job.getParas()); executor->helper_ = std::make_unique(); ASSERT_EQ(executor->check(), nebula::cpp2::ErrorCode::SUCCEEDED); @@ -175,11 +176,11 @@ 
TEST_F(JobManagerTest, IngestJob) { JobID jobId = 11; JobDescription job(space, jobId, cpp2::JobType::INGEST, paras); - MockAdminClient adminClient; - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) - .WillOnce(Return(ByMove(folly::makeFuture(Status::OK())))); + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture>(true)))); + auto executor = std::make_unique( - space, job.getJobId(), kv.get(), &adminClient, job.getParas()); + space, job.getJobId(), kv.get(), adminClient_.get(), job.getParas()); ASSERT_EQ(executor->check(), nebula::cpp2::ErrorCode::SUCCEEDED); auto code = executor->prepare(); @@ -639,6 +640,114 @@ TEST_F(JobManagerTest, RecoverJob) { } } +TEST_F(JobManagerTest, NotStoppableJob) { + std::unique_ptr> jobMgr = getJobManager(); + GraphSpaceID spaceId = 1; + PartitionID partId = 1; + JobID jobId = 1; + HostAddr listener("listener_host", 0); + + // Write a listener info into meta, rebuild fulltext will use it + auto initListener = [&] { + std::vector kvs; + kvs.emplace_back( + MetaKeyUtils::listenerKey(spaceId, partId, meta::cpp2::ListenerType::ELASTICSEARCH), + MetaKeyUtils::serializeHostAddr(listener)); + folly::Baton baton; + kv_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(kvs), [&](auto) { baton.post(); }); + baton.wait(); + }; + + initListener(); + TestUtils::setupHB(kv_.get(), {listener}, cpp2::HostRole::LISTENER, "sha"); + + std::vector notStoppableJob{ + cpp2::JobType::COMPACT, + cpp2::JobType::FLUSH, + cpp2::JobType::REBUILD_FULLTEXT_INDEX, + // cpp2::JobType::DOWNLOAD, // download need hdfs command, it is unstoppable as well + cpp2::JobType::INGEST, + cpp2::JobType::LEADER_BALANCE}; + for (const auto& type : notStoppableJob) { + if (type != cpp2::JobType::LEADER_BALANCE) { + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return( + ByMove(folly::makeFuture>(true).delayed(std::chrono::seconds(1))))); + } else { + HostLeaderMap dist; + dist[HostAddr("0", 
0)][1] = {1, 2, 3, 4, 5}; + EXPECT_CALL(*adminClient_, getLeaderDist(_)) + .WillOnce(testing::DoAll(SetArgPointee<0>(dist), + Return(ByMove(folly::Future(Status::OK()))))); + } + + JobDescription jobDesc(spaceId, jobId, type); + auto code = jobMgr->addJob(jobDesc, adminClient_.get()); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + // sleep a while to make sure the task has begun + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + code = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE); + jobId++; + } +} + +TEST_F(JobManagerTest, StoppableJob) { + std::unique_ptr> jobMgr = getJobManager(); + GraphSpaceID spaceId = 1; + JobID jobId = 1; + + // Write leader dist into meta + auto initLeader = [&] { + using AllLeaders = std::unordered_map>; + auto now = time::WallClock::fastNowInMilliSec(); + cpp2::LeaderInfo info; + info.part_id_ref() = 1; + info.term_ref() = 1; + std::vector kvs; + AllLeaders leaderMap{{spaceId, {info}}}; + ActiveHostsMan::updateHostInfo(kv_.get(), + HostAddr("localhost", 0), + meta::HostInfo(now, cpp2::HostRole::STORAGE, "sha"), + kvs, + &leaderMap); + folly::Baton baton; + kv_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(kvs), [&](auto) { baton.post(); }); + baton.wait(); + }; + + initLeader(); + std::vector stoppableJob{ + cpp2::JobType::REBUILD_TAG_INDEX, cpp2::JobType::REBUILD_EDGE_INDEX, cpp2::JobType::STATS, + // balance is stoppable, need to mock part distribution, which has been test in BalancerTest + // cpp2::JobType::DATA_BALANCE, + // cpp2::JobType::ZONE_BALANCE + }; + for (const auto& type : stoppableJob) { + if (type != cpp2::JobType::DATA_BALANCE && type != cpp2::JobType::ZONE_BALANCE) { + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return( + ByMove(folly::makeFuture>(true).delayed(std::chrono::seconds(1))))); + EXPECT_CALL(*adminClient_, stopTask(_, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture>(true)))); + } + + 
JobDescription jobDesc(spaceId, jobId, type); + auto code = jobMgr->addJob(jobDesc, adminClient_.get()); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + // sleep a while to make sure the task has begun + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + code = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + jobId++; + } +} + TEST_F(JobManagerTest, ConcurrentHashMapTest) { folly::ConcurrentHashMap> m;