diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index eb0806989a0..0371034b818 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -28,8 +28,6 @@ #include "version/Version.h" #include "webservice/Common.h" -DECLARE_int32(ws_meta_http_port); - DEFINE_uint32(expired_time_factor, 5, "The factor of expired time based on heart beat interval"); DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval in seconds"); DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry"); @@ -3530,48 +3528,6 @@ folly::Future> MetaClient::getWorkerId(std::string ipAddr) { return future; } -folly::Future> MetaClient::download(const std::string& hdfsHost, - int32_t hdfsPort, - const std::string& hdfsPath, - GraphSpaceID spaceId) { - auto url = folly::stringPrintf("http://%s:%d/download-dispatch?host=%s&port=%d&path=%s&space=%d", - leader_.host.c_str(), - FLAGS_ws_meta_http_port, - hdfsHost.c_str(), - hdfsPort, - hdfsPath.c_str(), - spaceId); - auto func = [url] { - auto result = http::HttpClient::get(url); - if (result.ok() && result.value() == "SSTFile dispatch successfully") { - LOG(INFO) << "Download Successfully"; - return true; - } else { - LOG(ERROR) << "Download Failed: " << result.value(); - return false; - } - }; - return folly::async(func); -} - -folly::Future> MetaClient::ingest(GraphSpaceID spaceId) { - auto url = folly::stringPrintf("http://%s:%d/ingest-dispatch?space=%d", - leader_.host.c_str(), - FLAGS_ws_meta_http_port, - spaceId); - auto func = [url] { - auto result = http::HttpClient::get(url); - if (result.ok() && result.value() == "SSTFile ingest successfully") { - LOG(INFO) << "Ingest Successfully"; - return true; - } else { - LOG(ERROR) << "Ingest Failed"; - return false; - } - }; - return folly::async(func); -} - folly::Future> MetaClient::getSegmentId(int64_t length) { auto req = cpp2::GetSegmentIdReq(); req.length_ref() = length; diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 43c281b3629..7de27e61f7a 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -633,13 +633,6 @@ class MetaClient : public BaseMetaClient { nebula::cpp2::ErrorCode taskErrCode, cpp2::StatsItem* statisticItem); - folly::Future> download(const std::string& hdfsHost, - int32_t hdfsPort, - const std::string& hdfsPath, - GraphSpaceID spaceId); - - folly::Future> ingest(GraphSpaceID spaceId); - folly::Future> getWorkerId(std::string ipAddr); folly::Future> getSegmentId(int64_t length) override; diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index e087beae4a4..2c551b497c8 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -883,33 +883,41 @@ nebula::cpp2::ErrorCode NebulaStore::ingest(GraphSpaceID spaceId) { if (!ok(spaceRet)) { return error(spaceRet); } + LOG(INFO) << "Ingesting space " << spaceId; auto space = nebula::value(spaceRet); + std::vector threads; + nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED; for (auto& engine : space->engines_) { - auto parts = engine->allParts(); - for (auto part : parts) { - auto ret = this->engine(spaceId, part); - if (!ok(ret)) { - return error(ret); - } - - auto path = folly::stringPrintf("%s/download/%d", value(ret)->getDataRoot(), part); - if (!fs::FileUtils::exist(path)) { - VLOG(1) << path << " not existed while ingesting"; - continue; - } + threads.emplace_back(std::thread([&engine, &code, this, spaceId] { + auto parts = engine->allParts(); + for (auto part : parts) { + auto ret = this->engine(spaceId, part); + if (!ok(ret)) { + code = error(ret); + } else { + auto path = folly::stringPrintf("%s/download/%d", value(ret)->getDataRoot(), part); + if (!fs::FileUtils::exist(path)) { + LOG(INFO) << path << " not existed"; + continue; + } - auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst"); - for (auto file : files) { - VLOG(1) << "Ingesting extra file: " << file; - auto code = engine->ingest(std::vector({file})); - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - return code; + auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst"); + auto result = engine->ingest(std::vector(files)); + if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { + code = result; + } } } - } + })); } - return nebula::cpp2::ErrorCode::SUCCEEDED; + + // Wait for all threads to finish + for (auto& t : threads) { + t.join(); + } + LOG(INFO) << "Space " << spaceId << " ingest done."; + return code; } nebula::cpp2::ErrorCode NebulaStore::setOption(GraphSpaceID spaceId, diff --git a/src/meta/http/test/MetaHttpIngestHandlerTest.cpp b/src/meta/http/test/MetaHttpIngestHandlerTest.cpp deleted file mode 100644 index 083cc4c1c69..00000000000 --- a/src/meta/http/test/MetaHttpIngestHandlerTest.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include -#include - -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "common/http/HttpClient.h" -#include "common/thread/GenericThreadPool.h" -#include "meta/http/MetaHttpIngestHandler.h" -#include "meta/test/TestUtils.h" -#include "storage/http/StorageHttpIngestHandler.h" -#include "webservice/Router.h" -#include "webservice/WebService.h" - -DECLARE_int32(ws_storage_http_port); - -namespace nebula { -namespace meta { - -class MetaHttpIngestHandlerTestEnv : public ::testing::Environment { - public: - void SetUp() override { - FLAGS_ws_ip = "127.0.0.1"; - FLAGS_ws_http_port = 0; - VLOG(1) << "Starting web service..."; - - rootPath_ = std::make_unique("/tmp/MetaHttpIngestHandler.XXXXXX"); - kv_ = MockCluster::initMetaKV(rootPath_->path()); - TestUtils::createSomeHosts(kv_.get()); - TestUtils::assembleSpace(kv_.get(), 1, 1); - pool_ = std::make_unique(); - pool_->start(1); - - webSvc_ = std::make_unique(); - - auto& router = webSvc_->router(); - router.get("/ingest-dispatch").handler([this](nebula::web::PathParams&&) { - auto handler = new meta::MetaHttpIngestHandler(); - handler->init(kv_.get(), pool_.get()); - return handler; - }); - router.get("/ingest").handler([this](nebula::web::PathParams&&) { - auto handler = new storage::StorageHttpIngestHandler(); - handler->init(kv_.get()); - return handler; - }); - auto status = webSvc_->start(); - FLAGS_ws_storage_http_port = FLAGS_ws_http_port; - ASSERT_TRUE(status.ok()) << status; - } - - void TearDown() override { - kv_.reset(); - rootPath_.reset(); - webSvc_.reset(); - pool_->stop(); - VLOG(1) << "Web service stopped"; - } - - private: - std::unique_ptr webSvc_; - std::unique_ptr rootPath_; - std::unique_ptr kv_; - std::unique_ptr pool_; -}; - -TEST(MetaHttpIngestHandlerTest, MetaIngestTest) { - auto path = "/tmp/MetaHttpIngestData.XXXXXX"; - std::unique_ptr externalPath = std::make_unique(path); - auto partPath = folly::stringPrintf("%s/nebula/1/download/1", externalPath->path()); - ASSERT_TRUE(nebula::fs::FileUtils::makeDir(partPath)); - - auto options = rocksdb::Options(); - auto env = rocksdb::EnvOptions(); - rocksdb::SstFileWriter writer{env, options}; - auto sstPath = folly::stringPrintf("%s/data.sst", partPath.c_str()); - auto status = writer.Open(sstPath); - ASSERT_EQ(rocksdb::Status::OK(), status); - - for (auto i = 0; i < 10; i++) { - status = writer.Put(folly::stringPrintf("key_%d", i), folly::stringPrintf("val_%d", i)); - ASSERT_EQ(rocksdb::Status::OK(), status); - } - status = writer.Finish(); - ASSERT_EQ(rocksdb::Status::OK(), status); - - { - auto url = "/ingest-dispatch"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_TRUE(resp.value().empty()); - } - { - auto url = "/ingest-dispatch?space=0"; - auto request = - folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url); - auto resp = http::HttpClient::get(request); - ASSERT_TRUE(resp.ok()); - ASSERT_EQ("SSTFile ingest successfully", resp.value()); - } -} - -} // namespace meta -} // namespace nebula - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - ::testing::AddGlobalTestEnvironment(new nebula::meta::MetaHttpIngestHandlerTestEnv()); - - return RUN_ALL_TESTS(); -}