Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate download and ingest into job manager #3994

Merged
merged 3 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include "version/Version.h"
#include "webservice/Common.h"

DECLARE_int32(ws_meta_http_port);

DEFINE_uint32(expired_time_factor, 5, "The factor of expired time based on heart beat interval");
DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval in seconds");
DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry");
Expand Down Expand Up @@ -3530,48 +3528,6 @@ folly::Future<StatusOr<int64_t>> MetaClient::getWorkerId(std::string ipAddr) {
return future;
}

folly::Future<StatusOr<bool>> MetaClient::download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
GraphSpaceID spaceId) {
auto url = folly::stringPrintf("http://%s:%d/download-dispatch?host=%s&port=%d&path=%s&space=%d",
leader_.host.c_str(),
FLAGS_ws_meta_http_port,
hdfsHost.c_str(),
hdfsPort,
hdfsPath.c_str(),
spaceId);
auto func = [url] {
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile dispatch successfully") {
LOG(INFO) << "Download Successfully";
return true;
} else {
LOG(ERROR) << "Download Failed: " << result.value();
return false;
}
};
return folly::async(func);
}

folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
auto url = folly::stringPrintf("http://%s:%d/ingest-dispatch?space=%d",
leader_.host.c_str(),
FLAGS_ws_meta_http_port,
spaceId);
auto func = [url] {
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile ingest successfully") {
LOG(INFO) << "Ingest Successfully";
return true;
} else {
LOG(ERROR) << "Ingest Failed";
return false;
}
};
return folly::async(func);
}

folly::Future<StatusOr<int64_t>> MetaClient::getSegmentId(int64_t length) {
auto req = cpp2::GetSegmentIdReq();
req.length_ref() = length;
Expand Down
7 changes: 0 additions & 7 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,13 +633,6 @@ class MetaClient : public BaseMetaClient {
nebula::cpp2::ErrorCode taskErrCode,
cpp2::StatsItem* statisticItem);

folly::Future<StatusOr<bool>> download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
GraphSpaceID spaceId);

folly::Future<StatusOr<bool>> ingest(GraphSpaceID spaceId);

folly::Future<StatusOr<int64_t>> getWorkerId(std::string ipAddr);

folly::Future<StatusOr<int64_t>> getSegmentId(int64_t length) override;
Expand Down
4 changes: 1 addition & 3 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -152,7 +150,7 @@ int main(int argc, char* argv[]) {
pool->start(FLAGS_meta_http_thread_num, "http thread pool");

auto webSvc = std::make_unique<nebula::WebService>();
status = initWebService(webSvc.get(), gKVStore.get(), helper.get(), pool.get());
status = initWebService(webSvc.get(), gKVStore.get());
if (!status.ok()) {
LOG(ERROR) << "Init web service failed: " << status;
return EXIT_FAILURE;
Expand Down
19 changes: 2 additions & 17 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
#include "meta/KVBasedClusterIdMan.h"
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand All @@ -45,7 +43,7 @@ DEFINE_int32(meta_num_io_threads, 16, "Number of IO threads");
DEFINE_int32(meta_num_worker_threads, 32, "Number of workers");
DEFINE_string(meta_data_path, "", "Root data path");
DECLARE_string(meta_server_addrs); // use define from grap flags.
DECLARE_int32(ws_meta_http_port);
DEFINE_int32(ws_meta_http_port, 11000, "Port to listen on Meta with HTTP protocol");
#endif

using nebula::web::PathParams;
Expand Down Expand Up @@ -160,22 +158,9 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
return kvstore;
}

nebula::Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool) {
nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore) {
LOG(INFO) << "Starting Meta HTTP Service";
auto& router = svc->router();
router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore, helper, pool);
return handler;
});
router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpIngestHandler();
handler->init(kvstore, pool);
return handler;
});
router.get("/replace").handler([kvstore](PathParams&&) {
auto handler = new nebula::meta::MetaHttpReplaceHostHandler();
handler->init(kvstore);
Expand Down
5 changes: 1 addition & 4 deletions src/daemons/MetaDaemonInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,5 @@ nebula::ClusterID& metaClusterId();
std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> peers,
nebula::HostAddr localhost);

nebula::Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool);
nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore);
#endif
7 changes: 1 addition & 6 deletions src/daemons/StandAloneDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpDownloadHandler.h"
#include "meta/http/MetaHttpIngestHandler.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -209,11 +207,8 @@ int main(int argc, char *argv[]) {
return;
}
LOG(INFO) << "Start http service";
auto helper = std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto pool = std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_meta_http_thread_num, "http thread pool");
auto webSvc = std::make_unique<nebula::WebService>();
status = initWebService(webSvc.get(), gMetaKVStore.get(), helper.get(), pool.get());
status = initWebService(webSvc.get(), gMetaKVStore.get());
if (!status.ok()) {
LOG(ERROR) << "Init web service failed: " << status;
return;
Expand Down
2 changes: 0 additions & 2 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ nebula_add_library(
admin/PartExecutor.cpp
admin/CharsetExecutor.cpp
admin/ShowStatsExecutor.cpp
admin/DownloadExecutor.cpp
admin/IngestExecutor.cpp
admin/ConfigExecutor.cpp
admin/ZoneExecutor.cpp
admin/ShowServiceClientsExecutor.cpp
Expand Down
8 changes: 0 additions & 8 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
#include "graph/executor/admin/ConfigExecutor.h"
#include "graph/executor/admin/CreateUserExecutor.h"
#include "graph/executor/admin/DescribeUserExecutor.h"
#include "graph/executor/admin/DownloadExecutor.h"
#include "graph/executor/admin/DropHostsExecutor.h"
#include "graph/executor/admin/DropUserExecutor.h"
#include "graph/executor/admin/GrantRoleExecutor.h"
#include "graph/executor/admin/IngestExecutor.h"
#include "graph/executor/admin/KillQueryExecutor.h"
#include "graph/executor/admin/ListRolesExecutor.h"
#include "graph/executor/admin/ListUserRolesExecutor.h"
Expand Down Expand Up @@ -518,12 +516,6 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kSignOutService: {
return pool->add(new SignOutServiceExecutor(node, qctx));
}
case PlanNode::Kind::kDownload: {
return pool->add(new DownloadExecutor(node, qctx));
}
case PlanNode::Kind::kIngest: {
return pool->add(new IngestExecutor(node, qctx));
}
case PlanNode::Kind::kShowSessions: {
return pool->add(new ShowSessionsExecutor(node, qctx));
}
Expand Down
33 changes: 0 additions & 33 deletions src/graph/executor/admin/DownloadExecutor.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions src/graph/executor/admin/DownloadExecutor.h

This file was deleted.

28 changes: 0 additions & 28 deletions src/graph/executor/admin/IngestExecutor.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions src/graph/executor/admin/IngestExecutor.h

This file was deleted.

49 changes: 0 additions & 49 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,55 +537,6 @@ class ShowListener final : public SingleDependencyNode {
: SingleDependencyNode(qctx, Kind::kShowListener, input) {}
};

class Download final : public SingleDependencyNode {
public:
static Download* make(QueryContext* qctx,
PlanNode* input,
std::string hdfsHost,
int32_t hdfsPort,
std::string hdfsPath) {
return qctx->objPool()->add(new Download(qctx, input, hdfsHost, hdfsPort, hdfsPath));
}

const std::string& getHdfsHost() const {
return hdfsHost_;
}

int32_t getHdfsPort() const {
return hdfsPort_;
}

const std::string& getHdfsPath() const {
return hdfsPath_;
}

private:
Download(QueryContext* qctx,
PlanNode* dep,
std::string hdfsHost,
int32_t hdfsPort,
std::string hdfsPath)
: SingleDependencyNode(qctx, Kind::kDownload, dep),
hdfsHost_(hdfsHost),
hdfsPort_(hdfsPort),
hdfsPath_(hdfsPath) {}

private:
std::string hdfsHost_;
int32_t hdfsPort_;
std::string hdfsPath_;
};

class Ingest final : public SingleDependencyNode {
public:
static Ingest* make(QueryContext* qctx, PlanNode* dep) {
return qctx->objPool()->add(new Ingest(qctx, dep));
}

private:
Ingest(QueryContext* qctx, PlanNode* dep) : SingleDependencyNode(qctx, Kind::kIngest, dep) {}
};

// User related Node
class CreateUser final : public CreateNode {
public:
Expand Down
4 changes: 0 additions & 4 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,6 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "SignInService";
case Kind::kSignOutService:
return "SignOutService";
case Kind::kDownload:
return "Download";
case Kind::kIngest:
return "Ingest";
case Kind::kShowSessions:
return "ShowSessions";
case Kind::kUpdateSession:
Expand Down
2 changes: 0 additions & 2 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@ class PlanNode {
kShowFTIndexes,
kSignInService,
kSignOutService,
kDownload,
kIngest,
kShowSessions,
kUpdateSession,

Expand Down
Loading