diff --git a/src/graph/validator/AdminJobValidator.h b/src/graph/validator/AdminJobValidator.h index 1f8ff4e5a70..31a931dda77 100644 --- a/src/graph/validator/AdminJobValidator.h +++ b/src/graph/validator/AdminJobValidator.h @@ -30,6 +30,7 @@ class AdminJobValidator final : public Validator { switch (sentence_->getOp()) { case meta::cpp2::AdminJobOp::ADD: switch (sentence_->getCmd()) { + // All jobs are space-level, except for the jobs that need to be refactored. case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX: case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX: case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX: @@ -40,7 +41,8 @@ class AdminJobValidator final : public Validator { case meta::cpp2::AdminCmd::LEADER_BALANCE: case meta::cpp2::AdminCmd::ZONE_BALANCE: return true; - // TODO: Also space related, but not available in CreateJobExecutor now. + // TODO: download and ingest need to be refactored to use the rpc protocol. + // Currently they are using their own validator case meta::cpp2::AdminCmd::DOWNLOAD: case meta::cpp2::AdminCmd::INGEST: case meta::cpp2::AdminCmd::UNKNOWN: diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 71ed423b1d6..0d7e31c3019 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -816,12 +816,11 @@ struct ListClusterInfoReq { } struct AddTaskRequest { - // rebuild index / flush / compact / statis + // Task distributed to storage to execute, e.g. flush, compact, stats, etc. 
1: meta.AdminCmd cmd 2: i32 job_id 3: i32 task_id 4: TaskPara para - 5: optional i32 concurrency } struct AddTaskResp { diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index ca75b4cdce8..200994a1faf 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -717,8 +717,7 @@ folly::Future<StatusOr<bool>> AdminClient::addTask( GraphSpaceID spaceId, const HostAddr& host, const std::vector<std::string>& taskSpecificParas, - std::vector<PartitionID> parts, - int concurrency) { + std::vector<PartitionID> parts) { folly::Promise<StatusOr<bool>> pro; auto f = pro.getFuture(); auto adminAddr = Utils::getAdminAddrFromStoreAddr(host); @@ -727,7 +726,6 @@ folly::Future<StatusOr<bool>> AdminClient::addTask( req.cmd_ref() = cmd; req.job_id_ref() = jobId; req.task_id_ref() = taskId; - req.concurrency_ref() = concurrency; storage::cpp2::TaskPara para; para.space_id_ref() = spaceId; diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h index a1ab32a3c56..a49789c9eee 100644 --- a/src/meta/processors/admin/AdminClient.h +++ b/src/meta/processors/admin/AdminClient.h @@ -204,7 +204,6 @@ class AdminClient { * @param host Target host to add task * @param taskSpecficParas * @param parts - * @param concurrency * @return folly::Future<StatusOr<bool>> Return true if succeed, else return an error status */ virtual folly::Future<StatusOr<bool>> addTask(cpp2::AdminCmd cmd, @@ -213,8 +212,7 @@ class AdminClient { GraphSpaceID spaceId, const HostAddr& host, const std::vector<std::string>& taskSpecficParas, - std::vector<PartitionID> parts, - int concurrency); + std::vector<PartitionID> parts); /** * @brief Stop stoarge admin task in given storage host diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 6b3bf5cc7b4..13723a171ad 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ 
b/src/meta/processors/job/AdminJobProcessor.cpp @@ -8,14 +8,11 @@ #include "common/base/StatusOr.h" #include "common/stats/StatsManager.h" #include "meta/processors/job/JobDescription.h" -#include "meta/processors/job/JobManager.h" namespace nebula { namespace meta { void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { - cpp2::AdminJobResult result; - auto errorCode = nebula::cpp2::ErrorCode::SUCCEEDED; std::stringstream oss; oss << "op = " << apache::thrift::util::enumNameSafe(req.get_op()); if (req.get_op() == nebula::meta::cpp2::AdminJobOp::ADD) { @@ -27,45 +24,17 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { } LOG(INFO) << __func__ << "() " << oss.str(); - JobManager* jobMgr = JobManager::getInstance(); + cpp2::AdminJobResult result; + auto errorCode = nebula::cpp2::ErrorCode::SUCCEEDED; + jobMgr_ = JobManager::getInstance(); + switch (req.get_op()) { case nebula::meta::cpp2::AdminJobOp::ADD: { - auto cmd = req.get_cmd(); - auto paras = req.get_paras(); - if (cmd == cpp2::AdminCmd::REBUILD_TAG_INDEX || cmd == cpp2::AdminCmd::REBUILD_EDGE_INDEX || - cmd == cpp2::AdminCmd::STATS) { - if (paras.empty()) { - LOG(INFO) << "Parameter should be not empty"; - errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; - break; - } - } - - JobID jId = 0; - auto jobExist = jobMgr->checkJobExist(cmd, paras, jId); - if (jobExist) { - LOG(INFO) << "Job has already exists: " << jId; - result.job_id_ref() = jId; - break; - } - - folly::SharedMutex::WriteHolder holder(LockUtils::lock()); - auto jobId = autoIncrementId(); - // check if Job not exists - if (!nebula::ok(jobId)) { - errorCode = nebula::error(jobId); - break; - } - - JobDescription jobDesc(nebula::value(jobId), cmd, paras); - errorCode = jobMgr->addJob(jobDesc, adminClient_); - if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) { - result.job_id_ref() = nebula::value(jobId); - } + errorCode = addJobProcess(req, result); break; } case nebula::meta::cpp2::AdminJobOp::SHOW_All: { - auto ret 
= jobMgr->showJobs(req.get_paras().back()); + auto ret = jobMgr_->showJobs(req.get_paras().back()); if (nebula::ok(ret)) { result.job_desc_ref() = nebula::value(ret); } else { @@ -87,7 +56,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } - auto ret = jobMgr->showJob(iJob, req.get_paras().back()); + auto ret = jobMgr_->showJob(iJob, req.get_paras().back()); if (nebula::ok(ret)) { result.job_desc_ref() = std::vector<cpp2::JobDesc>{nebula::value(ret).first}; result.task_desc_ref() = nebula::value(ret).second; @@ -109,7 +78,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } - errorCode = jobMgr->stopJob(iJob, req.get_paras().back()); + errorCode = jobMgr_->stopJob(iJob, req.get_paras().back()); break; } case nebula::meta::cpp2::AdminJobOp::RECOVER: { @@ -120,7 +89,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { for (size_t i = 0; i < paras.size() - 1; i++) { jobIds.push_back(std::stoi(paras[i])); } - auto ret = jobMgr->recoverJob(spaceName, adminClient_, jobIds); + auto ret = jobMgr_->recoverJob(spaceName, adminClient_, jobIds); if (nebula::ok(ret)) { result.recovered_job_num_ref() = nebula::value(ret); } else { @@ -142,5 +111,49 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { onFinished(); } +nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq& req, + cpp2::AdminJobResult& result) { + auto cmd = req.get_cmd(); + auto paras = req.get_paras(); + + // All jobs here are the space level, so the last parameter is spaceName. 
+ if (paras.empty()) { + LOG(INFO) << "Parameters should not be empty"; + return nebula::cpp2::ErrorCode::E_INVALID_PARM; + } + + // Check that the space exists + auto spaceName = paras.back(); + auto spaceRet = getSpaceId(spaceName); + if (!nebula::ok(spaceRet)) { + auto retCode = nebula::error(spaceRet); + LOG(INFO) << "Get space failed, space name: " << spaceName + << " error: " << apache::thrift::util::enumNameSafe(retCode); + return retCode; + } + + // Check whether the job already exists + JobID jId = 0; + auto jobExist = jobMgr_->checkJobExist(cmd, paras, jId); + if (jobExist) { + LOG(INFO) << "Job already exists: " << jId; + result.job_id_ref() = jId; + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); + auto jobId = autoIncrementId(); + if (!nebula::ok(jobId)) { + return nebula::error(jobId); + } + + JobDescription jobDesc(nebula::value(jobId), cmd, paras); + auto errorCode = jobMgr_->addJob(jobDesc, adminClient_); + if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) { + result.job_id_ref() = nebula::value(jobId); + } + return errorCode; +} + } // namespace meta } // namespace nebula diff --git a/src/meta/processors/job/AdminJobProcessor.h b/src/meta/processors/job/AdminJobProcessor.h index bfb340acb67..09836e127ef 100644 --- a/src/meta/processors/job/AdminJobProcessor.h +++ b/src/meta/processors/job/AdminJobProcessor.h @@ -9,6 +9,7 @@ #include "common/stats/StatsManager.h" #include "meta/processors/BaseProcessor.h" #include "meta/processors/admin/AdminClient.h" +#include "meta/processors/job/JobManager.h" namespace nebula { namespace meta { @@ -28,8 +29,19 @@ class AdminJobProcessor : public BaseProcessor<cpp2::AdminJobResp> { AdminJobProcessor(kvstore::KVStore* kvstore, AdminClient* adminClient) : BaseProcessor<cpp2::AdminJobResp>(kvstore), adminClient_(adminClient) {} + private: + /** + * @brief Check whether the parameters are legal, then construct the job and add it to the queue. 
+ * + * @param req + * @param result + * @return nebula::cpp2::ErrorCode + */ + nebula::cpp2::ErrorCode addJobProcess(const cpp2::AdminJobReq& req, cpp2::AdminJobResult& result); + protected: AdminClient* adminClient_{nullptr}; + JobManager* jobMgr_{nullptr}; }; } // namespace meta diff --git a/src/meta/processors/job/CompactJobExecutor.cpp b/src/meta/processors/job/CompactJobExecutor.cpp index af34aedd023..28c8ba916dd 100644 --- a/src/meta/processors/job/CompactJobExecutor.cpp +++ b/src/meta/processors/job/CompactJobExecutor.cpp @@ -25,8 +25,7 @@ folly::Future<Status> CompactJobExecutor::executeInternal(HostAddr&& address, space_, std::move(address), {}, - std::move(parts), - concurrency_) + std::move(parts)) .then([pro = std::move(pro)](auto&& t) mutable { CHECK(!t.hasException()); auto status = std::move(t).value(); diff --git a/src/meta/processors/job/FlushJobExecutor.cpp b/src/meta/processors/job/FlushJobExecutor.cpp index 3c2afa1c49c..5e10663f1b7 100644 --- a/src/meta/processors/job/FlushJobExecutor.cpp +++ b/src/meta/processors/job/FlushJobExecutor.cpp @@ -25,8 +25,7 @@ folly::Future<Status> FlushJobExecutor::executeInternal(HostAddr&& address, space_, std::move(address), {}, - std::move(parts), - concurrency_) + std::move(parts)) .then([pro = std::move(pro)](auto&& t) mutable { CHECK(!t.hasException()); auto status = std::move(t).value(); diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index ddc86766690..7aed996f81b 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -141,7 +141,6 @@ void JobManager::scheduleThread() { } } -// @return: true if all task dispatched, else false bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { std::lock_guard<std::recursive_mutex> lk(muJobFinished_); std::unique_ptr<JobExecutor> je = @@ -285,7 +284,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, auto jobExec = 
JobExecutorFactory::createJobExecutor(optJobDesc, kvStore_, adminClient_); if (!jobExec) { - LOG(INFO) << folly::sformat("createMetaJobExecutor failed(), jobId={}", jobId); + LOG(INFO) << folly::sformat("createJobExecutor failed(), jobId={}", jobId); return nebula::cpp2::ErrorCode::E_TASK_REPORT_OUT_DATE; } diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index db3b4edf467..16d0ccc61d4 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -196,10 +196,15 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { JobManager() = default; void scheduleThread(); - void scheduleThreadOld(); + /** + * @brief Dispatch all tasks of one job + * + * @param jobDesc + * @param op + * @return true if all task dispatched, else false. + */ bool runJobInternal(const JobDescription& jobDesc, JbOp op); - bool runJobInternalOld(const JobDescription& jobDesc); ErrorOr<nebula::cpp2::ErrorCode, GraphSpaceID> getSpaceId(const std::string& name); diff --git a/src/meta/processors/job/MetaJobExecutor.h b/src/meta/processors/job/MetaJobExecutor.h index 3ccf9678f24..be0b5fa37fe 100644 --- a/src/meta/processors/job/MetaJobExecutor.h +++ b/src/meta/processors/job/MetaJobExecutor.h @@ -80,7 +80,6 @@ class MetaJobExecutor : public JobExecutor { AdminClient* adminClient_{nullptr}; GraphSpaceID space_; std::vector<std::string> paras_; - int32_t concurrency_{INT_MAX}; volatile bool stopped_{false}; std::mutex muInterrupt_; std::condition_variable condInterrupt_; diff --git a/src/meta/processors/job/RebuildEdgeJobExecutor.cpp b/src/meta/processors/job/RebuildEdgeJobExecutor.cpp index 10577694d4c..c37c169f328 100644 --- a/src/meta/processors/job/RebuildEdgeJobExecutor.cpp +++ b/src/meta/processors/job/RebuildEdgeJobExecutor.cpp @@ -19,8 +19,7 @@ folly::Future<Status> RebuildEdgeJobExecutor::executeInternal(HostAddr&& address space_, std::move(address), taskParameters_, - std::move(parts), - 
concurrency_) + std::move(parts)) .then([pro = std::move(pro)](auto&& t) mutable { CHECK(!t.hasException()); auto status = std::move(t).value(); diff --git a/src/meta/processors/job/RebuildFTJobExecutor.cpp b/src/meta/processors/job/RebuildFTJobExecutor.cpp index f92deed1046..13ce778f758 100644 --- a/src/meta/processors/job/RebuildFTJobExecutor.cpp +++ b/src/meta/processors/job/RebuildFTJobExecutor.cpp @@ -19,8 +19,7 @@ folly::Future<Status> RebuildFTJobExecutor::executeInternal(HostAddr&& address, space_, std::move(address), taskParameters_, - std::move(parts), - concurrency_) + std::move(parts)) .then([pro = std::move(pro)](auto&& t) mutable { CHECK(!t.hasException()); auto status = std::move(t).value(); diff --git a/src/meta/processors/job/RebuildTagJobExecutor.cpp b/src/meta/processors/job/RebuildTagJobExecutor.cpp index b42a99c8660..8a91b8e20a6 100644 --- a/src/meta/processors/job/RebuildTagJobExecutor.cpp +++ b/src/meta/processors/job/RebuildTagJobExecutor.cpp @@ -19,8 +19,7 @@ folly::Future<Status> RebuildTagJobExecutor::executeInternal(HostAddr&& address, space_, std::move(address), taskParameters_, - std::move(parts), - concurrency_) + std::move(parts)) .then([pro = std::move(pro)](auto&& t) mutable { CHECK(!t.hasException()); auto status = std::move(t).value(); diff --git a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp index 35c2a4ba897..991c44ff66e 100644 --- a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp +++ b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp @@ -19,7 +19,7 @@ SimpleConcurrentJobExecutor::SimpleConcurrentJobExecutor(JobID jobId, bool SimpleConcurrentJobExecutor::check() { auto parasNum = paras_.size(); - return parasNum == 1 || parasNum == 2; + return parasNum == 1; } nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() { @@ -37,9 +37,6 @@ nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() { return nebula::error(errOrHost); } - if 
(paras_.size() > 1) { - concurrency_ = std::atoi(paras_[0].c_str()); - } return nebula::cpp2::ErrorCode::SUCCEEDED; } diff --git a/src/meta/processors/job/StatsJobExecutor.cpp b/src/meta/processors/job/StatsJobExecutor.cpp index b533db664cd..d012b462be6 100644 --- a/src/meta/processors/job/StatsJobExecutor.cpp +++ b/src/meta/processors/job/StatsJobExecutor.cpp @@ -42,9 +42,10 @@ nebula::cpp2::ErrorCode StatsJobExecutor::doRemove(const std::string& key) { } nebula::cpp2::ErrorCode StatsJobExecutor::prepare() { - auto spaceRet = getSpaceIdFromName(paras_[0]); + std::string spaceName = paras_.back(); + auto spaceRet = getSpaceIdFromName(spaceName); if (!nebula::ok(spaceRet)) { - LOG(INFO) << "Can't find the space: " << paras_[0]; + LOG(INFO) << "Can't find the space: " << spaceName; return nebula::error(spaceRet); } space_ = nebula::value(spaceRet); @@ -68,8 +69,7 @@ folly::Future<Status> StatsJobExecutor::executeInternal(HostAddr&& address, space_, std::move(address), {}, - std::move(parts), - concurrency_) + std::move(parts)) .then([pro = std::move(pro)](auto&& t) mutable { CHECK(!t.hasException()); auto status = std::move(t).value(); diff --git a/src/meta/processors/job/StorageJobExecutor.h b/src/meta/processors/job/StorageJobExecutor.h index fdc1964b696..ea47614c6a8 100644 --- a/src/meta/processors/job/StorageJobExecutor.h +++ b/src/meta/processors/job/StorageJobExecutor.h @@ -106,7 +106,6 @@ class StorageJobExecutor : public JobExecutor { GraphSpaceID space_; std::vector<std::string> paras_; TargetHosts toHost_{TargetHosts::DEFAULT}; - int32_t concurrency_{INT_MAX}; volatile bool stopped_{false}; std::mutex muInterrupt_; std::condition_variable condInterrupt_; diff --git a/src/meta/test/GetStatsTest.cpp b/src/meta/test/GetStatsTest.cpp index ef001e7ff7a..34c32b584ed 100644 --- a/src/meta/test/GetStatsTest.cpp +++ b/src/meta/test/GetStatsTest.cpp @@ -383,7 +383,7 @@ TEST_F(GetStatsTest, MockSingleMachineTest) { JobCallBack cb1(jobMgr, jobId1, 0, 100); JobCallBack 
cb2(jobMgr, 2, 0, 200); - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _, _)) + EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) .Times(2) .WillOnce(testing::InvokeWithoutArgs(cb1)) .WillOnce(testing::InvokeWithoutArgs(cb2)); @@ -502,7 +502,7 @@ TEST_F(GetStatsTest, MockMultiMachineTest) { JobCallBack cb2(jobMgr, jobId, 1, 200); JobCallBack cb3(jobMgr, jobId, 2, 300); - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _, _)) + EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _)) .Times(3) .WillOnce(testing::InvokeWithoutArgs(cb1)) .WillOnce(testing::InvokeWithoutArgs(cb2)) diff --git a/src/meta/test/MockAdminClient.h b/src/meta/test/MockAdminClient.h index 40d3f1dc085..e7008393165 100644 --- a/src/meta/test/MockAdminClient.h +++ b/src/meta/test/MockAdminClient.h @@ -40,15 +40,14 @@ class MockAdminClient : public AdminClient { folly::Future<StatusOr<bool>>(const std::set<GraphSpaceID>&, storage::cpp2::EngineSignType, const HostAddr&)); - MOCK_METHOD8(addTask, + MOCK_METHOD7(addTask, folly::Future<StatusOr<bool>>(cpp2::AdminCmd, int32_t, int32_t, GraphSpaceID, const HostAddr&, const std::vector<std::string>&, - std::vector<PartitionID>, - int)); + std::vector<PartitionID>)); MOCK_METHOD3(stopTask, folly::Future<StatusOr<bool>>(const HostAddr&, int32_t, int32_t)); }; diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp index 3a0aad38402..635f3da88ac 100644 --- a/src/parser/AdminSentences.cpp +++ b/src/parser/AdminSentences.cpp @@ -242,11 +242,9 @@ std::string AdminJobSentence::toString() const { case meta::cpp2::AdminJobOp::ADD: { switch (cmd_) { case meta::cpp2::AdminCmd::COMPACT: - return paras_.empty() ? "SUBMIT JOB COMPACT" - : folly::stringPrintf("SUBMIT JOB COMPACT %s", paras_[0].c_str()); + return "SUBMIT JOB COMPACT"; case meta::cpp2::AdminCmd::FLUSH: - return paras_.empty() ? 
"SUBMIT JOB FLUSH" - : folly::stringPrintf("SUBMIT JOB FLUSH %s", paras_[0].c_str()); + return "SUBMIT JOB FLUSH"; case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX: return folly::stringPrintf("REBUILD TAG INDEX %s", folly::join(",", paras_).c_str()); case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX: @@ -254,8 +252,7 @@ std::string AdminJobSentence::toString() const { case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX: return "REBUILD FULLTEXT INDEX"; case meta::cpp2::AdminCmd::STATS: - return paras_.empty() ? "SUBMIT JOB STATS" - : folly::stringPrintf("SUBMIT JOB STATS %s", paras_[0].c_str()); + return "SUBMIT JOB STATS"; case meta::cpp2::AdminCmd::DOWNLOAD: return paras_.empty() ? "DOWNLOAD HDFS " : folly::stringPrintf("DOWNLOAD HDFS %s", paras_[0].c_str()); diff --git a/src/parser/parser.yy b/src/parser/parser.yy index bac8849b2e9..3f2fd5e8f25 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -336,7 +336,7 @@ static constexpr size_t kCommentLengthLimit = 256; %type <service_client_item> service_client_item %type <service_client_list> service_client_list -%type <intval> legal_integer unary_integer rank port job_concurrency +%type <intval> legal_integer unary_integer rank port %type <strval> comment_prop_assignment comment_prop opt_comment_prop %type <col_property> column_property @@ -3234,28 +3234,19 @@ ingest_sentence ; admin_job_sentence - : KW_SUBMIT KW_JOB KW_COMPACT job_concurrency { + : KW_SUBMIT KW_JOB KW_COMPACT { auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, meta::cpp2::AdminCmd::COMPACT); - if ($4 != 0) { - sentence->addPara(std::to_string($4)); - } $$ = sentence; } - | KW_SUBMIT KW_JOB KW_FLUSH job_concurrency { + | KW_SUBMIT KW_JOB KW_FLUSH { auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, meta::cpp2::AdminCmd::FLUSH); - if ($4 != 0) { - sentence->addPara(std::to_string($4)); - } $$ = sentence; } - | KW_SUBMIT KW_JOB KW_STATS job_concurrency { + | KW_SUBMIT KW_JOB KW_STATS { auto sentence = new 
AdminJobSentence(meta::cpp2::AdminJobOp::ADD, meta::cpp2::AdminCmd::STATS); - if ($4 != 0) { - sentence->addPara(std::to_string($4)); - } $$ = sentence; } | KW_SHOW KW_JOBS { @@ -3324,15 +3315,6 @@ admin_job_sentence } ; -job_concurrency - : %empty { - $$ = 0; - } - | legal_integer { - $$ = $1; - } - ; - show_queries_sentence : KW_SHOW KW_LOCAL KW_QUERIES { $$ = new ShowQueriesSentence(); diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 7f0422cdf86..e44b1045a73 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -3201,11 +3201,8 @@ TEST_F(ParserTest, JobTest) { ASSERT_EQ(result.value()->toString(), expectedStr); }; checkTest("SUBMIT JOB COMPACT", "SUBMIT JOB COMPACT"); - checkTest("SUBMIT JOB COMPACT 111", "SUBMIT JOB COMPACT 111"); checkTest("SUBMIT JOB FLUSH", "SUBMIT JOB FLUSH"); - checkTest("SUBMIT JOB FLUSH 111", "SUBMIT JOB FLUSH 111"); checkTest("SUBMIT JOB STATS", "SUBMIT JOB STATS"); - checkTest("SUBMIT JOB STATS 111", "SUBMIT JOB STATS 111"); checkTest("SUBMIT JOB BALANCE IN ZONE", "SUBMIT JOB BALANCE IN ZONE"); checkTest( "SUBMIT JOB BALANCE IN ZONE REMOVE 192.168.0.1:50000, 192.168.0.1:50001, 192.168.0.1:50002", diff --git a/src/storage/admin/AdminTask.h b/src/storage/admin/AdminTask.h index 6eab6713d14..e8437d92006 100644 --- a/src/storage/admin/AdminTask.h +++ b/src/storage/admin/AdminTask.h @@ -65,7 +65,6 @@ struct TaskContext { nebula::storage::cpp2::TaskPara parameters_; TaskPriority pri_{TaskPriority::MID}; CallBack onFinish_; - size_t concurrentReq_{INT_MAX}; }; /** @@ -158,26 +157,6 @@ class AdminTask { return ctx_.parameters_.get_space_id(); } - /** - * @brief Set the Concurrent Request - * - * @param concurrentReq Number of concurrent requests. - */ - virtual void setConcurrentReq(int concurrentReq) { - if (concurrentReq > 0) { - ctx_.concurrentReq_ = concurrentReq; - } - } - - /** - * @brief Get the Concurrent Requests number. - * - * @return size_t Concurrent requests number. 
- */ - virtual size_t getConcurrentReq() { - return ctx_.concurrentReq_; - } - /** * @brief Get error code. * diff --git a/src/storage/admin/AdminTaskManager.cpp b/src/storage/admin/AdminTaskManager.cpp index 19370c6ab3b..9183f7b592d 100644 --- a/src/storage/admin/AdminTaskManager.cpp +++ b/src/storage/admin/AdminTaskManager.cpp @@ -150,10 +150,7 @@ void AdminTaskManager::addAsyncTask(std::shared_ptr<AdminTask> task) { auto ret = tasks_.insert(handle, task).second; DCHECK(ret); taskQueue_.add(handle); - LOG(INFO) << folly::stringPrintf("enqueue task(%d, %d), con req=%zu", - task->getJobId(), - task->getTaskId(), - task->getConcurrentReq()); + LOG(INFO) << folly::stringPrintf("enqueue task(%d, %d)", task->getJobId(), task->getTaskId()); } nebula::cpp2::ErrorCode AdminTaskManager::cancelJob(JobID jobId) { @@ -234,7 +231,6 @@ void AdminTaskManager::removeTaskStatus(JobID jobId, TaskID taskId) { env_->adminStore_->remove(key); } -// schedule void AdminTaskManager::schedule() { std::chrono::milliseconds interval{20}; // 20ms while (!shutdown_.load(std::memory_order_acquire)) { @@ -283,8 +279,7 @@ void AdminTaskManager::schedule() { } auto subTaskConcurrency = - std::min(task->getConcurrentReq(), static_cast<size_t>(FLAGS_max_concurrent_subtasks)); - subTaskConcurrency = std::min(subTaskConcurrency, subTasks.size()); + std::min(static_cast<size_t>(FLAGS_max_concurrent_subtasks), subTasks.size()); task->unFinishedSubTask_ = subTasks.size(); if (0 == subTasks.size()) { diff --git a/src/storage/admin/AdminTaskManager.h b/src/storage/admin/AdminTaskManager.h index b80924defa0..f8503771650 100644 --- a/src/storage/admin/AdminTaskManager.h +++ b/src/storage/admin/AdminTaskManager.h @@ -92,7 +92,12 @@ class AdminTaskManager { meta::MetaClient* metaClient_{nullptr}; private: + /** + * @brief Schedule tasks in the queue + * + */ void schedule(); + void runSubTask(TaskHandle handle); private: