Skip to content

Commit

Permalink
make 'stop job' work as expected (#4460)
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 authored Aug 1, 2022
1 parent d456abd commit 21ee515
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 42 deletions.
5 changes: 0 additions & 5 deletions src/meta/processors/job/BalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::prepare() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode BalanceJobExecutor::stop() {
stopped_ = true;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() {
if (kvstore_ == nullptr) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down
7 changes: 0 additions & 7 deletions src/meta/processors/job/BalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ class BalanceJobExecutor : public MetaJobExecutor {
*/
nebula::cpp2::ErrorCode prepare() override;

/**
* @brief Stop this job
*
* @return
*/
nebula::cpp2::ErrorCode stop() override;

/**
* @brief Finish this job
*
Expand Down
4 changes: 0 additions & 4 deletions src/meta/processors/job/DownloadJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,5 @@ folly::Future<Status> DownloadJobExecutor::executeInternal(HostAddr&& address,
return f;
}

nebula::cpp2::ErrorCode DownloadJobExecutor::stop() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

} // namespace meta
} // namespace nebula
2 changes: 0 additions & 2 deletions src/meta/processors/job/DownloadJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ class DownloadJobExecutor : public SimpleConcurrentJobExecutor {

nebula::cpp2::ErrorCode prepare() override;

nebula::cpp2::ErrorCode stop() override;

protected:
folly::Future<Status> executeInternal(HostAddr&& address,
std::vector<PartitionID>&& parts) override;
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/job/JobDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace meta {

class JobDescription {
using Status = cpp2::JobStatus;
FRIEND_TEST(JobManagerTest, StopJob);

public:
JobDescription() = default;
Expand Down
20 changes: 13 additions & 7 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,24 +307,30 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(
}

auto it = runningJobs_.find(jobId);
// Job has not started yet
if (it == runningJobs_.end()) {
// the job has not started yet
// TODO job not existing in runningJobs_ also means leader changed, we handle it later
cleanJob(jobId);
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
// Job has been started
auto jobExec = it->second.get();
if (jobStatus == cpp2::JobStatus::STOPPED) {
jobExec->stop();
if (!jobExec->isMetaJob()) {
cleanJob(jobId);
auto code = jobExec->stop();
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
// meta job is trigger by metad, which runs in async. So we can't clean the job executor here.
// The cleanJob will be called in the callback of job executor set by setFinishCallBack.
if (!jobExec->isMetaJob()) {
cleanJob(jobId);
}
}
return code;
} else {
jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED);
// If the job is failed or finished, clean and call finish. We clean the job at first, no
// matter `finish` return SUCCEEDED or not. Because the job has already come to the end.
cleanJob(jobId);
return jobExec->finish(jobStatus == cpp2::JobStatus::FINISHED);
}

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob);
FRIEND_TEST(JobManagerTest, DownloadJob);
FRIEND_TEST(JobManagerTest, IngestJob);
FRIEND_TEST(JobManagerTest, StopJob);
FRIEND_TEST(GetStatsTest, StatsJob);
FRIEND_TEST(GetStatsTest, MockSingleMachineTest);
FRIEND_TEST(GetStatsTest, MockMultiMachineTest);
Expand Down
3 changes: 2 additions & 1 deletion src/meta/processors/job/MetaJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ nebula::cpp2::ErrorCode MetaJobExecutor::execute() {

// Stop the job when the user cancel it.
nebula::cpp2::ErrorCode MetaJobExecutor::stop() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
// By default we return not stoppable
return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE;
}

nebula::cpp2::ErrorCode MetaJobExecutor::finish(bool) {
Expand Down
5 changes: 5 additions & 0 deletions src/meta/processors/job/RebuildFTJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class RebuildFTJobExecutor : public RebuildJobExecutor {
toHost_ = TargetHosts::LISTENER;
}

nebula::cpp2::ErrorCode stop() override {
// Unlike rebuild tag/edge idnex, rebuild full text job is not stoppable
return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE;
}

protected:
folly::Future<Status> executeInternal(HostAddr&& address,
std::vector<PartitionID>&& parts) override;
Expand Down
4 changes: 0 additions & 4 deletions src/meta/processors/job/SimpleConcurrentJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,5 @@ nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::stop() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

} // namespace meta
} // namespace nebula
2 changes: 0 additions & 2 deletions src/meta/processors/job/SimpleConcurrentJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class SimpleConcurrentJobExecutor : public StorageJobExecutor {
nebula::cpp2::ErrorCode check() override;

nebula::cpp2::ErrorCode prepare() override;

nebula::cpp2::ErrorCode stop() override;
};

} // namespace meta
Expand Down
3 changes: 2 additions & 1 deletion src/meta/processors/job/StorageJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class StorageJobExecutor : public JobExecutor {
* @return
*/
nebula::cpp2::ErrorCode stop() override {
return nebula::cpp2::ErrorCode::SUCCEEDED;
// By default we return not stoppable
return nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE;
}

nebula::cpp2::ErrorCode finish(bool) override {
Expand Down
127 changes: 118 additions & 9 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class JobManagerTest : public ::testing::Test {
adminClient_ = std::make_unique<NiceMock<MockAdminClient>>();
DefaultValue<folly::Future<Status>>::SetFactory(
[] { return folly::Future<Status>(Status::OK()); });
DefaultValue<folly::Future<StatusOr<bool>>>::SetFactory(
[] { return folly::Future<StatusOr<bool>>(true); });
}

std::unique_ptr<JobManager, std::function<void(JobManager*)>> getJobManager() {
Expand All @@ -77,7 +79,7 @@ class JobManagerTest : public ::testing::Test {

std::unique_ptr<fs::TempDir> rootPath_{nullptr};
std::unique_ptr<kvstore::KVStore> kv_{nullptr};
std::unique_ptr<AdminClient> adminClient_{nullptr};
std::unique_ptr<MockAdminClient> adminClient_{nullptr};
};

TEST_F(JobManagerTest, AddJob) {
Expand Down Expand Up @@ -149,12 +151,11 @@ TEST_F(JobManagerTest, DownloadJob) {
JobID jobId = 11;
JobDescription job(space, jobId, cpp2::JobType::DOWNLOAD, paras);

MockAdminClient adminClient;
EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _))
.WillOnce(Return(ByMove(folly::makeFuture<Status>(Status::OK()))));
EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _))
.WillOnce(Return(ByMove(folly::makeFuture<StatusOr<bool>>(true))));

auto executor = std::make_unique<DownloadJobExecutor>(
space, job.getJobId(), kv.get(), &adminClient, job.getParas());
space, job.getJobId(), kv.get(), adminClient_.get(), job.getParas());
executor->helper_ = std::make_unique<meta::MockHdfsOKHelper>();

ASSERT_EQ(executor->check(), nebula::cpp2::ErrorCode::SUCCEEDED);
Expand All @@ -175,11 +176,11 @@ TEST_F(JobManagerTest, IngestJob) {
JobID jobId = 11;
JobDescription job(space, jobId, cpp2::JobType::INGEST, paras);

MockAdminClient adminClient;
EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _))
.WillOnce(Return(ByMove(folly::makeFuture<Status>(Status::OK()))));
EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _))
.WillOnce(Return(ByMove(folly::makeFuture<StatusOr<bool>>(true))));

auto executor = std::make_unique<IngestJobExecutor>(
space, job.getJobId(), kv.get(), &adminClient, job.getParas());
space, job.getJobId(), kv.get(), adminClient_.get(), job.getParas());

ASSERT_EQ(executor->check(), nebula::cpp2::ErrorCode::SUCCEEDED);
auto code = executor->prepare();
Expand Down Expand Up @@ -639,6 +640,114 @@ TEST_F(JobManagerTest, RecoverJob) {
}
}

TEST_F(JobManagerTest, NotStoppableJob) {
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
GraphSpaceID spaceId = 1;
PartitionID partId = 1;
JobID jobId = 1;
HostAddr listener("listener_host", 0);

// Write a listener info into meta, rebuild fulltext will use it
auto initListener = [&] {
std::vector<kvstore::KV> kvs;
kvs.emplace_back(
MetaKeyUtils::listenerKey(spaceId, partId, meta::cpp2::ListenerType::ELASTICSEARCH),
MetaKeyUtils::serializeHostAddr(listener));
folly::Baton<true, std::atomic> baton;
kv_->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(kvs), [&](auto) { baton.post(); });
baton.wait();
};

initListener();
TestUtils::setupHB(kv_.get(), {listener}, cpp2::HostRole::LISTENER, "sha");

std::vector<cpp2::JobType> notStoppableJob{
cpp2::JobType::COMPACT,
cpp2::JobType::FLUSH,
cpp2::JobType::REBUILD_FULLTEXT_INDEX,
// cpp2::JobType::DOWNLOAD, // download need hdfs command, it is unstoppable as well
cpp2::JobType::INGEST,
cpp2::JobType::LEADER_BALANCE};
for (const auto& type : notStoppableJob) {
if (type != cpp2::JobType::LEADER_BALANCE) {
EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _))
.WillOnce(Return(
ByMove(folly::makeFuture<StatusOr<bool>>(true).delayed(std::chrono::seconds(1)))));
} else {
HostLeaderMap dist;
dist[HostAddr("0", 0)][1] = {1, 2, 3, 4, 5};
EXPECT_CALL(*adminClient_, getLeaderDist(_))
.WillOnce(testing::DoAll(SetArgPointee<0>(dist),
Return(ByMove(folly::Future<Status>(Status::OK())))));
}

JobDescription jobDesc(spaceId, jobId, type);
auto code = jobMgr->addJob(jobDesc, adminClient_.get());
ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED);

// sleep a while to make sure the task has begun
std::this_thread::sleep_for(std::chrono::milliseconds(500));
code = jobMgr->stopJob(spaceId, jobId);
ASSERT_EQ(code, nebula::cpp2::ErrorCode::E_JOB_NOT_STOPPABLE);
jobId++;
}
}

TEST_F(JobManagerTest, StoppableJob) {
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
GraphSpaceID spaceId = 1;
JobID jobId = 1;

// Write leader dist into meta
auto initLeader = [&] {
using AllLeaders = std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>;
auto now = time::WallClock::fastNowInMilliSec();
cpp2::LeaderInfo info;
info.part_id_ref() = 1;
info.term_ref() = 1;
std::vector<kvstore::KV> kvs;
AllLeaders leaderMap{{spaceId, {info}}};
ActiveHostsMan::updateHostInfo(kv_.get(),
HostAddr("localhost", 0),
meta::HostInfo(now, cpp2::HostRole::STORAGE, "sha"),
kvs,
&leaderMap);
folly::Baton<true, std::atomic> baton;
kv_->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(kvs), [&](auto) { baton.post(); });
baton.wait();
};

initLeader();
std::vector<cpp2::JobType> stoppableJob{
cpp2::JobType::REBUILD_TAG_INDEX, cpp2::JobType::REBUILD_EDGE_INDEX, cpp2::JobType::STATS,
// balance is stoppable, need to mock part distribution, which has been test in BalancerTest
// cpp2::JobType::DATA_BALANCE,
// cpp2::JobType::ZONE_BALANCE
};
for (const auto& type : stoppableJob) {
if (type != cpp2::JobType::DATA_BALANCE && type != cpp2::JobType::ZONE_BALANCE) {
EXPECT_CALL(*adminClient_, addTask(_, _, _, _, _, _, _))
.WillOnce(Return(
ByMove(folly::makeFuture<StatusOr<bool>>(true).delayed(std::chrono::seconds(1)))));
EXPECT_CALL(*adminClient_, stopTask(_, _, _))
.WillOnce(Return(ByMove(folly::makeFuture<StatusOr<bool>>(true))));
}

JobDescription jobDesc(spaceId, jobId, type);
auto code = jobMgr->addJob(jobDesc, adminClient_.get());
ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED);

// sleep a while to make sure the task has begun
std::this_thread::sleep_for(std::chrono::milliseconds(500));
code = jobMgr->stopJob(spaceId, jobId);
ASSERT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED);

jobId++;
}
}

TEST_F(JobManagerTest, ConcurrentHashMapTest) {
folly::ConcurrentHashMap<int, std::unique_ptr<std::recursive_mutex>> m;

Expand Down

0 comments on commit 21ee515

Please sign in to comment.