diff --git a/CODE_OF_CONDUCT.md b/Coding_Style_Guide.md similarity index 91% rename from CODE_OF_CONDUCT.md rename to Coding_Style_Guide.md index 502cf796c85..bb2b48be25e 100644 --- a/CODE_OF_CONDUCT.md +++ b/Coding_Style_Guide.md @@ -1,4 +1,4 @@ -# Contributor Covenant Code of Conduct +# Contributor Covenant Coding Style Guide Currently we use clang-format to format the code. We recommend you to configure the clang-format according to the IDE / editor you use. The following link is how to configure vim / emacs / vscode to use clang-format. diff --git a/src/meta/processors/job/BalanceJobExecutor.cpp b/src/meta/processors/job/BalanceJobExecutor.cpp index bba41615d32..43813dab555 100644 --- a/src/meta/processors/job/BalanceJobExecutor.cpp +++ b/src/meta/processors/job/BalanceJobExecutor.cpp @@ -27,11 +27,6 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::prepare() { return nebula::cpp2::ErrorCode::SUCCEEDED; } -nebula::cpp2::ErrorCode BalanceJobExecutor::stop() { - stopped_ = true; - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() { if (kvstore_ == nullptr) { return nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/meta/processors/job/BalanceJobExecutor.h b/src/meta/processors/job/BalanceJobExecutor.h index e444fff7672..321bc31286e 100644 --- a/src/meta/processors/job/BalanceJobExecutor.h +++ b/src/meta/processors/job/BalanceJobExecutor.h @@ -107,13 +107,6 @@ class BalanceJobExecutor : public MetaJobExecutor { */ nebula::cpp2::ErrorCode prepare() override; - /** - * @brief Stop this job - * - * @return - */ - nebula::cpp2::ErrorCode stop() override; - /** * @brief Finish this job * diff --git a/src/meta/processors/job/DownloadJobExecutor.cpp b/src/meta/processors/job/DownloadJobExecutor.cpp index d064d64f543..996c75c5f23 100644 --- a/src/meta/processors/job/DownloadJobExecutor.cpp +++ b/src/meta/processors/job/DownloadJobExecutor.cpp @@ -111,9 +111,5 @@ folly::Future<Status> 
DownloadJobExecutor::executeInternal(HostAddr&& address, return f; } -nebula::cpp2::ErrorCode DownloadJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/job/DownloadJobExecutor.h b/src/meta/processors/job/DownloadJobExecutor.h index cc77d551f03..61c984542ba 100644 --- a/src/meta/processors/job/DownloadJobExecutor.h +++ b/src/meta/processors/job/DownloadJobExecutor.h @@ -30,8 +30,6 @@ class DownloadJobExecutor : public SimpleConcurrentJobExecutor { nebula::cpp2::ErrorCode prepare() override; - nebula::cpp2::ErrorCode stop() override; - protected: folly::Future<Status> executeInternal(HostAddr&& address, std::vector<PartitionID>&& parts) override; diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index 16c505d61a6..9be7c29cb85 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -19,6 +19,7 @@ namespace meta { class JobDescription { using Status = cpp2::JobStatus; + FRIEND_TEST(JobManagerTest, StopJob); public: JobDescription() = default; diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index df2f8956212..d28684a908a 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -307,24 +307,30 @@ nebula::cpp2::ErrorCode JobManager::jobFinished( } auto it = runningJobs_.find(jobId); + // Job has not started yet if (it == runningJobs_.end()) { - // the job has not started yet // TODO job not existing in runningJobs_ also means leader changed, we handle it later cleanJob(jobId); return nebula::cpp2::ErrorCode::SUCCEEDED; } + // Job has been started auto jobExec = it->second.get(); if (jobStatus == cpp2::JobStatus::STOPPED) { - jobExec->stop(); - if (!jobExec->isMetaJob()) { - cleanJob(jobId); + auto code = jobExec->stop(); + if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { + // meta job is triggered 
by metad and runs asynchronously, so we can't clean the job executor here. + // The cleanJob will be called in the callback of job executor set by setFinishCallBack. + if (!jobExec->isMetaJob()) { + cleanJob(jobId); + } } + return code; } else { - jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED); + // The job has failed or finished, i.e. it has already come to the end, so clean it first and + // call `finish` regardless of its result. NOTE(review): confirm cleanJob does not free jobExec. cleanJob(jobId); + return jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED); } - - return nebula::cpp2::ErrorCode::SUCCEEDED; } nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 84ca8119949..96994d35340 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -43,6 +43,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob); FRIEND_TEST(JobManagerTest, DownloadJob); FRIEND_TEST(JobManagerTest, IngestJob); + FRIEND_TEST(JobManagerTest, StopJob); FRIEND_TEST(GetStatsTest, StatsJob); FRIEND_TEST(GetStatsTest, MockSingleMachineTest); FRIEND_TEST(GetStatsTest, MockMultiMachineTest); diff --git a/src/meta/processors/job/MetaJobExecutor.cpp b/src/meta/processors/job/MetaJobExecutor.cpp index fd355f13a74..28c92043588 100644 --- a/src/meta/processors/job/MetaJobExecutor.cpp +++ b/src/meta/processors/job/MetaJobExecutor.cpp @@ -35,7 +35,8 @@ nebula::cpp2::ErrorCode MetaJobExecutor::execute() { // Stop the job when the user cancel it. 
nebula::cpp2::ErrorCode MetaJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; + // By default we return not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; } nebula::cpp2::ErrorCode MetaJobExecutor::finish(bool) { diff --git a/src/meta/processors/job/RebuildFTJobExecutor.h b/src/meta/processors/job/RebuildFTJobExecutor.h index b23990a0805..36b3a94b227 100644 --- a/src/meta/processors/job/RebuildFTJobExecutor.h +++ b/src/meta/processors/job/RebuildFTJobExecutor.h @@ -22,6 +22,11 @@ class RebuildFTJobExecutor : public RebuildJobExecutor { toHost_ = TargetHosts::LISTENER; } + nebula::cpp2::ErrorCode stop() override { + // Unlike rebuild tag/edge index, the rebuild full-text job is not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; + } + protected: folly::Future<Status> executeInternal(HostAddr&& address, std::vector<PartitionID>&& parts) override; diff --git a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp index 1b87aa113e9..c6ab75a2ea4 100644 --- a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp +++ b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp @@ -39,9 +39,5 @@ nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() { return nebula::cpp2::ErrorCode::SUCCEEDED; } -nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::stop() { - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/job/SimpleConcurrentJobExecutor.h b/src/meta/processors/job/SimpleConcurrentJobExecutor.h index 9d46533f76b..3ac7aa2b5a4 100644 --- a/src/meta/processors/job/SimpleConcurrentJobExecutor.h +++ b/src/meta/processors/job/SimpleConcurrentJobExecutor.h @@ -23,8 +23,6 @@ class SimpleConcurrentJobExecutor : public StorageJobExecutor { nebula::cpp2::ErrorCode check() override; nebula::cpp2::ErrorCode prepare() override; - - nebula::cpp2::ErrorCode stop() override; }; } // namespace meta diff 
--git a/src/meta/processors/job/StorageJobExecutor.h b/src/meta/processors/job/StorageJobExecutor.h index 42cd50a475c..483ef6e86d2 100644 --- a/src/meta/processors/job/StorageJobExecutor.h +++ b/src/meta/processors/job/StorageJobExecutor.h @@ -67,7 +67,8 @@ class StorageJobExecutor : public JobExecutor { * @return */ nebula::cpp2::ErrorCode stop() override { - return nebula::cpp2::ErrorCode::SUCCEEDED; + // By default we return not stoppable + return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE; } nebula::cpp2::ErrorCode finish(bool) override { diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 714e1268f2c..ecf1d47d7d7 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -53,6 +53,8 @@ class JobManagerTest : public ::testing::Test { adminClient_ = std::make_unique<NiceMock<MockAdminClient>>(); DefaultValue<folly::Future<Status>>::SetFactory( [] { return folly::Future<Status>(Status::OK()); }); + DefaultValue<folly::Future<StatusOr<bool>>>::SetFactory( + [] { return folly::Future<StatusOr<bool>>(true); }); } std::unique_ptr<JobManager, std::function<void(JobManager*)>> getJobManager() { @@ -77,7 +79,7 @@ class JobManagerTest : public ::testing::Test { std::unique_ptr<fs::TempDir> rootPath_{nullptr}; std::unique_ptr<kvstore::KVStore> kv_{nullptr}; - std::unique_ptr<AdminClient> adminClient_{nullptr}; + std::unique_ptr<MockAdminClient> adminClient_{nullptr}; }; TEST_F(JobManagerTest, AddJob) { @@ -149,12 +151,11 @@ TEST_F(JobManagerTest, DownloadJob) { JobID jobId = 11; JobDescription job(space, jobId, cpp2::JobType::DOWNLOAD, paras); - MockAdminClient adminClient; - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) - .WillOnce(Return(ByMove(folly::makeFuture<Status>(Status::OK())))); + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture<StatusOr<bool>>(true)))); auto executor = std::make_unique<DownloadJobExecutor>( - space, job.getJobId(), 
kv.get(), &adminClient, job.getParas()); + space, job.getJobId(), kv.get(), adminClient_.get(), job.getParas()); executor->helper_ = std::make_unique<meta::MockHdfsOKHelper>(); ASSERT_EQ(executor->check(), nebula::cpp2::ErrorCode::SUCCEEDED); @@ -175,11 +176,11 @@ TEST_F(JobManagerTest, IngestJob) { JobID jobId = 11; JobDescription job(space, jobId, cpp2::JobType::INGEST, paras); - MockAdminClient adminClient; - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) - .WillOnce(Return(ByMove(folly::makeFuture<Status>(Status::OK())))); + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture<StatusOr<bool>>(true)))); + auto executor = std::make_unique<IngestJobExecutor>( - space, job.getJobId(), kv.get(), &adminClient, job.getParas()); + space, job.getJobId(), kv.get(), adminClient_.get(), job.getParas()); ASSERT_EQ(executor->check(), nebula::cpp2::ErrorCode::SUCCEEDED); auto code = executor->prepare(); @@ -639,6 +640,114 @@ TEST_F(JobManagerTest, RecoverJob) { } } +TEST_F(JobManagerTest, NotStoppableJob) { + std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager(); + GraphSpaceID spaceId = 1; + PartitionID partId = 1; + JobID jobId = 1; + HostAddr listener("listener_host", 0); + + // Write a listener info into meta, rebuild fulltext will use it + auto initListener = [&] { + std::vector<kvstore::KV> kvs; + kvs.emplace_back( + MetaKeyUtils::listenerKey(spaceId, partId, meta::cpp2::ListenerType::ELASTICSEARCH), + MetaKeyUtils::serializeHostAddr(listener)); + folly::Baton<true, std::atomic> baton; + kv_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(kvs), [&](auto) { baton.post(); }); + baton.wait(); + }; + + initListener(); + TestUtils::setupHB(kv_.get(), {listener}, cpp2::HostRole::LISTENER, "sha"); + + std::vector<cpp2::JobType> notStoppableJob{ + cpp2::JobType::COMPACT, + cpp2::JobType::FLUSH, + cpp2::JobType::REBUILD_FULLTEXT_INDEX, + // cpp2::JobType::DOWNLOAD, // download 
need hdfs command, it is unstoppable as well + cpp2::JobType::INGEST, + cpp2::JobType::LEADER_BALANCE}; + for (const auto& type : notStoppableJob) { + if (type != cpp2::JobType::LEADER_BALANCE) { + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return( + ByMove(folly::makeFuture<StatusOr<bool>>(true).delayed(std::chrono::seconds(1))))); + } else { + HostLeaderMap dist; + dist[HostAddr("0", 0)][1] = {1, 2, 3, 4, 5}; + EXPECT_CALL(*adminClient_, getLeaderDist(_)) + .WillOnce(testing::DoAll(SetArgPointee<0>(dist), + Return(ByMove(folly::Future<Status>(Status::OK()))))); + } + + JobDescription jobDesc(spaceId, jobId, type); + auto code = jobMgr->addJob(jobDesc, adminClient_.get()); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + // sleep a while to make sure the task has begun + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + code = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE); + jobId++; + } +} + +TEST_F(JobManagerTest, StoppableJob) { + std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager(); + GraphSpaceID spaceId = 1; + JobID jobId = 1; + + // Write leader dist into meta + auto initLeader = [&] { + using AllLeaders = std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>; + auto now = time::WallClock::fastNowInMilliSec(); + cpp2::LeaderInfo info; + info.part_id_ref() = 1; + info.term_ref() = 1; + std::vector<kvstore::KV> kvs; + AllLeaders leaderMap{{spaceId, {info}}}; + ActiveHostsMan::updateHostInfo(kv_.get(), + HostAddr("localhost", 0), + meta::HostInfo(now, cpp2::HostRole::STORAGE, "sha"), + kvs, + &leaderMap); + folly::Baton<true, std::atomic> baton; + kv_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(kvs), [&](auto) { baton.post(); }); + baton.wait(); + }; + + initLeader(); + std::vector<cpp2::JobType> stoppableJob{ + cpp2::JobType::REBUILD_TAG_INDEX, cpp2::JobType::REBUILD_EDGE_INDEX, cpp2::JobType::STATS, 
+ // balance is stoppable, need to mock part distribution, which has been tested in BalancerTest + // cpp2::JobType::DATA_BALANCE, + // cpp2::JobType::ZONE_BALANCE + }; + for (const auto& type : stoppableJob) { + if (type != cpp2::JobType::DATA_BALANCE && type != cpp2::JobType::ZONE_BALANCE) { + EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _)) + .WillOnce(Return( + ByMove(folly::makeFuture<StatusOr<bool>>(true).delayed(std::chrono::seconds(1))))); + EXPECT_CALL(*adminClient_, stopTask(_, _, _)) + .WillOnce(Return(ByMove(folly::makeFuture<StatusOr<bool>>(true)))); + } + + JobDescription jobDesc(spaceId, jobId, type); + auto code = jobMgr->addJob(jobDesc, adminClient_.get()); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + // sleep a while to make sure the task has begun + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + code = jobMgr->stopJob(spaceId, jobId); + ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); + + jobId++; + } +} + TEST_F(JobManagerTest, ConcurrentHashMapTest) { folly::ConcurrentHashMap<int, std::unique_ptr<std::recursive_mutex>> m;