Skip to content

Commit

Permalink
refactor job manager
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep committed Mar 3, 2022
1 parent 3822d76 commit 0fa8935
Show file tree
Hide file tree
Showing 25 changed files with 107 additions and 137 deletions.
4 changes: 3 additions & 1 deletion src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class AdminJobValidator final : public Validator {
switch (sentence_->getOp()) {
case meta::cpp2::AdminJobOp::ADD:
switch (sentence_->getCmd()) {
// All jobs are space-level, except for the jobs that need to be refactored.
case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX:
case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX:
case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX:
Expand All @@ -40,7 +41,8 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::AdminCmd::LEADER_BALANCE:
case meta::cpp2::AdminCmd::ZONE_BALANCE:
return true;
// TODO: Also space related, but not available in CreateJobExecutor now.
// TODO: download and ingest need to be refactored to use the rpc protocol.
// Currently they are using their own validator
case meta::cpp2::AdminCmd::DOWNLOAD:
case meta::cpp2::AdminCmd::INGEST:
case meta::cpp2::AdminCmd::UNKNOWN:
Expand Down
3 changes: 1 addition & 2 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -816,12 +816,11 @@ struct ListClusterInfoReq {
}

struct AddTaskRequest {
// rebuild index / flush / compact / statis
// Task distributed to storage to execute, e.g. flush, compact, stats, etc.
1: meta.AdminCmd cmd
2: i32 job_id
3: i32 task_id
4: TaskPara para
5: optional i32 concurrency
}

struct AddTaskResp {
Expand Down
4 changes: 1 addition & 3 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,7 @@ folly::Future<StatusOr<bool>> AdminClient::addTask(
GraphSpaceID spaceId,
const HostAddr& host,
const std::vector<std::string>& taskSpecificParas,
std::vector<PartitionID> parts,
int concurrency) {
std::vector<PartitionID> parts) {
folly::Promise<StatusOr<bool>> pro;
auto f = pro.getFuture();
auto adminAddr = Utils::getAdminAddrFromStoreAddr(host);
Expand All @@ -727,7 +726,6 @@ folly::Future<StatusOr<bool>> AdminClient::addTask(
req.cmd_ref() = cmd;
req.job_id_ref() = jobId;
req.task_id_ref() = taskId;
req.concurrency_ref() = concurrency;

storage::cpp2::TaskPara para;
para.space_id_ref() = spaceId;
Expand Down
4 changes: 1 addition & 3 deletions src/meta/processors/admin/AdminClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ class AdminClient {
* @param host Target host to add task
* @param taskSpecficParas
* @param parts
* @param concurrency
* @return folly::Future<StatusOr<bool>> Return true if succeed, else return an error status
*/
virtual folly::Future<StatusOr<bool>> addTask(cpp2::AdminCmd cmd,
Expand All @@ -213,8 +212,7 @@ class AdminClient {
GraphSpaceID spaceId,
const HostAddr& host,
const std::vector<std::string>& taskSpecficParas,
std::vector<PartitionID> parts,
int concurrency);
std::vector<PartitionID> parts);

/**
* @brief Stop stoarge admin task in given storage host
Expand Down
93 changes: 53 additions & 40 deletions src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@
#include "common/base/StatusOr.h"
#include "common/stats/StatsManager.h"
#include "meta/processors/job/JobDescription.h"
#include "meta/processors/job/JobManager.h"

namespace nebula {
namespace meta {

void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
cpp2::AdminJobResult result;
auto errorCode = nebula::cpp2::ErrorCode::SUCCEEDED;
std::stringstream oss;
oss << "op = " << apache::thrift::util::enumNameSafe(req.get_op());
if (req.get_op() == nebula::meta::cpp2::AdminJobOp::ADD) {
Expand All @@ -27,45 +24,17 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
}
LOG(INFO) << __func__ << "() " << oss.str();

JobManager* jobMgr = JobManager::getInstance();
cpp2::AdminJobResult result;
auto errorCode = nebula::cpp2::ErrorCode::SUCCEEDED;
jobMgr_ = JobManager::getInstance();

switch (req.get_op()) {
case nebula::meta::cpp2::AdminJobOp::ADD: {
auto cmd = req.get_cmd();
auto paras = req.get_paras();
if (cmd == cpp2::AdminCmd::REBUILD_TAG_INDEX || cmd == cpp2::AdminCmd::REBUILD_EDGE_INDEX ||
cmd == cpp2::AdminCmd::STATS) {
if (paras.empty()) {
LOG(INFO) << "Parameter should be not empty";
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
}

JobID jId = 0;
auto jobExist = jobMgr->checkJobExist(cmd, paras, jId);
if (jobExist) {
LOG(INFO) << "Job has already exists: " << jId;
result.job_id_ref() = jId;
break;
}

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto jobId = autoIncrementId();
// check if Job not exists
if (!nebula::ok(jobId)) {
errorCode = nebula::error(jobId);
break;
}

JobDescription jobDesc(nebula::value(jobId), cmd, paras);
errorCode = jobMgr->addJob(jobDesc, adminClient_);
if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
result.job_id_ref() = nebula::value(jobId);
}
errorCode = addJobProcess(req, result);
break;
}
case nebula::meta::cpp2::AdminJobOp::SHOW_All: {
auto ret = jobMgr->showJobs(req.get_paras().back());
auto ret = jobMgr_->showJobs(req.get_paras().back());
if (nebula::ok(ret)) {
result.job_desc_ref() = nebula::value(ret);
} else {
Expand All @@ -87,7 +56,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
auto ret = jobMgr->showJob(iJob, req.get_paras().back());
auto ret = jobMgr_->showJob(iJob, req.get_paras().back());
if (nebula::ok(ret)) {
result.job_desc_ref() = std::vector<cpp2::JobDesc>{nebula::value(ret).first};
result.task_desc_ref() = nebula::value(ret).second;
Expand All @@ -109,7 +78,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
errorCode = jobMgr->stopJob(iJob, req.get_paras().back());
errorCode = jobMgr_->stopJob(iJob, req.get_paras().back());
break;
}
case nebula::meta::cpp2::AdminJobOp::RECOVER: {
Expand All @@ -120,7 +89,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
for (size_t i = 0; i < paras.size() - 1; i++) {
jobIds.push_back(std::stoi(paras[i]));
}
auto ret = jobMgr->recoverJob(spaceName, adminClient_, jobIds);
auto ret = jobMgr_->recoverJob(spaceName, adminClient_, jobIds);
if (nebula::ok(ret)) {
result.recovered_job_num_ref() = nebula::value(ret);
} else {
Expand All @@ -142,5 +111,49 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
onFinished();
}

nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq& req,
cpp2::AdminJobResult& result) {
auto cmd = req.get_cmd();
auto paras = req.get_paras();

// All jobs here are the space level, so the last parameter is spaceName.
if (paras.empty()) {
LOG(INFO) << "Parameter should be not empty";
return nebula::cpp2::ErrorCode::E_INVALID_PARM;
}

// Check if space not exists
auto spaceName = paras.back();
auto spaceRet = getSpaceId(spaceName);
if (!nebula::ok(spaceRet)) {
auto retCode = nebula::error(spaceRet);
LOG(INFO) << "Get space failed, space name: " << spaceName
<< " error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

// Check if job not exists
JobID jId = 0;
auto jobExist = jobMgr_->checkJobExist(cmd, paras, jId);
if (jobExist) {
LOG(INFO) << "Job has already exists: " << jId;
result.job_id_ref() = jId;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto jobId = autoIncrementId();
if (!nebula::ok(jobId)) {
return nebula::error(jobId);
}

JobDescription jobDesc(nebula::value(jobId), cmd, paras);
auto errorCode = jobMgr_->addJob(jobDesc, adminClient_);
if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
result.job_id_ref() = nebula::value(jobId);
}
return errorCode;
}

} // namespace meta
} // namespace nebula
12 changes: 12 additions & 0 deletions src/meta/processors/job/AdminJobProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "common/stats/StatsManager.h"
#include "meta/processors/BaseProcessor.h"
#include "meta/processors/admin/AdminClient.h"
#include "meta/processors/job/JobManager.h"

namespace nebula {
namespace meta {
Expand All @@ -28,8 +29,19 @@ class AdminJobProcessor : public BaseProcessor<cpp2::AdminJobResp> {
AdminJobProcessor(kvstore::KVStore* kvstore, AdminClient* adminClient)
: BaseProcessor<cpp2::AdminJobResp>(kvstore), adminClient_(adminClient) {}

private:
/**
* @brief Check whether the parameters are legal, then construct the job and join the queue.
*
* @param req
* @param result
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode addJobProcess(const cpp2::AdminJobReq& req, cpp2::AdminJobResult& result);

protected:
AdminClient* adminClient_{nullptr};
JobManager* jobMgr_{nullptr};
};

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/CompactJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ folly::Future<Status> CompactJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
{},
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/FlushJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ folly::Future<Status> FlushJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
{},
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ void JobManager::scheduleThread() {
}
}

// @return: true if all task dispatched, else false
bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
std::unique_ptr<JobExecutor> je =
Expand Down Expand Up @@ -285,7 +284,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
auto jobExec = JobExecutorFactory::createJobExecutor(optJobDesc, kvStore_, adminClient_);

if (!jobExec) {
LOG(INFO) << folly::sformat("createMetaJobExecutor failed(), jobId={}", jobId);
LOG(INFO) << folly::sformat("createJobExecutor failed(), jobId={}", jobId);
return nebula::cpp2::ErrorCode::E_TASK_REPORT_OUT_DATE;
}

Expand Down
9 changes: 7 additions & 2 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,15 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
JobManager() = default;

void scheduleThread();
void scheduleThreadOld();

/**
* @brief Dispatch all tasks of one job
*
* @param jobDesc
* @param op
* @return true if all task dispatched, else false.
*/
bool runJobInternal(const JobDescription& jobDesc, JbOp op);
bool runJobInternalOld(const JobDescription& jobDesc);

ErrorOr<nebula::cpp2::ErrorCode, GraphSpaceID> getSpaceId(const std::string& name);

Expand Down
1 change: 0 additions & 1 deletion src/meta/processors/job/MetaJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class MetaJobExecutor : public JobExecutor {
AdminClient* adminClient_{nullptr};
GraphSpaceID space_;
std::vector<std::string> paras_;
int32_t concurrency_{INT_MAX};
volatile bool stopped_{false};
std::mutex muInterrupt_;
std::condition_variable condInterrupt_;
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/RebuildEdgeJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ folly::Future<Status> RebuildEdgeJobExecutor::executeInternal(HostAddr&& address
space_,
std::move(address),
taskParameters_,
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/RebuildFTJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ folly::Future<Status> RebuildFTJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
taskParameters_,
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/RebuildTagJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ folly::Future<Status> RebuildTagJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
taskParameters_,
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
5 changes: 1 addition & 4 deletions src/meta/processors/job/SimpleConcurrentJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ SimpleConcurrentJobExecutor::SimpleConcurrentJobExecutor(JobID jobId,

bool SimpleConcurrentJobExecutor::check() {
auto parasNum = paras_.size();
return parasNum == 1 || parasNum == 2;
return parasNum == 1;
}

nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() {
Expand All @@ -37,9 +37,6 @@ nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() {
return nebula::error(errOrHost);
}

if (paras_.size() > 1) {
concurrency_ = std::atoi(paras_[0].c_str());
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down
8 changes: 4 additions & 4 deletions src/meta/processors/job/StatsJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ nebula::cpp2::ErrorCode StatsJobExecutor::doRemove(const std::string& key) {
}

nebula::cpp2::ErrorCode StatsJobExecutor::prepare() {
auto spaceRet = getSpaceIdFromName(paras_[0]);
std::string spaceName = paras_.back();
auto spaceRet = getSpaceIdFromName(spaceName);
if (!nebula::ok(spaceRet)) {
LOG(INFO) << "Can't find the space: " << paras_[0];
LOG(INFO) << "Can't find the space: " << spaceName;
return nebula::error(spaceRet);
}
space_ = nebula::value(spaceRet);
Expand All @@ -68,8 +69,7 @@ folly::Future<Status> StatsJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
{},
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
1 change: 0 additions & 1 deletion src/meta/processors/job/StorageJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ class StorageJobExecutor : public JobExecutor {
GraphSpaceID space_;
std::vector<std::string> paras_;
TargetHosts toHost_{TargetHosts::DEFAULT};
int32_t concurrency_{INT_MAX};
volatile bool stopped_{false};
std::mutex muInterrupt_;
std::condition_variable condInterrupt_;
Expand Down
Loading

0 comments on commit 0fa8935

Please sign in to comment.