Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Mar 29, 2022
1 parent 2c8bfe2 commit bfaf948
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 191 deletions.
44 changes: 0 additions & 44 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include "version/Version.h"
#include "webservice/Common.h"

DECLARE_int32(ws_meta_http_port);

DEFINE_uint32(expired_time_factor, 5, "The factor of expired time based on heart beat interval");
DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval in seconds");
DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry");
Expand Down Expand Up @@ -3530,48 +3528,6 @@ folly::Future<StatusOr<int64_t>> MetaClient::getWorkerId(std::string ipAddr) {
return future;
}

folly::Future<StatusOr<bool>> MetaClient::download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
GraphSpaceID spaceId) {
auto url = folly::stringPrintf("http://%s:%d/download-dispatch?host=%s&port=%d&path=%s&space=%d",
leader_.host.c_str(),
FLAGS_ws_meta_http_port,
hdfsHost.c_str(),
hdfsPort,
hdfsPath.c_str(),
spaceId);
auto func = [url] {
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile dispatch successfully") {
LOG(INFO) << "Download Successfully";
return true;
} else {
LOG(ERROR) << "Download Failed: " << result.value();
return false;
}
};
return folly::async(func);
}

folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
auto url = folly::stringPrintf("http://%s:%d/ingest-dispatch?space=%d",
leader_.host.c_str(),
FLAGS_ws_meta_http_port,
spaceId);
auto func = [url] {
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile ingest successfully") {
LOG(INFO) << "Ingest Successfully";
return true;
} else {
LOG(ERROR) << "Ingest Failed";
return false;
}
};
return folly::async(func);
}

folly::Future<StatusOr<int64_t>> MetaClient::getSegmentId(int64_t length) {
auto req = cpp2::GetSegmentIdReq();
req.length_ref() = length;
Expand Down
7 changes: 0 additions & 7 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,13 +633,6 @@ class MetaClient : public BaseMetaClient {
nebula::cpp2::ErrorCode taskErrCode,
cpp2::StatsItem* statisticItem);

folly::Future<StatusOr<bool>> download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
GraphSpaceID spaceId);

folly::Future<StatusOr<bool>> ingest(GraphSpaceID spaceId);

folly::Future<StatusOr<int64_t>> getWorkerId(std::string ipAddr);

folly::Future<StatusOr<int64_t>> getSegmentId(int64_t length) override;
Expand Down
48 changes: 28 additions & 20 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,33 +883,41 @@ nebula::cpp2::ErrorCode NebulaStore::ingest(GraphSpaceID spaceId) {
if (!ok(spaceRet)) {
return error(spaceRet);
}

LOG(INFO) << "Ingesting space " << spaceId;
auto space = nebula::value(spaceRet);
std::vector<std::thread> threads;
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
for (auto& engine : space->engines_) {
auto parts = engine->allParts();
for (auto part : parts) {
auto ret = this->engine(spaceId, part);
if (!ok(ret)) {
return error(ret);
}

auto path = folly::stringPrintf("%s/download/%d", value(ret)->getDataRoot(), part);
if (!fs::FileUtils::exist(path)) {
VLOG(1) << path << " not existed while ingesting";
continue;
}
threads.emplace_back(std::thread([&engine, &code, this, spaceId] {
auto parts = engine->allParts();
for (auto part : parts) {
auto ret = this->engine(spaceId, part);
if (!ok(ret)) {
code = error(ret);
} else {
auto path = folly::stringPrintf("%s/download/%d", value(ret)->getDataRoot(), part);
if (!fs::FileUtils::exist(path)) {
LOG(INFO) << path << " not existed";
continue;
}

auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst");
for (auto file : files) {
VLOG(1) << "Ingesting extra file: " << file;
auto code = engine->ingest(std::vector<std::string>({file}));
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst");
auto result = engine->ingest(std::vector<std::string>(files));
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
code = result;
}
}
}
}
}));
}
return nebula::cpp2::ErrorCode::SUCCEEDED;

// Wait for all threads to finish
for (auto& t : threads) {
t.join();
}
LOG(INFO) << "Space " << spaceId << " ingest done.";
return code;
}

nebula::cpp2::ErrorCode NebulaStore::setOption(GraphSpaceID spaceId,
Expand Down
120 changes: 0 additions & 120 deletions src/meta/http/test/MetaHttpIngestHandlerTest.cpp

This file was deleted.

0 comments on commit bfaf948

Please sign in to comment.