Skip to content

Commit

Permalink
download and ingest job
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Mar 18, 2022
1 parent 372f6b4 commit 0dcead6
Show file tree
Hide file tree
Showing 86 changed files with 754 additions and 2,134 deletions.
4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ class MetaClient {
listener_ = nullptr;
}

folly::Future<StatusOr<cpp2::AdminJobResult>> submitJob(cpp2::JobOp op,
cpp2::JobType type,
folly::Future<StatusOr<cpp2::AdminJobResult>> submitJob(meta::cpp2::JobOp op,
meta::cpp2::JobType type,
std::vector<std::string> paras);

// Operations for parts
Expand Down
4 changes: 1 addition & 3 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -152,7 +150,7 @@ int main(int argc, char* argv[]) {
pool->start(FLAGS_meta_http_thread_num, "http thread pool");

auto webSvc = std::make_unique<nebula::WebService>();
status = initWebService(webSvc.get(), gKVStore.get(), helper.get(), pool.get());
status = initWebService(webSvc.get(), gKVStore.get());
if (!status.ok()) {
LOG(ERROR) << "Init web service failed: " << status;
return EXIT_FAILURE;
Expand Down
17 changes: 1 addition & 16 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
#include "meta/KVBasedClusterIdMan.h"
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -160,22 +158,9 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
return kvstore;
}

nebula::Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool) {
nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore) {
LOG(INFO) << "Starting Meta HTTP Service";
auto& router = svc->router();
router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore, helper, pool);
return handler;
});
router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpIngestHandler();
handler->init(kvstore, pool);
return handler;
});
router.get("/replace").handler([kvstore](PathParams&&) {
auto handler = new nebula::meta::MetaHttpReplaceHostHandler();
handler->init(kvstore);
Expand Down
5 changes: 1 addition & 4 deletions src/daemons/MetaDaemonInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,5 @@ nebula::ClusterID& metaClusterId();
std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> peers,
nebula::HostAddr localhost);

nebula::Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool);
nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore);
#endif
2 changes: 0 additions & 2 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ nebula_add_library(
admin/PartExecutor.cpp
admin/CharsetExecutor.cpp
admin/ShowStatsExecutor.cpp
admin/DownloadExecutor.cpp
admin/IngestExecutor.cpp
admin/ConfigExecutor.cpp
admin/ZoneExecutor.cpp
admin/ShowServiceClientsExecutor.cpp
Expand Down
8 changes: 0 additions & 8 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
#include "graph/executor/admin/ConfigExecutor.h"
#include "graph/executor/admin/CreateUserExecutor.h"
#include "graph/executor/admin/DescribeUserExecutor.h"
#include "graph/executor/admin/DownloadExecutor.h"
#include "graph/executor/admin/DropHostsExecutor.h"
#include "graph/executor/admin/DropUserExecutor.h"
#include "graph/executor/admin/GrantRoleExecutor.h"
#include "graph/executor/admin/IngestExecutor.h"
#include "graph/executor/admin/KillQueryExecutor.h"
#include "graph/executor/admin/ListRolesExecutor.h"
#include "graph/executor/admin/ListUserRolesExecutor.h"
Expand Down Expand Up @@ -517,12 +515,6 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kSignOutService: {
return pool->add(new SignOutServiceExecutor(node, qctx));
}
case PlanNode::Kind::kDownload: {
return pool->add(new DownloadExecutor(node, qctx));
}
case PlanNode::Kind::kIngest: {
return pool->add(new IngestExecutor(node, qctx));
}
case PlanNode::Kind::kShowSessions: {
return pool->add(new ShowSessionsExecutor(node, qctx));
}
Expand Down
33 changes: 0 additions & 33 deletions src/graph/executor/admin/DownloadExecutor.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions src/graph/executor/admin/DownloadExecutor.h

This file was deleted.

28 changes: 0 additions & 28 deletions src/graph/executor/admin/IngestExecutor.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions src/graph/executor/admin/IngestExecutor.h

This file was deleted.

49 changes: 0 additions & 49 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,55 +537,6 @@ class ShowListener final : public SingleDependencyNode {
: SingleDependencyNode(qctx, Kind::kShowListener, input) {}
};

class Download final : public SingleDependencyNode {
public:
static Download* make(QueryContext* qctx,
PlanNode* input,
std::string hdfsHost,
int32_t hdfsPort,
std::string hdfsPath) {
return qctx->objPool()->add(new Download(qctx, input, hdfsHost, hdfsPort, hdfsPath));
}

const std::string& getHdfsHost() const {
return hdfsHost_;
}

int32_t getHdfsPort() const {
return hdfsPort_;
}

const std::string& getHdfsPath() const {
return hdfsPath_;
}

private:
Download(QueryContext* qctx,
PlanNode* dep,
std::string hdfsHost,
int32_t hdfsPort,
std::string hdfsPath)
: SingleDependencyNode(qctx, Kind::kDownload, dep),
hdfsHost_(hdfsHost),
hdfsPort_(hdfsPort),
hdfsPath_(hdfsPath) {}

private:
std::string hdfsHost_;
int32_t hdfsPort_;
std::string hdfsPath_;
};

class Ingest final : public SingleDependencyNode {
public:
static Ingest* make(QueryContext* qctx, PlanNode* dep) {
return qctx->objPool()->add(new Ingest(qctx, dep));
}

private:
Ingest(QueryContext* qctx, PlanNode* dep) : SingleDependencyNode(qctx, Kind::kIngest, dep) {}
};

// User related Node
class CreateUser final : public CreateNode {
public:
Expand Down
4 changes: 0 additions & 4 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,6 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "SignInService";
case Kind::kSignOutService:
return "SignOutService";
case Kind::kDownload:
return "Download";
case Kind::kIngest:
return "Ingest";
case Kind::kShowSessions:
return "ShowSessions";
case Kind::kUpdateSession:
Expand Down
2 changes: 0 additions & 2 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ class PlanNode {
kShowFTIndexes,
kSignInService,
kSignOutService,
kDownload,
kIngest,
kShowSessions,
kUpdateSession,

Expand Down
4 changes: 1 addition & 3 deletions src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace graph {
/**
* Read space : kUse, kDescribeSpace
* Write space : kCreateSpace, kDropSpace, kClearSpace, kCreateSnapshot,
* kDropSnapshot, kBalance, kAdmin, kConfig, kIngest, kDownload
* kDropSnapshot, kBalance, kAdmin, kConfig
* Read schema : kDescribeTag, kDescribeEdge,
* kDescribeTagIndex, kDescribeEdgeIndex
* Write schema : kCreateTag, kAlterTag, kCreateEdge,
Expand Down Expand Up @@ -68,8 +68,6 @@ namespace graph {
case Sentence::Kind::kShowConfigs:
case Sentence::Kind::kSetConfig:
case Sentence::Kind::kGetConfig:
case Sentence::Kind::kIngest:
case Sentence::Kind::kDownload:
case Sentence::Kind::kSignOutService:
case Sentence::Kind::kSignInService: {
return PermissionManager::canWriteSpace(session);
Expand Down
6 changes: 2 additions & 4 deletions src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::JobType::STATS:
case meta::cpp2::JobType::COMPACT:
case meta::cpp2::JobType::FLUSH:
case meta::cpp2::JobType::DOWNLOAD:
case meta::cpp2::JobType::INGEST:
case meta::cpp2::JobType::DATA_BALANCE:
case meta::cpp2::JobType::LEADER_BALANCE:
case meta::cpp2::JobType::ZONE_BALANCE:
return true;
// TODO: download and ingest need to be refactored to use the rpc protocol.
// Currently they are using their own validator
case meta::cpp2::JobType::DOWNLOAD:
case meta::cpp2::JobType::INGEST:
case meta::cpp2::JobType::UNKNOWN:
return false;
}
Expand Down
2 changes: 0 additions & 2 deletions src/graph/validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ nebula_add_library(
FindPathValidator.cpp
LookupValidator.cpp
MatchValidator.cpp
DownloadValidator.cpp
IngestValidator.cpp
)

nebula_add_subdirectory(test)
30 changes: 0 additions & 30 deletions src/graph/validator/DownloadValidator.cpp

This file was deleted.

Loading

0 comments on commit 0dcead6

Please sign in to comment.