From 7fc82ccfb5177e780a969cb19adc10f2b30d0cb2 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Fri, 28 Jan 2022 15:56:54 +0800 Subject: [PATCH] refactor AdminClient (#3781) * refactor AdminClient * fix test case, unify all getAdminAddrFromStoreAddr into AdminClient * add more test cases * address @panda-sheep's comment --- src/interface/storage.thrift | 44 +- src/meta/ActiveHostsMan.cpp | 15 - src/meta/ActiveHostsMan.h | 3 - src/meta/processors/admin/AdminClient.cpp | 510 +++++++++--------- src/meta/processors/admin/AdminClient.h | 155 ++++-- src/meta/processors/admin/SnapShot.cpp | 12 +- .../processors/job/CompactJobExecutor.cpp | 29 +- src/meta/processors/job/FlushJobExecutor.cpp | 29 +- .../processors/job/RebuildEdgeJobExecutor.cpp | 29 +- .../processors/job/RebuildFTJobExecutor.cpp | 29 +- .../processors/job/RebuildJobExecutor.cpp | 5 +- .../processors/job/RebuildTagJobExecutor.cpp | 29 +- src/meta/processors/job/StatsJobExecutor.cpp | 37 +- src/meta/processors/job/StatsJobExecutor.h | 4 - .../processors/job/StorageJobExecutor.cpp | 5 +- src/meta/test/AdminClientTest.cpp | 123 ++++- src/meta/test/CreateBackupProcessorTest.cpp | 25 +- src/meta/test/GetStatsTest.cpp | 6 +- src/meta/test/MockAdminClient.h | 33 +- src/storage/CMakeLists.txt | 1 + src/storage/StorageAdminServiceHandler.cpp | 13 +- src/storage/StorageAdminServiceHandler.h | 10 +- src/storage/admin/AdminProcessor.h | 25 - src/storage/admin/AdminTask.h | 2 +- src/storage/admin/AdminTaskProcessor.cpp | 17 +- src/storage/admin/AdminTaskProcessor.h | 16 +- .../admin/CreateCheckpointProcessor.cpp | 10 +- src/storage/admin/CreateCheckpointProcessor.h | 14 +- src/storage/admin/DropCheckpointProcessor.cpp | 13 +- src/storage/admin/DropCheckpointProcessor.h | 14 +- src/storage/admin/GetLeaderProcessor.cpp | 33 ++ src/storage/admin/GetLeaderProcessor.h | 41 ++ src/storage/admin/SendBlockSignProcessor.cpp | 10 +- src/storage/admin/SendBlockSignProcessor.h | 14 +- src/storage/admin/StopAdminTaskProcessor.cpp | 14 +- src/storage/admin/StopAdminTaskProcessor.h | 18 +- src/storage/test/CheckpointTest.cpp | 2 +- src/storage/test/IndexWithTTLTest.cpp | 8 +- src/storage/test/RebuildIndexTest.cpp | 16 +- src/storage/test/StatsTaskTest.cpp | 6 +- 40 files changed, 861 insertions(+), 558 deletions(-) create mode 100644 src/storage/admin/GetLeaderProcessor.cpp create mode 100644 src/storage/admin/GetLeaderProcessor.h diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 120d187ff86..71ed423b1d6 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -703,7 +703,7 @@ service GraphStorageService { ////////////////////////////////////////////////////////// // Common response for admin methods struct AdminExecResp { - 1: required ResponseCommon result, + 1: required ResponseCommon result, 2: optional meta.StatsItem stats, } @@ -759,27 +759,36 @@ struct CreateCPRequest { 2: binary name, } +struct CreateCPResp { + 1: common.ErrorCode code, + 2: list info, +} struct DropCPRequest { 1: list space_ids, 2: binary name, } +struct DropCPResp { + 1: common.ErrorCode code, +} enum EngineSignType { BLOCK_ON = 1, BLOCK_OFF = 2, } - struct BlockingSignRequest { 1: list space_ids, 2: required EngineSignType sign, } +struct BlockingSignResp { + 1: common.ErrorCode code, +} struct GetLeaderPartsResp { - 1: required ResponseCommon result, + 1: common.ErrorCode code, 2: map> ( cpp.template = "std::unordered_map") leader_parts; } @@ -798,11 +807,6 @@ struct RebuildIndexRequest { 3: common.IndexID index_id, } -struct CreateCPResp { - 1: required ResponseCommon result, - 2: list info, -} - struct ListClusterInfoResp { 1: required ResponseCommon result, 2: common.DirInfo dir, @@ -811,7 +815,7 @@ struct ListClusterInfoResp { struct ListClusterInfoReq { } -struct AddAdminTaskRequest { +struct AddTaskRequest { // rebuild index / flush / compact / statis 1: meta.AdminCmd cmd 2: i32 job_id @@ -820,11 +824,19 @@ struct AddAdminTaskRequest { 5: optional i32 concurrency } -struct StopAdminTaskRequest { +struct AddTaskResp { + 1: common.ErrorCode code, +} + +struct StopTaskRequest { 1: i32 job_id 2: i32 task_id } +struct StopTaskResp { + 1: common.ErrorCode code, +} + service StorageAdminService { // Interfaces for admin operations AdminExecResp transLeader(1: TransLeaderReq req); @@ -836,20 +848,16 @@ service StorageAdminService { // Interfaces for nebula cluster checkpoint CreateCPResp createCheckpoint(1: CreateCPRequest req); - AdminExecResp dropCheckpoint(1: DropCPRequest req); - AdminExecResp blockingWrites(1: BlockingSignRequest req); - - // Interfaces for rebuild index - AdminExecResp rebuildTagIndex(1: RebuildIndexRequest req); - AdminExecResp rebuildEdgeIndex(1: RebuildIndexRequest req); + DropCPResp dropCheckpoint(1: DropCPRequest req); + BlockingSignResp blockingWrites(1: BlockingSignRequest req); // Return all leader partitions on this host GetLeaderPartsResp getLeaderParts(1: GetLeaderReq req); // Return all peers AdminExecResp checkPeers(1: CheckPeersReq req); - AdminExecResp addAdminTask(1: AddAdminTaskRequest req); - AdminExecResp stopAdminTask(1: StopAdminTaskRequest req); + AddTaskResp addAdminTask(1: AddTaskRequest req); + StopTaskResp stopAdminTask(1: StopTaskRequest req); } diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 240dfeb161a..38d51162941 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -226,21 +226,6 @@ ErrorOr> ActiveHostsMan::getActiv return activeHosts; } -ErrorOr> ActiveHostsMan::getActiveAdminHosts( - kvstore::KVStore* kv, int32_t expiredTTL, cpp2::HostRole role) { - auto hostsRet = getActiveHosts(kv, expiredTTL, role); - if (!nebula::ok(hostsRet)) { - return nebula::error(hostsRet); - } - auto hosts = nebula::value(hostsRet); - - std::vector adminHosts(hosts.size()); - std::transform(hosts.begin(), hosts.end(), adminHosts.begin(), [](const auto& h) { - return Utils::getAdminAddrFromStoreAddr(h); - }); - return adminHosts; -} - ErrorOr ActiveHostsMan::isLived(kvstore::KVStore* kv, const HostAddr& host) { auto activeHostsRet = getActiveHosts(kv); diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index 3cd5326d874..b79fca44197 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -126,9 +126,6 @@ class ActiveHostsMan final { static ErrorOr> getActiveHostsWithZones( kvstore::KVStore* kv, GraphSpaceID spaceId, int32_t expiredTTL = 0); - static ErrorOr> getActiveAdminHosts( - kvstore::KVStore* kv, int32_t expiredTTL = 0, cpp2::HostRole role = cpp2::HostRole::STORAGE); - static ErrorOr isLived(kvstore::KVStore* kv, const HostAddr& host); static ErrorOr getHostInfo(kvstore::KVStore* kv, diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index 54d772877f8..ca75b4cdce8 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -20,30 +20,31 @@ namespace meta { folly::Future AdminClient::transLeader(GraphSpaceID spaceId, PartitionID partId, - const HostAddr& leader, + const HostAddr& src, const HostAddr& dst) { storage::cpp2::TransLeaderReq req; req.space_id_ref() = spaceId; req.part_id_ref() = partId; - auto ret = getPeers(spaceId, partId); - if (!nebula::ok(ret)) { - LOG(INFO) << "Get peers failed: " << static_cast(nebula::error(ret)); + auto partHosts = getPeers(spaceId, partId); + if (!nebula::ok(partHosts)) { + LOG(INFO) << "Get peers failed: " + << apache::thrift::util::enumNameSafe(nebula::error(partHosts)); return Status::Error("Get peers failed"); } - auto peers = std::move(nebula::value(ret)); - auto it = std::find(peers.begin(), peers.end(), leader); + auto peers = std::move(nebula::value(partHosts)); + auto it = std::find(peers.begin(), peers.end(), src); if (it == peers.end()) { return Status::PartNotFound(); } - if (peers.size() == 1 && peers.front() == leader) { + if (peers.size() == 1 && peers.front() == src) { // if there is only one replica, skip transfer leader phase return Status::OK(); } auto target = dst; if (dst == kRandomPeer) { for (auto& p : peers) { - if (p != leader) { + if (p != src) { auto retCode = ActiveHostsMan::isLived(kv_, p); if (nebula::ok(retCode) && nebula::value(retCode)) { target = p; @@ -53,8 +54,8 @@ folly::Future AdminClient::transLeader(GraphSpaceID spaceId, } } req.new_leader_ref() = std::move(target); - return getResponse( - Utils::getAdminAddrFromStoreAddr(leader), + return getResponseFromPart( + Utils::getAdminAddrFromStoreAddr(src), std::move(req), [](auto client, auto request) { return client->future_transLeader(request); }, [](auto&& resp) -> Status { @@ -67,7 +68,8 @@ folly::Future AdminClient::transLeader(GraphSpaceID spaceId, return Status::PartNotFound(); } default: - return Status::Error("Unknown code %d", static_cast(resp.get_code())); + return Status::Error("Transfer leader failed: %s", + apache::thrift::util::enumNameSafe(resp.get_code()).c_str()); } }); } @@ -80,14 +82,15 @@ folly::Future AdminClient::addPart(GraphSpaceID spaceId, req.space_id_ref() = spaceId; req.part_id_ref() = partId; req.as_learner_ref() = asLearner; - auto ret = getPeers(spaceId, partId); - if (!nebula::ok(ret)) { - LOG(INFO) << "Get peers failed: " << static_cast(nebula::error(ret)); + auto partHosts = getPeers(spaceId, partId); + if (!nebula::ok(partHosts)) { + LOG(INFO) << "Get peers failed: " + << apache::thrift::util::enumNameSafe(nebula::error(partHosts)); return Status::Error("Get peers failed"); } - req.peers_ref() = std::move(nebula::value(ret)); - return getResponse( + req.peers_ref() = std::move(nebula::value(partHosts)); + return getResponseFromPart( Utils::getAdminAddrFromStoreAddr(host), std::move(req), [](auto client, auto request) { return client->future_addPart(request); }, @@ -95,7 +98,8 @@ folly::Future AdminClient::addPart(GraphSpaceID spaceId, if (resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::OK(); } else { - return Status::Error("Add part failed! code=%d", static_cast(resp.get_code())); + return Status::Error("Add part failed: %s", + apache::thrift::util::enumNameSafe(resp.get_code()).c_str()); } }); } @@ -107,16 +111,17 @@ folly::Future AdminClient::addLearner(GraphSpaceID spaceId, req.space_id_ref() = spaceId; req.part_id_ref() = partId; req.learner_ref() = learner; - auto ret = getPeers(spaceId, partId); - if (!nebula::ok(ret)) { - LOG(INFO) << "Get peers failed: " << static_cast(nebula::error(ret)); + auto partHosts = getPeers(spaceId, partId); + if (!nebula::ok(partHosts)) { + LOG(INFO) << "Get peers failed: " + << apache::thrift::util::enumNameSafe(nebula::error(partHosts)); return Status::Error("Get peers failed"); } - auto peers = std::move(nebula::value(ret)); + auto peers = std::move(nebula::value(partHosts)); folly::Promise pro; auto f = pro.getFuture(); - getResponse( + getResponseFromLeader( getAdminAddrFromPeers(peers), 0, std::move(req), @@ -134,23 +139,24 @@ folly::Future AdminClient::waitingForCatchUpData(GraphSpaceID spaceId, req.space_id_ref() = spaceId; req.part_id_ref() = partId; req.target_ref() = target; - auto ret = getPeers(spaceId, partId); - if (!nebula::ok(ret)) { - LOG(INFO) << "Get peers failed: " << static_cast(nebula::error(ret)); + auto partHosts = getPeers(spaceId, partId); + if (!nebula::ok(partHosts)) { + LOG(INFO) << "Get peers failed: " + << apache::thrift::util::enumNameSafe(nebula::error(partHosts)); return Status::Error("Get peers failed"); } - auto peers = std::move(nebula::value(ret)); + auto peers = std::move(nebula::value(partHosts)); folly::Promise pro; auto f = pro.getFuture(); - getResponse( + getResponseFromLeader( getAdminAddrFromPeers(peers), 0, std::move(req), [](auto client, auto request) { return client->future_waitingForCatchUpData(request); }, 0, std::move(pro), - 3); + FLAGS_max_retry_times_admin_op); return f; } @@ -163,16 +169,17 @@ folly::Future AdminClient::memberChange(GraphSpaceID spaceId, req.part_id_ref() = partId; req.add_ref() = added; req.peer_ref() = peer; - auto ret = getPeers(spaceId, partId); - if (!nebula::ok(ret)) { - LOG(INFO) << "Get peers failed: " << static_cast(nebula::error(ret)); + auto partHosts = getPeers(spaceId, partId); + if (!nebula::ok(partHosts)) { + LOG(INFO) << "Get peers failed: " + << apache::thrift::util::enumNameSafe(nebula::error(partHosts)); return Status::Error("Get peers failed"); } - auto peers = std::move(nebula::value(ret)); + auto peers = std::move(nebula::value(partHosts)); folly::Promise pro; auto f = pro.getFuture(); - getResponse( + getResponseFromLeader( getAdminAddrFromPeers(peers), 0, std::move(req), @@ -189,12 +196,14 @@ folly::Future AdminClient::updateMeta(GraphSpaceID spaceId, const HostAddr& dst) { CHECK_NOTNULL(kv_); auto ret = getPeers(spaceId, partId); - if (!nebula::ok(ret)) { - LOG(INFO) << "Get peers failed: " << static_cast(nebula::error(ret)); + auto partHosts = getPeers(spaceId, partId); + if (!nebula::ok(partHosts)) { + LOG(INFO) << "Get peers failed: " + << apache::thrift::util::enumNameSafe(nebula::error(partHosts)); return Status::Error("Get peers failed"); } - auto peers = std::move(nebula::value(ret)); + auto peers = std::move(nebula::value(partHosts)); auto strHosts = [](const std::vector& hosts) -> std::string { std::stringstream peersStr; for (auto& h : hosts) { @@ -246,7 +255,7 @@ folly::Future AdminClient::removePart(GraphSpaceID spaceId, storage::cpp2::RemovePartReq req; req.space_id_ref() = spaceId; req.part_id_ref() = partId; - return getResponse( + return getResponseFromPart( Utils::getAdminAddrFromStoreAddr(host), std::move(req), [](auto client, auto request) { return client->future_removePart(request); }, @@ -254,8 +263,8 @@ folly::Future AdminClient::removePart(GraphSpaceID spaceId, if (resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::OK(); } else { - return Status::Error("Remove part failed! code=%d", - static_cast(resp.get_code())); + return Status::Error("Remove part failed, code: %s", + apache::thrift::util::enumNameSafe(resp.get_code()).c_str()); } }); } @@ -264,13 +273,14 @@ folly::Future AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID storage::cpp2::CheckPeersReq req; req.space_id_ref() = spaceId; req.part_id_ref() = partId; - auto peerRet = getPeers(spaceId, partId); - if (!nebula::ok(peerRet)) { - LOG(INFO) << "Get peers failed: " << static_cast(nebula::error(peerRet)); + auto partHosts = getPeers(spaceId, partId); + if (!nebula::ok(partHosts)) { + LOG(INFO) << "Get peers failed: " + << apache::thrift::util::enumNameSafe(nebula::error(partHosts)); return Status::Error("Get peers failed"); } - auto peers = std::move(nebula::value(peerRet)); + auto peers = std::move(nebula::value(partHosts)); req.peers_ref() = peers; folly::Promise pro; auto fut = pro.getFuture(); @@ -279,8 +289,8 @@ folly::Future AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID auto ret = ActiveHostsMan::isLived(kv_, p); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); - LOG(INFO) << "Get active host failed, error: " << static_cast(retCode); - return Status::Error("Get peers failed"); + LOG(INFO) << "Get active host failed, error: " << apache::thrift::util::enumNameSafe(retCode); + return Status::Error("Get active host failed"); } else { auto isLive = nebula::value(ret); if (!isLive) { @@ -288,7 +298,7 @@ folly::Future AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID continue; } } - auto f = getResponse( + auto f = getResponseFromPart( Utils::getAdminAddrFromStoreAddr(p), req, [](auto client, auto request) { return client->future_checkPeers(request); }, @@ -296,8 +306,8 @@ folly::Future AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID if (resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::OK(); } else { - return Status::Error("Check peers failed! code=%d", - static_cast(resp.get_code())); + return Status::Error("Check peers failed, code: %s", + apache::thrift::util::enumNameSafe(resp.get_code()).c_str()); } }); futures.emplace_back(std::move(f)); @@ -311,7 +321,7 @@ folly::Future AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID } else { auto v = std::move(t).value(); for (auto& resp : v) { - // The exception has been catched inside getResponse. + // The exception has been catched inside getResponseFromPart. CHECK(!resp.hasException()); auto st = std::move(resp).value(); if (!st.ok()) { @@ -326,10 +336,10 @@ folly::Future AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID } template -folly::Future AdminClient::getResponse(const HostAddr& host, - Request req, - RemoteFunc remoteFunc, - RespGenerator respGen) { +folly::Future AdminClient::getResponseFromPart(const HostAddr& host, + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen) { folly::Promise pro; auto f = pro.getFuture(); auto* evb = ioThreadPool_->getEventBase(); @@ -371,15 +381,58 @@ folly::Future AdminClient::getResponse(const HostAddr& host, return f; } +template +void AdminClient::getResponseFromHost(const HostAddr& host, + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen, + folly::Promise> pro) { + auto* evb = ioThreadPool_->getEventBase(); + folly::via( + evb, + [evb, + pro = std::move(pro), + host, + req = std::move(req), + remoteFunc = std::move(remoteFunc), + respGen = std::move(respGen), + this]() mutable { + auto client = clientsMan_->client(host, evb); + remoteFunc(client, std::move(req)) + .via(evb) + .then([p = std::move(pro), respGen = std::move(respGen), host]( + folly::Try&& t) mutable { + // exception occurred during RPC + if (t.hasException()) { + p.setValue(Status::Error(folly::stringPrintf("[%s] RPC failure in AdminClient: %s", + host.toString().c_str(), + t.exception().what().c_str()))); + return; + } + auto&& result = std::move(t).value(); + if (result.get_code() != nebula::cpp2::ErrorCode::SUCCEEDED) { + p.setValue( + Status::Error("Operattion failed in AdminClient: %s", + apache::thrift::util::enumNameSafe(result.get_code()).c_str())); + } else { + p.setValue(respGen(std::move(result))); + } + }); + }); +} + template -void AdminClient::getResponse(std::vector hosts, - int32_t index, - Request req, - RemoteFunc remoteFunc, - int32_t retry, - folly::Promise pro, - int32_t retryLimit, - HandleResultOpt respGen) { +void AdminClient::getResponseFromLeader(std::vector hosts, + int32_t index, + Request req, + RemoteFunc remoteFunc, + int32_t retry, + folly::Promise pro, + int32_t retryLimit) { auto* evb = ioThreadPool_->getEventBase(); CHECK_GE(index, 0); CHECK_LT(index, hosts.size()); @@ -389,7 +442,6 @@ void AdminClient::getResponse(std::vector hosts, index, req = std::move(req), remoteFunc = std::move(remoteFunc), - respGen = std::move(respGen), retry, pro = std::move(pro), retryLimit, @@ -402,7 +454,6 @@ void AdminClient::getResponse(std::vector hosts, index, req = std::move(req), remoteFunc = std::move(remoteFunc), - respGen = std::move(respGen), retry, retryLimit, this](folly::Try&& t) mutable { @@ -412,14 +463,13 @@ void AdminClient::getResponse(std::vector hosts, LOG(INFO) << "Rpc failure to " << hosts[index] << ", retry " << retry << ", limit " << retryLimit << ", error: " << t.exception(); index = (index + 1) % hosts.size(); - getResponse(std::move(hosts), - index, - std::move(req), - std::move(remoteFunc), - retry + 1, - std::move(p), - retryLimit, - std::move(respGen)); + getResponseFromLeader(std::move(hosts), + index, + std::move(req), + std::move(remoteFunc), + retry + 1, + std::move(p), + retryLimit); return; } p.setValue(Status::Error(folly::stringPrintf( @@ -429,16 +479,13 @@ void AdminClient::getResponse(std::vector hosts, auto&& adminResp = std::move(t.value()); if (adminResp.result.get_failed_parts().empty()) { // succeeded - if (respGen != folly::none) { - auto val = respGen.value(); - val(std::move(adminResp)); - } p.setValue(Status::OK()); return; } auto resp = adminResp.result.get_failed_parts().front(); switch (resp.get_code()) { case nebula::cpp2::ErrorCode::E_LEADER_CHANGED: { + // storage will return the address of data service ip:port if (retry < retryLimit) { HostAddr leader("", 0); if (resp.get_leader() != nullptr) { @@ -449,14 +496,13 @@ void AdminClient::getResponse(std::vector hosts, LOG(INFO) << "The leader is in election" << ", retry " << retry << ", limit " << retryLimit; index = (index + 1) % hosts.size(); - getResponse(std::move(hosts), - index, - std::move(req), - std::move(remoteFunc), - retry + 1, - std::move(p), - retryLimit, - std::move(respGen)); + getResponseFromLeader(std::move(hosts), + index, + std::move(req), + std::move(remoteFunc), + retry + 1, + std::move(p), + retryLimit); return; } // convert to admin addr @@ -481,14 +527,13 @@ void AdminClient::getResponse(std::vector hosts, << "Return leader change from " << hosts[index] << ", new leader is " << leader << ", retry " << retry << ", limit " << retryLimit; CHECK_LT(leaderIndex, hosts.size()); - getResponse(std::move(hosts), - leaderIndex, - std::move(req), - std::move(remoteFunc), - retry + 1, - std::move(p), - retryLimit, - std::move(respGen)); + getResponseFromLeader(std::move(hosts), + leaderIndex, + std::move(req), + std::move(remoteFunc), + retry + 1, + std::move(p), + retryLimit); return; } p.setValue(Status::Error("Leader changed!")); @@ -500,14 +545,13 @@ void AdminClient::getResponse(std::vector hosts, << " from " << hosts[index] << ", retry " << retry << ", limit " << retryLimit; index = (index + 1) % hosts.size(); - getResponse(std::move(hosts), - index, - std::move(req), - std::move(remoteFunc), - retry + 1, - std::move(p), - retryLimit, - std::move(respGen)); + getResponseFromLeader(std::move(hosts), + index, + std::move(req), + std::move(remoteFunc), + retry + 1, + std::move(p), + retryLimit); return; } p.setValue(Status::Error("Unknown code %d", @@ -519,6 +563,7 @@ void AdminClient::getResponse(std::vector hosts, }); // via } +// todo(doodle): add related locks ErrorOr> AdminClient::getPeers(GraphSpaceID spaceId, PartitionID partId) { CHECK_NOTNULL(kv_); @@ -540,35 +585,22 @@ std::vector AdminClient::getAdminAddrFromPeers(const std::vector>&& pro, - int32_t retry, - int32_t retryLimit) { - auto* evb = ioThreadPool_->getEventBase(); - folly::via(evb, [evb, host, pro = std::move(pro), retry, retryLimit, this]() mutable { - storage::cpp2::GetLeaderReq req; - auto client = clientsMan_->client(host, evb); - client->future_getLeaderParts(std::move(req)) - .via(evb) - .then([pro = std::move(pro), host, retry, retryLimit, this]( - folly::Try&& t) mutable { - if (t.hasException()) { - LOG(INFO) << folly::stringPrintf("RPC failure in AdminClient: %s", - t.exception().what().c_str()); - if (retry < retryLimit) { - usleep(1000 * 50); - getLeaderDist( - Utils::getAdminAddrFromStoreAddr(host), std::move(pro), retry + 1, retryLimit); - } else { - pro.setValue(Status::Error("RPC failure in AdminClient")); - } - return; - } - auto&& resp = std::move(t).value(); - LOG(INFO) << "Get leader for host " << host; - pro.setValue(std::move(resp)); - }); - }); +folly::Future>>> +AdminClient::getLeaderDist(const HostAddr& host) { + folly::Promise>>> pro; + auto f = pro.getFuture(); + + auto adminAddr = Utils::getAdminAddrFromStoreAddr(host); + storage::cpp2::GetLeaderReq req; + getResponseFromHost( + adminAddr, + std::move(req), + [](auto client, auto request) { return client->future_getLeaderParts(request); }, + [](storage::cpp2::GetLeaderPartsResp&& resp) -> decltype(auto) { + return resp.get_leader_parts(); + }, + std::move(pro)); + return f; } folly::Future AdminClient::getLeaderDist(HostLeaderMap* result) { @@ -581,33 +613,23 @@ folly::Future AdminClient::getLeaderDist(HostLeaderMap* result) { } auto allHosts = nebula::value(allHostsRet); - std::vector>> hostFutures; + std::vector>>>> + hostFutures; for (const auto& h : allHosts) { - folly::Promise> pro; - auto fut = pro.getFuture(); - getLeaderDist(Utils::getAdminAddrFromStoreAddr(h), std::move(pro), 0, 3); - hostFutures.emplace_back(std::move(fut)); + hostFutures.emplace_back(getLeaderDist(h)); } folly::collectAll(std::move(hostFutures)) .via(ioThreadPool_.get()) .thenValue([p = std::move(promise), result, allHosts](auto&& tries) mutable { - size_t idx = 0; - for (auto& t : tries) { - if (t.hasException()) { - ++idx; + for (size_t i = 0; i < allHosts.size(); i++) { + CHECK(!tries[i].hasException()); + auto hostLeader = std::move(tries[i].value()); + if (!hostLeader.ok()) { continue; } - auto&& status = std::move(t.value()); - if (!status.ok()) { - ++idx; - continue; - } - auto resp = status.value(); - result->emplace(allHosts[idx], std::move(*resp.leader_parts_ref())); - ++idx; + result->emplace(allHosts[i], hostLeader.value()); } - p.setValue(Status::OK()); }) .thenError([p = std::move(promise)](auto&& e) mutable { @@ -622,108 +644,86 @@ folly::Future> AdminClient::createSnapshot( folly::Promise> pro; auto f = pro.getFuture(); - auto* evb = ioThreadPool_->getEventBase(); - auto storageHost = Utils::getAdminAddrFromStoreAddr(host); - folly::via(evb, [evb, storageHost, host, pro = std::move(pro), spaceIds, name, this]() mutable { - auto client = clientsMan_->client(storageHost, evb); - storage::cpp2::CreateCPRequest req; - std::vector idList; - idList.insert(idList.end(), spaceIds.begin(), spaceIds.end()); - req.space_ids_ref() = idList; - req.name_ref() = name; - client->future_createCheckpoint(std::move(req)) - .via(evb) - .then([p = std::move(pro), storageHost, host]( - folly::Try&& t) mutable { - if (t.hasException()) { - LOG(INFO) << folly::stringPrintf("RPC failure in AdminClient: %s", - t.exception().what().c_str()); - p.setValue(Status::Error("RPC failure in createCheckpoint")); - return; - } - auto&& resp = std::move(t).value(); - auto&& result = resp.get_result(); - if (result.get_failed_parts().empty()) { - cpp2::HostBackupInfo hostBackupInfo; - hostBackupInfo.host_ref() = host; - hostBackupInfo.checkpoints_ref() = std::move(resp.get_info()); - p.setValue(std::move(hostBackupInfo)); - return; - } - p.setValue(Status::Error("create checkpoint failed")); - }); - }); - + auto adminAddr = Utils::getAdminAddrFromStoreAddr(host); + storage::cpp2::CreateCPRequest req; + std::vector idList(spaceIds.begin(), spaceIds.end()); + req.space_ids_ref() = idList; + req.name_ref() = name; + getResponseFromHost( + adminAddr, + std::move(req), + [](auto client, auto request) { return client->future_createCheckpoint(request); }, + [host](storage::cpp2::CreateCPResp&& resp) -> StatusOr { + if (resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED) { + cpp2::HostBackupInfo hostBackupInfo; + hostBackupInfo.host_ref() = host; + hostBackupInfo.checkpoints_ref() = std::move(resp.get_info()); + return hostBackupInfo; + } else { + return Status::Error("Create snapshot failed: %s", + apache::thrift::util::enumNameSafe(resp.get_code()).c_str()); + } + }, + std::move(pro)); return f; } -folly::Future AdminClient::dropSnapshot(const std::set& spaceIds, - const std::string& name, - const HostAddr& host) { +folly::Future> AdminClient::dropSnapshot(const std::set& spaceIds, + const std::string& name, + const HostAddr& host) { + folly::Promise> pro; + auto f = pro.getFuture(); + auto adminAddr = Utils::getAdminAddrFromStoreAddr(host); + std::vector idList(spaceIds.begin(), spaceIds.end()); storage::cpp2::DropCPRequest req; - std::vector idList; - idList.insert(idList.end(), spaceIds.begin(), spaceIds.end()); req.space_ids_ref() = idList; req.name_ref() = name; - folly::Promise pro; - auto f = pro.getFuture(); - getResponse( - {Utils::getAdminAddrFromStoreAddr(host)}, - 0, + getResponseFromHost( + adminAddr, std::move(req), [](auto client, auto request) { return client->future_dropCheckpoint(request); }, - 0, - std::move(pro), - 3 /*The snapshot operation need to retry 3 times*/); + [](storage::cpp2::DropCPResp&& resp) -> bool { + return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED; + }, + std::move(pro)); return f; } -folly::Future AdminClient::blockingWrites(const std::set& spaceIds, - storage::cpp2::EngineSignType sign, - const HostAddr& host) { +folly::Future> AdminClient::blockingWrites(const std::set& spaceIds, + storage::cpp2::EngineSignType sign, + const HostAddr& host) { + folly::Promise> pro; + auto f = pro.getFuture(); + auto adminAddr = Utils::getAdminAddrFromStoreAddr(host); storage::cpp2::BlockingSignRequest req; - std::vector idList; - idList.insert(idList.end(), spaceIds.begin(), spaceIds.end()); + std::vector idList(spaceIds.begin(), spaceIds.end()); req.space_ids_ref() = idList; req.sign_ref() = sign; - folly::Promise pro; - auto f = pro.getFuture(); - getResponse( - {Utils::getAdminAddrFromStoreAddr(host)}, - 0, + getResponseFromHost( + adminAddr, std::move(req), [](auto client, auto request) { return client->future_blockingWrites(request); }, - 0, - std::move(pro), - 32 /*The blocking need to retry 32 times*/); + [](storage::cpp2::BlockingSignResp&& resp) -> bool { + return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED; + }, + std::move(pro)); return f; } -folly::Future AdminClient::addTask(cpp2::AdminCmd cmd, - int32_t jobId, - int32_t taskId, - GraphSpaceID spaceId, - const std::vector& targetHost, - const std::vector& taskSpecificParas, - std::vector parts, - int concurrency, - cpp2::StatsItem* statsResult) { - folly::Promise pro; +folly::Future> AdminClient::addTask( + cpp2::AdminCmd cmd, + int32_t jobId, + int32_t taskId, + GraphSpaceID spaceId, + const HostAddr& host, + const std::vector& taskSpecificParas, + std::vector parts, + int concurrency) { + folly::Promise> pro; auto f = pro.getFuture(); - std::vector hosts; - if (targetHost.empty()) { - auto activeHostsRet = ActiveHostsMan::getActiveAdminHosts(kv_); - if (!nebula::ok(activeHostsRet)) { - pro.setValue(Status::Error("Get active hosts failed")); - return f; - } else { - hosts = nebula::value(activeHostsRet); - } - } else { - hosts = targetHost; - } + auto adminAddr = Utils::getAdminAddrFromStoreAddr(host); - storage::cpp2::AddAdminTaskRequest req; + storage::cpp2::AddTaskRequest req; req.cmd_ref() = cmd; req.job_id_ref() = jobId; req.task_id_ref() = taskId; @@ -735,55 +735,35 @@ folly::Future AdminClient::addTask(cpp2::AdminCmd cmd, para.task_specific_paras_ref() = taskSpecificParas; req.para_ref() = std::move(para); - std::function respGen = - [statsResult](storage::cpp2::AdminExecResp&& resp) -> void { - if (statsResult && resp.stats_ref().has_value()) { - *statsResult = *resp.stats_ref(); - } - }; - - getResponse( - hosts, - 0, + getResponseFromHost( + adminAddr, std::move(req), [](auto client, auto request) { return client->future_addAdminTask(request); }, - 0, - std::move(pro), - 0, - std::move(respGen)); + [](storage::cpp2::AddTaskResp&& resp) -> bool { + return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED; + }, + std::move(pro)); return f; } -folly::Future AdminClient::stopTask(const std::vector& target, - int32_t jobId, - int32_t taskId) { - folly::Promise pro; +folly::Future> AdminClient::stopTask(const HostAddr& host, + int32_t jobId, + int32_t taskId) { + folly::Promise> pro; auto f = pro.getFuture(); - std::vector hosts; - if (target.empty()) { - auto activeHostsRet = ActiveHostsMan::getActiveAdminHosts(kv_); - if (!nebula::ok(activeHostsRet)) { - pro.setValue(Status::Error("Get active hosts failed")); - return f; - } else { - hosts = nebula::value(activeHostsRet); - } - } else { - hosts = target; - } - storage::cpp2::StopAdminTaskRequest req; + auto adminAddr = Utils::getAdminAddrFromStoreAddr(host); + storage::cpp2::StopTaskRequest req; req.job_id_ref() = jobId; req.task_id_ref() = taskId; - - getResponse( - hosts, - 0, + getResponseFromHost( + adminAddr, std::move(req), [](auto client, auto request) { return client->future_stopAdminTask(request); }, - 0, - std::move(pro), - 1); + [](storage::cpp2::StopTaskResp&& resp) -> bool { + return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED; + }, + std::move(pro)); return f; } diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h index cb3ffbdc982..a1ab32a3c56 100644 --- a/src/meta/processors/admin/AdminClient.h +++ b/src/meta/processors/admin/AdminClient.h @@ -20,7 +20,6 @@ namespace meta { using HostLeaderMap = std::unordered_map>>; -using HandleResultOpt = folly::Optional>; static const HostAddr kRandomPeer("", 0); @@ -44,7 +43,10 @@ class AdminClient { } /** - * @brief Transfer given partition's leader from now to dst + * @brief If it is a partition with only one replica, return succeed if src is the only host. + * If it is a partition with multiple replicas: + * 1. If src is not the leader, return succeed + * 2. If src is the leader, try to transfer to another peer specified by dst * * @param spaceId * @param partId @@ -54,18 +56,17 @@ class AdminClient { */ virtual folly::Future transLeader(GraphSpaceID spaceId, PartitionID partId, - const HostAddr& leader, + const HostAddr& src, const HostAddr& dst = kRandomPeer); /** - * @brief Add a peer for given partition in specified host. The rpc - * will be sent to the new partition peer, letting it know the - * other peers. + * @brief Open a new partition on specified host. The rpc will be sent to the new partition peer, + * letting it know the other peers. * * @param spaceId * @param partId * @param host - * @param asLearner Add an peer only as raft learner + * @param asLearner Whether the partition start only as raft learner * @return folly::Future */ virtual folly::Future addPart(GraphSpaceID spaceId, @@ -99,8 +100,10 @@ class AdminClient { const HostAddr& target); /** - * @brief Add/Remove one peer for partition (spaceId, partId). - * "added" should be true if we want to add one peer, otherwise it is false. + * @brief Add/Remove one peer for partition (spaceId, partId). The rpc will be sent to the + * partition leader. "added" should be true if we want to add one peer, otherwise it is + * false. + * * @param spaceId * @param partId * @param peer @@ -173,11 +176,11 @@ class AdminClient { * @param spaceIds spaces to drop * @param name snapshot name * @param host storage host - * @return folly::Future + * @return folly::Future> Return true if succeed, else return an error status */ - virtual folly::Future dropSnapshot(const std::set& spaceIds, - const std::string& name, - const HostAddr& host); + virtual folly::Future> dropSnapshot(const std::set& spaceIds, + const std::string& name, + const HostAddr& host); /** * @brief Blocking/Allowing writings to given spaces in specified storage host @@ -185,11 +188,11 @@ class AdminClient { * @param spaceIds * @param sign BLOCK_ON: blocking, BLOCK_OFF: allowing * @param host - * @return folly::Future + * @return folly::Future> Return true if succeed, else return an error status */ - virtual folly::Future blockingWrites(const std::set& spaceIds, - storage::cpp2::EngineSignType sign, - const HostAddr& host); + virtual folly::Future> blockingWrites(const std::set& spaceIds, + storage::cpp2::EngineSignType sign, + const HostAddr& host); /** * @brief Add storage admin task to given storage host @@ -198,56 +201,106 @@ class AdminClient { * @param jobId * @param taskId * @param spaceId - * @param specificHosts if hosts are empty, will send request to all ative storage hosts + * @param host Target host to add task * @param taskSpecficParas * @param parts * @param concurrency - * @param statsResult - * @return folly::Future + * @return folly::Future> Return true if succeed, else return an error status */ - virtual folly::Future addTask(cpp2::AdminCmd cmd, - int32_t jobId, - int32_t taskId, - GraphSpaceID spaceId, - const std::vector& specificHosts, - const std::vector& taskSpecficParas, - std::vector parts, - int concurrency, - cpp2::StatsItem* statsResult = nullptr); + virtual folly::Future> addTask(cpp2::AdminCmd cmd, + int32_t jobId, + int32_t taskId, + GraphSpaceID spaceId, + const HostAddr& host, + const std::vector& taskSpecficParas, + std::vector parts, + int concurrency); /** * @brief Stop stoarge admin task in given storage host * - * @param target if target hosts are emtpy, will send request to all active storage hosts + * @param host Target storage host to stop job * @param jobId * @param taskId - * @return folly::Future + * @return folly::Future> Return true if succeed, else return an error status */ - virtual folly::Future stopTask(const std::vector& target, - int32_t jobId, - int32_t taskId); + virtual folly::Future> stopTask(const HostAddr& host, + int32_t jobId, + int32_t taskId); private: + /** + * @brief Send the rpc request to a storage node, the operation is only related to one partition + * + * @tparam Request RPC request type + * @tparam RemoteFunc Client's RPC function + * @tparam RespGenerator The result generator based on RPC response + * @param host Admin ip:port of storage node + * @param req RPC request + * @param remoteFunc Client's RPC function + * @param respGen The result generator based on RPC response + * @return folly::Future + */ template - folly::Future getResponse(const HostAddr& host, - Request req, - RemoteFunc remoteFunc, - RespGenerator respGen); + folly::Future getResponseFromPart(const HostAddr& host, + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen); + + /** + * @brief Send the rpc request to a storage node, the operation is only realted to the spaces, + * does not have affect on a partition. It may also return extra infomations, so return + * StatusOr is necessary + * + * @tparam Request RPC request type + * @tparam RemoteFunc Client's RPC function + * @tparam RespGenerator The result generator based on RPC response + * @tparam RpcResponse RPC response + * @tparam Response The result type + * @param host Admin ip:port of storage node + * @param req RPC request + * @param remoteFunc Client's RPC function + * @param respGen The result generator based on RPC response + * @param pro The promise of result + */ + template , + Request)>::type::value_type, + class Response = typename std::result_of::type> + void getResponseFromHost(const HostAddr& host, + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen, + folly::Promise> pro); + /** + * @brief Send the rpc request to a storage node, the operation is only related to one partition + * and **it need to be exuecuted on leader** + * + * @tparam Request RPC request type + * @tparam RemoteFunc Client's RPC function + * @param host Admin ip:port of storage nodes which has the partition + * @param index The index of storage node in `hosts`, the rpc will be sent to the node + * @param req RPC request + * @param remoteFunc Client's RPC function + * @param retry Current retry times + * @param pro Promise of result + * @param retryLimit Max retry times + */ template - void getResponse(std::vector hosts, - int32_t index, - Request req, - RemoteFunc remoteFunc, - int32_t retry, - folly::Promise pro, - int32_t retryLimit, - HandleResultOpt respGen = folly::none); - - void getLeaderDist(const HostAddr& host, - folly::Promise>&& pro, - int32_t retry, - int32_t retryLimit); + void getResponseFromLeader(std::vector hosts, + int32_t index, + Request req, + RemoteFunc remoteFunc, + int32_t retry, + folly::Promise pro, + int32_t retryLimit); + + folly::Future>>> getLeaderDist( + const HostAddr& host); Status handleResponse(const storage::cpp2::AdminExecResp& resp); diff --git a/src/meta/processors/admin/SnapShot.cpp b/src/meta/processors/admin/SnapShot.cpp index 3bfcfffc701..29c5827f4b5 100644 --- a/src/meta/processors/admin/SnapShot.cpp +++ b/src/meta/processors/admin/SnapShot.cpp @@ -81,11 +81,11 @@ nebula::cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name, continue; } - auto status = client_->dropSnapshot(spaces, name, host).get(); - if (!status.ok()) { + auto result = client_->dropSnapshot(spaces, name, host).get(); + if (!result.ok()) { auto msg = "failed drop checkpoint : \"%s\". on host %s. error %s"; LOG(INFO) << folly::stringPrintf( - msg, name.c_str(), host.toString().c_str(), status.toString().c_str()); + msg, name.c_str(), host.toString().c_str(), result.status().toString().c_str()); } } return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -105,10 +105,10 @@ nebula::cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType s auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; for (const auto& [host, spaces] : hostSpaces) { LOG(INFO) << "will block write host: " << host; - auto status = client_->blockingWrites(spaces, sign, host).get(); - if (!status.ok()) { + auto result = client_->blockingWrites(spaces, sign, host).get(); + if (!result.ok()) { LOG(INFO) << "Send blocking sign error on host " << host - << ", errorcode: " << status.message(); + << ", errorcode: " << result.status().toString(); ret = nebula::cpp2::ErrorCode::E_BLOCK_WRITE_FAILURE; if (sign == storage::cpp2::EngineSignType::BLOCK_ON) { break; diff --git a/src/meta/processors/job/CompactJobExecutor.cpp b/src/meta/processors/job/CompactJobExecutor.cpp index 8d5741709ce..af34aedd023 100644 --- a/src/meta/processors/job/CompactJobExecutor.cpp +++ b/src/meta/processors/job/CompactJobExecutor.cpp @@ -16,14 +16,27 @@ CompactJobExecutor::CompactJobExecutor(JobID jobId, folly::Future CompactJobExecutor::executeInternal(HostAddr&& address, std::vector&& parts) { - return adminClient_->addTask(cpp2::AdminCmd::COMPACT, - jobId_, - taskId_++, - space_, - {std::move(address)}, - {}, - std::move(parts), - concurrency_); + folly::Promise pro; + auto f = pro.getFuture(); + adminClient_ + ->addTask(cpp2::AdminCmd::COMPACT, + jobId_, + taskId_++, + space_, + std::move(address), + {}, + std::move(parts), + concurrency_) + .then([pro = std::move(pro)](auto&& t) mutable { + CHECK(!t.hasException()); + auto status = std::move(t).value(); + if (status.ok()) { + pro.setValue(Status::OK()); + } else { + pro.setValue(status.status()); + } + }); + return f; } } // namespace meta diff --git a/src/meta/processors/job/FlushJobExecutor.cpp b/src/meta/processors/job/FlushJobExecutor.cpp index 3e70e788915..3c2afa1c49c 100644 --- a/src/meta/processors/job/FlushJobExecutor.cpp +++ b/src/meta/processors/job/FlushJobExecutor.cpp @@ -16,14 +16,27 @@ FlushJobExecutor::FlushJobExecutor(JobID jobId, folly::Future FlushJobExecutor::executeInternal(HostAddr&& address, std::vector&& parts) { - return adminClient_->addTask(cpp2::AdminCmd::FLUSH, - jobId_, - taskId_++, - space_, - {std::move(address)}, - {}, - std::move(parts), - concurrency_); + folly::Promise pro; + auto f = pro.getFuture(); + adminClient_ + ->addTask(cpp2::AdminCmd::FLUSH, + jobId_, + taskId_++, + space_, + std::move(address), + {}, + std::move(parts), + concurrency_) + .then([pro = std::move(pro)](auto&& t) mutable { + CHECK(!t.hasException()); + auto status = std::move(t).value(); + if (status.ok()) { + pro.setValue(Status::OK()); + } else { + pro.setValue(status.status()); + } + }); + return f; } } // namespace meta diff --git a/src/meta/processors/job/RebuildEdgeJobExecutor.cpp b/src/meta/processors/job/RebuildEdgeJobExecutor.cpp index 620af8d3056..10577694d4c 100644 --- a/src/meta/processors/job/RebuildEdgeJobExecutor.cpp +++ b/src/meta/processors/job/RebuildEdgeJobExecutor.cpp @@ -10,14 +10,27 @@ namespace meta { folly::Future RebuildEdgeJobExecutor::executeInternal(HostAddr&& address, std::vector&& parts) { - return adminClient_->addTask(cpp2::AdminCmd::REBUILD_EDGE_INDEX, - jobId_, - taskId_++, - space_, - {std::move(address)}, - taskParameters_, - std::move(parts), - concurrency_); + folly::Promise pro; + auto f = pro.getFuture(); + adminClient_ + ->addTask(cpp2::AdminCmd::REBUILD_EDGE_INDEX, + jobId_, + taskId_++, + space_, + std::move(address), + taskParameters_, + std::move(parts), + concurrency_) + .then([pro = std::move(pro)](auto&& t) mutable { + CHECK(!t.hasException()); + auto status = std::move(t).value(); + if (status.ok()) { + pro.setValue(Status::OK()); + } else { + pro.setValue(status.status()); + } + }); + return f; } } // namespace meta diff --git a/src/meta/processors/job/RebuildFTJobExecutor.cpp b/src/meta/processors/job/RebuildFTJobExecutor.cpp index aea76bbb268..f92deed1046 100644 --- a/src/meta/processors/job/RebuildFTJobExecutor.cpp +++ b/src/meta/processors/job/RebuildFTJobExecutor.cpp @@ -10,14 +10,27 @@ namespace meta { folly::Future RebuildFTJobExecutor::executeInternal(HostAddr&& address, std::vector&& parts) { - return adminClient_->addTask(cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX, - jobId_, - taskId_++, - space_, - {std::move(address)}, - taskParameters_, - std::move(parts), - concurrency_); + folly::Promise pro; + auto f = pro.getFuture(); + adminClient_ + ->addTask(cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX, + jobId_, + taskId_++, + space_, + std::move(address), + taskParameters_, + std::move(parts), + concurrency_) + .then([pro = std::move(pro)](auto&& t) mutable { + CHECK(!t.hasException()); + auto status = std::move(t).value(); + if (status.ok()) { + pro.setValue(Status::OK()); + } else { + pro.setValue(status.status()); + } + }); + return f; } } // namespace meta diff --git a/src/meta/processors/job/RebuildJobExecutor.cpp b/src/meta/processors/job/RebuildJobExecutor.cpp index 8be68da16f9..a1f826fafa2 100644 --- a/src/meta/processors/job/RebuildJobExecutor.cpp +++ b/src/meta/processors/job/RebuildJobExecutor.cpp @@ -59,9 +59,10 @@ nebula::cpp2::ErrorCode RebuildJobExecutor::stop() { } auto& hosts = nebula::value(errOrTargetHost); - std::vector> futures; + std::vector>> futures; for (auto& host : hosts) { - auto future = adminClient_->stopTask({Utils::getAdminAddrFromStoreAddr(host.first)}, jobId_, 0); + // Will convert StorageAddr to AdminAddr in AdminClient + auto future = adminClient_->stopTask(host.first, jobId_, 0); futures.emplace_back(std::move(future)); } diff --git a/src/meta/processors/job/RebuildTagJobExecutor.cpp b/src/meta/processors/job/RebuildTagJobExecutor.cpp index 2b6ad70fc52..b42a99c8660 100644 --- a/src/meta/processors/job/RebuildTagJobExecutor.cpp +++ b/src/meta/processors/job/RebuildTagJobExecutor.cpp @@ -10,14 +10,27 @@ namespace meta { folly::Future RebuildTagJobExecutor::executeInternal(HostAddr&& address, std::vector&& parts) { - return adminClient_->addTask(cpp2::AdminCmd::REBUILD_TAG_INDEX, - jobId_, - taskId_++, - space_, - {std::move(address)}, - taskParameters_, - std::move(parts), - concurrency_); + folly::Promise pro; + auto f = pro.getFuture(); + adminClient_ + ->addTask(cpp2::AdminCmd::REBUILD_TAG_INDEX, + jobId_, + taskId_++, + space_, + std::move(address), + taskParameters_, + std::move(parts), + concurrency_) + .then([pro = std::move(pro)](auto&& t) mutable { + CHECK(!t.hasException()); + auto status = std::move(t).value(); + if (status.ok()) { + pro.setValue(Status::OK()); + } else { + pro.setValue(status.status()); + } + }); + return f; } } // namespace meta diff --git a/src/meta/processors/job/StatsJobExecutor.cpp b/src/meta/processors/job/StatsJobExecutor.cpp index bb918c4f22a..705d38eb982 100644 --- a/src/meta/processors/job/StatsJobExecutor.cpp +++ b/src/meta/processors/job/StatsJobExecutor.cpp @@ -59,17 +59,27 @@ nebula::cpp2::ErrorCode StatsJobExecutor::prepare() { folly::Future StatsJobExecutor::executeInternal(HostAddr&& address, std::vector&& parts) { - cpp2::StatsItem item; - statsItem_.emplace(address, item); - return adminClient_->addTask(cpp2::AdminCmd::STATS, - jobId_, - taskId_++, - space_, - {std::move(address)}, - {}, - std::move(parts), - concurrency_, - &(statsItem_[address])); + folly::Promise pro; + auto f = pro.getFuture(); + adminClient_ + ->addTask(cpp2::AdminCmd::STATS, + jobId_, + taskId_++, + space_, + std::move(address), + {}, + std::move(parts), + concurrency_) + .then([pro = std::move(pro)](auto&& t) mutable { + CHECK(!t.hasException()); + auto status = std::move(t).value(); + if (status.ok()) { + pro.setValue(Status::OK()); + } else { + pro.setValue(status.status()); + } + }); + return f; } void showStatsItem(const cpp2::StatsItem& item, const std::string& msg) { @@ -187,9 +197,10 @@ nebula::cpp2::ErrorCode StatsJobExecutor::stop() { } auto& hosts = nebula::value(errOrTargetHost); - std::vector> futures; + std::vector>> futures; for (auto& host : hosts) { - auto future = adminClient_->stopTask({Utils::getAdminAddrFromStoreAddr(host.first)}, jobId_, 0); + // Will convert StorageAddr to AdminAddr in AdminClient + auto future = adminClient_->stopTask(host.first, jobId_, 0); futures.emplace_back(std::move(future)); } diff --git a/src/meta/processors/job/StatsJobExecutor.h b/src/meta/processors/job/StatsJobExecutor.h index bfbc3d84478..9b6797753d2 100644 --- a/src/meta/processors/job/StatsJobExecutor.h +++ b/src/meta/processors/job/StatsJobExecutor.h @@ -48,10 +48,6 @@ class StatsJobExecutor : public StorageJobExecutor { std::string toTempKey(int32_t jobId); nebula::cpp2::ErrorCode doRemove(const std::string& key); - - private: - // Stats results - std::unordered_map statsItem_; }; } // namespace meta diff --git a/src/meta/processors/job/StorageJobExecutor.cpp b/src/meta/processors/job/StorageJobExecutor.cpp index 6432b25c423..e00b3493ad1 100644 --- a/src/meta/processors/job/StorageJobExecutor.cpp +++ b/src/meta/processors/job/StorageJobExecutor.cpp @@ -172,9 +172,8 @@ nebula::cpp2::ErrorCode StorageJobExecutor::execute() { std::vector> futures; for (auto& address : addresses) { - // transform to the admin host - auto h = Utils::getAdminAddrFromStoreAddr(address.first); - futures.emplace_back(executeInternal(std::move(h), std::move(address.second))); + // Will convert StorageAddr to AdminAddr in AdminClient + futures.emplace_back(executeInternal(std::move(address.first), std::move(address.second))); } auto rc = nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/meta/test/AdminClientTest.cpp b/src/meta/test/AdminClientTest.cpp index cae27355f21..5da27802919 100644 --- a/src/meta/test/AdminClientTest.cpp +++ b/src/meta/test/AdminClientTest.cpp @@ -90,9 +90,7 @@ class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf { auto f = pro.getFuture(); storage::cpp2::CreateCPResp resp; storage::cpp2::ResponseCommon result; - std::vector partRetCode; - result.failed_parts_ref() = partRetCode; - resp.result_ref() = result; + resp.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; nebula::cpp2::CheckpointInfo cpInfo; cpInfo.path_ref() = "snapshot_path"; resp.info_ref() = {cpInfo}; @@ -100,32 +98,34 @@ class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf { return f; } - folly::Future future_dropCheckpoint( + folly::Future future_dropCheckpoint( const storage::cpp2::DropCPRequest& req) override { - RETURN_OK(req); + UNUSED(req); + folly::Promise pro; + auto f = pro.getFuture(); + storage::cpp2::DropCPResp resp; + resp.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; + pro.setValue(std::move(resp)); + return f; } - folly::Future future_blockingWrites( + folly::Future future_blockingWrites( const storage::cpp2::BlockingSignRequest& req) override { - RETURN_OK(req); - } - - folly::Future future_rebuildTagIndex( - const storage::cpp2::RebuildIndexRequest& req) override { - RETURN_OK(req); - } - - folly::Future future_rebuildEdgeIndex( - const storage::cpp2::RebuildIndexRequest& req) override { - RETURN_OK(req); + UNUSED(req); + folly::Promise pro; + auto f = pro.getFuture(); + storage::cpp2::BlockingSignResp resp; + resp.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; + pro.setValue(std::move(resp)); + return f; } }; class TestStorageServiceRetry : public TestStorageService { public: - TestStorageServiceRetry(std::string addr, Port adminPort) { + void setLeader(std::string leaderAddr, Port leaderAdminPort) { // when leader change returned, the port is always data port - leader_ = Utils::getStoreAddrFromAdminAddr(HostAddr(addr, adminPort)); + leader_ = Utils::getStoreAddrFromAdminAddr(HostAddr(leaderAddr, leaderAdminPort)); } folly::Future future_addLearner( @@ -154,7 +154,6 @@ TEST(AdminClientTest, SimpleTest) { LOG(INFO) << "Start storage server on " << rpcServer->port_; std::string localIp("127.0.0.1"); - // network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); fs::TempDir rootPath("/tmp/AdminTest.XXXXXX"); std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); auto client = std::make_unique(kv.get()); @@ -185,17 +184,17 @@ TEST(AdminClientTest, SimpleTest) { } TEST(AdminClientTest, RetryTest) { + std::string localIp("127.0.0.1"); + auto rpcServer1 = std::make_unique(); auto handler1 = std::make_shared(); rpcServer1->start("storage-admin-1", 0, handler1); LOG(INFO) << "Start storage server on " << rpcServer1->port_; - std::string localIp("127.0.0.1"); - // network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); - auto rpcServer2 = std::make_unique(); - auto handler2 = std::make_shared(localIp, rpcServer1->port_); + auto handler2 = std::make_shared(); rpcServer2->start("storage-admin-2", 0, handler2); + handler2->setLeader(localIp, rpcServer1->port_); LOG(INFO) << "Start storage2 server on " << rpcServer2->port_; LOG(INFO) << "Now test interfaces with retry to leader!"; @@ -294,6 +293,82 @@ TEST(AdminClientTest, RetryTest) { } } +TEST(AdminClientTest, LearnerBecomeLeaderTest) { + // Three replica, server 1/2/3 is normal replica, server 4 is the learner at first and it + // becomes leader + auto rpcServer1 = std::make_unique(); + auto rpcServer2 = std::make_unique(); + auto rpcServer3 = std::make_unique(); + auto rpcServer4 = std::make_unique(); + auto handler1 = std::make_shared(); + auto handler2 = std::make_shared(); + auto handler3 = std::make_shared(); + auto handler4 = std::make_shared(); + rpcServer1->start("storage-admin-1", 0, handler1); + rpcServer2->start("storage-admin-2", 0, handler2); + rpcServer3->start("storage-admin-3", 0, handler3); + rpcServer4->start("storage-admin-4", 0, handler4); + LOG(INFO) << "Start storage1 server on " << rpcServer1->port_; + LOG(INFO) << "Start storage2 server on " << rpcServer2->port_; + LOG(INFO) << "Start storage3 server on " << rpcServer3->port_; + LOG(INFO) << "Start storage4 server on " << rpcServer4->port_; + + // server1 believes server2 is leader + // server2 believes server3 is leader + // server3 believes server4 is leader + // so a request send to server1 need to be retried for 3 times, and be processed in server4 + std::string localIp("127.0.0.1"); + handler1->setLeader(localIp, rpcServer2->port_); + handler2->setLeader(localIp, rpcServer3->port_); + handler3->setLeader(localIp, rpcServer4->port_); + + LOG(INFO) << "Now test interfaces with retry to leader!"; + fs::TempDir rootPath("/tmp/AdminTest.XXXXXX"); + std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); + { + LOG(INFO) << "Write the part information, server 1/2/3 is the normal replica"; + // The request will be sent to rpcServer1 first + std::vector thriftPeers; + thriftPeers.emplace_back(Utils::getStoreAddrFromAdminAddr({localIp, rpcServer1->port_})); + thriftPeers.emplace_back(Utils::getStoreAddrFromAdminAddr({localIp, rpcServer2->port_})); + thriftPeers.emplace_back(Utils::getStoreAddrFromAdminAddr({localIp, rpcServer3->port_})); + + std::vector data; + data.emplace_back(MetaKeyUtils::partKey(0, 1), MetaKeyUtils::partVal(thriftPeers)); + folly::Baton baton; + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&baton](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); + baton.wait(); + } + + LOG(INFO) << "Now test interfaces with retry to leader!"; + auto client = std::make_unique(kv.get()); + for (int32_t retryTime = 0; retryTime < 3; retryTime++) { + FLAGS_max_retry_times_admin_op = retryTime; + LOG(INFO) << "Test member change by adding a fake host"; + folly::Baton baton; + client->memberChange(0, 1, HostAddr("0", 0), true).thenValue([&baton](auto&& st) { + CHECK(!st.ok()); + CHECK_EQ("Leader changed!", st.toString()); + baton.post(); + }); + baton.wait(); + } + FLAGS_max_retry_times_admin_op = 3; + { + LOG(INFO) << "Test member change by adding a fake host"; + folly::Baton baton; + client->memberChange(0, 1, HostAddr("0", 0), true).thenValue([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + } +} + TEST(AdminClientTest, SnapshotTest) { auto rpcServer = std::make_unique(); auto handler = std::make_shared(); diff --git a/src/meta/test/CreateBackupProcessorTest.cpp b/src/meta/test/CreateBackupProcessorTest.cpp index f81f1e583d9..413b99bf5bb 100644 --- a/src/meta/test/CreateBackupProcessorTest.cpp +++ b/src/meta/test/CreateBackupProcessorTest.cpp @@ -41,36 +41,45 @@ class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf { folly::Future future_createCheckpoint( const storage::cpp2::CreateCPRequest& req) override { - UNUSED(req); folly::Promise pro; auto f = pro.getFuture(); storage::cpp2::CreateCPResp resp; storage::cpp2::ResponseCommon result; - std::vector partRetCode; std::unordered_map info; nebula::cpp2::LogInfo logInfo; logInfo.log_id_ref() = logId; logInfo.term_id_ref() = termId; info.emplace(1, std::move(logInfo)); - result.failed_parts_ref() = partRetCode; - resp.result_ref() = result; nebula::cpp2::CheckpointInfo cpInfo; cpInfo.path_ref() = "snapshot_path"; cpInfo.parts_ref() = std::move(info); cpInfo.space_id_ref() = req.get_space_ids()[0]; resp.info_ref() = {cpInfo}; + resp.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; pro.setValue(std::move(resp)); return f; } - folly::Future future_dropCheckpoint( + folly::Future future_dropCheckpoint( const storage::cpp2::DropCPRequest& req) override { - RETURN_OK(req); + UNUSED(req); + folly::Promise pro; + auto f = pro.getFuture(); + storage::cpp2::DropCPResp resp; + resp.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; + pro.setValue(std::move(resp)); + return f; } - folly::Future future_blockingWrites( + folly::Future future_blockingWrites( const storage::cpp2::BlockingSignRequest& req) override { - RETURN_OK(req); + UNUSED(req); + folly::Promise pro; + auto f = pro.getFuture(); + storage::cpp2::BlockingSignResp resp; + resp.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; + pro.setValue(std::move(resp)); + return f; } }; diff --git a/src/meta/test/GetStatsTest.cpp b/src/meta/test/GetStatsTest.cpp index aa6ce2e3da4..c882dc2a0ac 100644 --- a/src/meta/test/GetStatsTest.cpp +++ b/src/meta/test/GetStatsTest.cpp @@ -87,6 +87,8 @@ class GetStatsTest : public ::testing::Test { DefaultValue>::SetFactory( [] { return folly::Future(Status::OK()); }); + DefaultValue>>::SetFactory( + [] { return folly::Future>(true); }); jobMgr = JobManager::getInstance(); jobMgr->status_ = JobManager::JbmgrStatus::NOT_START; @@ -378,7 +380,7 @@ TEST_F(GetStatsTest, MockSingleMachineTest) { JobCallBack cb1(jobMgr, jobId1, 0, 100); JobCallBack cb2(jobMgr, 2, 0, 200); - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _, _, _)) + EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _, _)) .Times(2) .WillOnce(testing::InvokeWithoutArgs(cb1)) .WillOnce(testing::InvokeWithoutArgs(cb2)); @@ -494,7 +496,7 @@ TEST_F(GetStatsTest, MockMultiMachineTest) { JobCallBack cb2(jobMgr, jobId, 1, 200); JobCallBack cb3(jobMgr, jobId, 2, 300); - EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _, _, _)) + EXPECT_CALL(adminClient, addTask(_, _, _, _, _, _, _, _)) .Times(3) .WillOnce(testing::InvokeWithoutArgs(cb1)) .WillOnce(testing::InvokeWithoutArgs(cb2)) diff --git a/src/meta/test/MockAdminClient.h b/src/meta/test/MockAdminClient.h index 25715351004..40d3f1dc085 100644 --- a/src/meta/test/MockAdminClient.h +++ b/src/meta/test/MockAdminClient.h @@ -33,24 +33,23 @@ class MockAdminClient : public AdminClient { const std::string&, const HostAddr&)); MOCK_METHOD3(dropSnapshot, - folly::Future(const std::set&, - const std::string&, - const HostAddr&)); + folly::Future>(const std::set&, + const std::string&, + const HostAddr&)); MOCK_METHOD3(blockingWrites, - folly::Future(const std::set&, - storage::cpp2::EngineSignType, - const HostAddr&)); - MOCK_METHOD9(addTask, - folly::Future(cpp2::AdminCmd, - int32_t, - int32_t, - GraphSpaceID, - const std::vector&, - const std::vector&, - std::vector, - int, - cpp2::StatsItem*)); - MOCK_METHOD3(stopTask, folly::Future(const std::vector&, int32_t, int32_t)); + folly::Future>(const std::set&, + storage::cpp2::EngineSignType, + const HostAddr&)); + MOCK_METHOD8(addTask, + folly::Future>(cpp2::AdminCmd, + int32_t, + int32_t, + GraphSpaceID, + const HostAddr&, + const std::vector&, + std::vector, + int)); + MOCK_METHOD3(stopTask, folly::Future>(const HostAddr&, int32_t, int32_t)); }; } // namespace meta diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index b1227f4bcd8..9fdcb0f2250 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -21,6 +21,7 @@ nebula_add_library( admin/RebuildEdgeIndexTask.cpp admin/RebuildFTIndexTask.cpp admin/StatsTask.cpp + admin/GetLeaderProcessor.cpp ) nebula_add_library( diff --git a/src/storage/StorageAdminServiceHandler.cpp b/src/storage/StorageAdminServiceHandler.cpp index a81bde93052..ba05566e9bb 100644 --- a/src/storage/StorageAdminServiceHandler.cpp +++ b/src/storage/StorageAdminServiceHandler.cpp @@ -9,6 +9,7 @@ #include "storage/admin/AdminTaskProcessor.h" #include "storage/admin/CreateCheckpointProcessor.h" #include "storage/admin/DropCheckpointProcessor.h" +#include "storage/admin/GetLeaderProcessor.h" #include "storage/admin/SendBlockSignProcessor.h" #include "storage/admin/StopAdminTaskProcessor.h" @@ -74,26 +75,26 @@ folly::Future StorageAdminServiceHandler::future_createCheck RETURN_FUTURE(processor); } -folly::Future StorageAdminServiceHandler::future_dropCheckpoint( +folly::Future StorageAdminServiceHandler::future_dropCheckpoint( const cpp2::DropCPRequest& req) { auto* processor = DropCheckpointProcessor::instance(env_); RETURN_FUTURE(processor); } -folly::Future StorageAdminServiceHandler::future_blockingWrites( +folly::Future StorageAdminServiceHandler::future_blockingWrites( const cpp2::BlockingSignRequest& req) { auto* processor = SendBlockSignProcessor::instance(env_); RETURN_FUTURE(processor); } -folly::Future StorageAdminServiceHandler::future_addAdminTask( - const cpp2::AddAdminTaskRequest& req) { +folly::Future StorageAdminServiceHandler::future_addAdminTask( + const cpp2::AddTaskRequest& req) { auto* processor = AdminTaskProcessor::instance(env_); RETURN_FUTURE(processor); } -folly::Future StorageAdminServiceHandler::future_stopAdminTask( - const cpp2::StopAdminTaskRequest& req) { +folly::Future StorageAdminServiceHandler::future_stopAdminTask( + const cpp2::StopTaskRequest& req) { auto* processor = StopAdminTaskProcessor::instance(env_); RETURN_FUTURE(processor); } diff --git a/src/storage/StorageAdminServiceHandler.h b/src/storage/StorageAdminServiceHandler.h index 6f8bf994cc3..8c068a1c333 100644 --- a/src/storage/StorageAdminServiceHandler.h +++ b/src/storage/StorageAdminServiceHandler.h @@ -39,16 +39,14 @@ class StorageAdminServiceHandler final : public cpp2::StorageAdminServiceSvIf { folly::Future future_createCheckpoint( const cpp2::CreateCPRequest& req) override; - folly::Future future_dropCheckpoint(const cpp2::DropCPRequest& req) override; + folly::Future future_dropCheckpoint(const cpp2::DropCPRequest& req) override; - folly::Future future_blockingWrites( + folly::Future future_blockingWrites( const cpp2::BlockingSignRequest& req) override; - folly::Future future_addAdminTask( - const cpp2::AddAdminTaskRequest& req) override; + folly::Future future_addAdminTask(const cpp2::AddTaskRequest& req) override; - folly::Future future_stopAdminTask( - const cpp2::StopAdminTaskRequest& req) override; + folly::Future future_stopAdminTask(const cpp2::StopTaskRequest& req) override; private: StorageEnv* env_{nullptr}; diff --git a/src/storage/admin/AdminProcessor.h b/src/storage/admin/AdminProcessor.h index 16d69f57ac0..534271dfb5c 100644 --- a/src/storage/admin/AdminProcessor.h +++ b/src/storage/admin/AdminProcessor.h @@ -335,31 +335,6 @@ class CheckPeersProcessor : public BaseProcessor { explicit CheckPeersProcessor(StorageEnv* env) : BaseProcessor(env) {} }; -class GetLeaderProcessor : public BaseProcessor { - public: - static GetLeaderProcessor* instance(StorageEnv* env) { - return new GetLeaderProcessor(env); - } - - void process(const cpp2::GetLeaderReq&) { - CHECK_NOTNULL(env_->kvstore_); - std::unordered_map> allLeaders; - env_->kvstore_->allLeader(allLeaders); - std::unordered_map> leaderIds; - for (auto& spaceLeaders : allLeaders) { - auto& spaceId = spaceLeaders.first; - for (auto& partLeader : spaceLeaders.second) { - leaderIds[spaceId].emplace_back(partLeader.get_part_id()); - } - } - resp_.leader_parts_ref() = std::move(leaderIds); - this->onFinished(); - } - - private: - explicit GetLeaderProcessor(StorageEnv* env) : BaseProcessor(env) {} -}; - } // namespace storage } // namespace nebula diff --git a/src/storage/admin/AdminTask.h b/src/storage/admin/AdminTask.h index dae40194338..00d5e4b2916 100644 --- a/src/storage/admin/AdminTask.h +++ b/src/storage/admin/AdminTask.h @@ -39,7 +39,7 @@ struct TaskContext { using CallBack = std::function; TaskContext() = default; - TaskContext(const cpp2::AddAdminTaskRequest& req, CallBack cb) + TaskContext(const cpp2::AddTaskRequest& req, CallBack cb) : cmd_(req.get_cmd()), jobId_(req.get_job_id()), taskId_(req.get_task_id()), diff --git a/src/storage/admin/AdminTaskProcessor.cpp b/src/storage/admin/AdminTaskProcessor.cpp index bf3fe3429ec..f517025895a 100644 --- a/src/storage/admin/AdminTaskProcessor.cpp +++ b/src/storage/admin/AdminTaskProcessor.cpp @@ -12,7 +12,8 @@ namespace nebula { namespace storage { -void AdminTaskProcessor::process(const cpp2::AddAdminTaskRequest& req) { + +void AdminTaskProcessor::process(const cpp2::AddTaskRequest& req) { auto taskManager = AdminTaskManager::instance(); auto cb = [taskManager, jobId = req.get_job_id(), taskId = req.get_task_id()]( @@ -25,19 +26,23 @@ void AdminTaskProcessor::process(const cpp2::AddAdminTaskRequest& req) { if (task) { nebula::meta::cpp2::StatsItem statsItem; statsItem.status_ref() = nebula::meta::cpp2::JobStatus::RUNNING; + // write an initial state of task taskManager->saveTaskStatus( ctx.jobId_, ctx.taskId_, nebula::cpp2::ErrorCode::E_TASK_EXECUTION_FAILED, statsItem); taskManager->addAsyncTask(task); } else { - cpp2::PartitionResult thriftRet; - thriftRet.code_ref() = nebula::cpp2::ErrorCode::E_INVALID_TASK_PARA; - codes_.emplace_back(std::move(thriftRet)); + resp_.code_ref() = nebula::cpp2::ErrorCode::E_INVALID_TASK_PARA; + onFinished(); + return; } + + resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; onFinished(); } -void AdminTaskProcessor::onProcessFinished(nebula::meta::cpp2::StatsItem& result) { - resp_.stats_ref() = std::move(result); +void AdminTaskProcessor::onFinished() { + this->promise_.setValue(std::move(resp_)); + delete this; } } // namespace storage diff --git a/src/storage/admin/AdminTaskProcessor.h b/src/storage/admin/AdminTaskProcessor.h index 8ce50201b96..48461b610a9 100644 --- a/src/storage/admin/AdminTaskProcessor.h +++ b/src/storage/admin/AdminTaskProcessor.h @@ -16,18 +16,26 @@ namespace nebula { namespace storage { -class AdminTaskProcessor : public BaseProcessor { +class AdminTaskProcessor { public: static AdminTaskProcessor* instance(StorageEnv* env) { return new AdminTaskProcessor(env); } - void process(const cpp2::AddAdminTaskRequest& req); + void process(const cpp2::AddTaskRequest& req); + + folly::Future getFuture() { + return promise_.getFuture(); + } private: - explicit AdminTaskProcessor(StorageEnv* env) : BaseProcessor(env) {} + explicit AdminTaskProcessor(StorageEnv* env) : env_(env) {} + + void onFinished(); - void onProcessFinished(nebula::meta::cpp2::StatsItem& result); + StorageEnv* env_{nullptr}; + folly::Promise promise_; + cpp2::AddTaskResp resp_; }; } // namespace storage diff --git a/src/storage/admin/CreateCheckpointProcessor.cpp b/src/storage/admin/CreateCheckpointProcessor.cpp index 4bfadabc1fa..589ffe26dca 100644 --- a/src/storage/admin/CreateCheckpointProcessor.cpp +++ b/src/storage/admin/CreateCheckpointProcessor.cpp @@ -22,9 +22,7 @@ void CreateCheckpointProcessor::process(const cpp2::CreateCPRequest& req) { } if (!ok(ckRet)) { - cpp2::PartitionResult thriftRet; - thriftRet.code_ref() = error(ckRet); - codes_.emplace_back(std::move(thriftRet)); + resp_.code_ref() = error(ckRet); onFinished(); return; } @@ -33,9 +31,15 @@ void CreateCheckpointProcessor::process(const cpp2::CreateCPRequest& req) { ckInfoList.insert(ckInfoList.end(), spaceCkList.begin(), spaceCkList.end()); } + resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; resp_.info_ref() = ckInfoList; onFinished(); } +void CreateCheckpointProcessor::onFinished() { + this->promise_.setValue(std::move(resp_)); + delete this; +} + } // namespace storage } // namespace nebula diff --git a/src/storage/admin/CreateCheckpointProcessor.h b/src/storage/admin/CreateCheckpointProcessor.h index 00b39a8db0b..cea017b6dfc 100644 --- a/src/storage/admin/CreateCheckpointProcessor.h +++ b/src/storage/admin/CreateCheckpointProcessor.h @@ -14,7 +14,7 @@ namespace nebula { namespace storage { -class CreateCheckpointProcessor : public BaseProcessor { +class CreateCheckpointProcessor { public: static CreateCheckpointProcessor* instance(StorageEnv* env) { return new CreateCheckpointProcessor(env); @@ -22,8 +22,18 @@ class CreateCheckpointProcessor : public BaseProcessor { void process(const cpp2::CreateCPRequest& req); + folly::Future getFuture() { + return promise_.getFuture(); + } + private: - explicit CreateCheckpointProcessor(StorageEnv* env) : BaseProcessor(env) {} + explicit CreateCheckpointProcessor(StorageEnv* env) : env_(env) {} + + void onFinished(); + + StorageEnv* env_{nullptr}; + folly::Promise promise_; + cpp2::CreateCPResp resp_; }; } // namespace storage diff --git a/src/storage/admin/DropCheckpointProcessor.cpp b/src/storage/admin/DropCheckpointProcessor.cpp index 3626b0948a7..2486fc9c7a6 100644 --- a/src/storage/admin/DropCheckpointProcessor.cpp +++ b/src/storage/admin/DropCheckpointProcessor.cpp @@ -20,15 +20,20 @@ void DropCheckpointProcessor::process(const cpp2::DropCPRequest& req) { } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - cpp2::PartitionResult res; - res.code_ref() = code; - codes_.emplace_back(std::move(res)); - break; + resp_.code_ref() = code; + onFinished(); + return; } } + resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; onFinished(); } +void DropCheckpointProcessor::onFinished() { + this->promise_.setValue(std::move(resp_)); + delete this; +} + } // namespace storage } // namespace nebula diff --git a/src/storage/admin/DropCheckpointProcessor.h b/src/storage/admin/DropCheckpointProcessor.h index 7e71bee859a..361fffb3e9d 100644 --- a/src/storage/admin/DropCheckpointProcessor.h +++ b/src/storage/admin/DropCheckpointProcessor.h @@ -13,7 +13,7 @@ namespace nebula { namespace storage { -class DropCheckpointProcessor : public BaseProcessor { +class DropCheckpointProcessor { public: static DropCheckpointProcessor* instance(StorageEnv* env) { return new DropCheckpointProcessor(env); @@ -21,8 +21,18 @@ class DropCheckpointProcessor : public BaseProcessor { void process(const cpp2::DropCPRequest& req); + folly::Future getFuture() { + return promise_.getFuture(); + } + private: - explicit DropCheckpointProcessor(StorageEnv* env) : BaseProcessor(env) {} + explicit DropCheckpointProcessor(StorageEnv* env) : env_(env) {} + + void onFinished(); + + StorageEnv* env_{nullptr}; + folly::Promise promise_; + cpp2::DropCPResp resp_; }; } // namespace storage } // namespace nebula diff --git a/src/storage/admin/GetLeaderProcessor.cpp b/src/storage/admin/GetLeaderProcessor.cpp new file mode 100644 index 00000000000..4bf47b51e59 --- /dev/null +++ b/src/storage/admin/GetLeaderProcessor.cpp @@ -0,0 +1,33 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "storage/admin/GetLeaderProcessor.h" + +namespace nebula { +namespace storage { + +void GetLeaderProcessor::process(const cpp2::GetLeaderReq& req) { + UNUSED(req); + CHECK_NOTNULL(env_->kvstore_); + std::unordered_map> allLeaders; + env_->kvstore_->allLeader(allLeaders); + std::unordered_map> leaderIds; + for (auto& spaceLeaders : allLeaders) { + auto& spaceId = spaceLeaders.first; + for (auto& partLeader : spaceLeaders.second) { + leaderIds[spaceId].emplace_back(partLeader.get_part_id()); + } + } + resp_.leader_parts_ref() = std::move(leaderIds); + this->onFinished(); +} + +void GetLeaderProcessor::onFinished() { + this->promise_.setValue(std::move(resp_)); + delete this; +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/admin/GetLeaderProcessor.h b/src/storage/admin/GetLeaderProcessor.h new file mode 100644 index 00000000000..f70aad973e9 --- /dev/null +++ b/src/storage/admin/GetLeaderProcessor.h @@ -0,0 +1,41 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef STORAGE_ADMIN_GETLEADERPROCESSOR_H_ +#define STORAGE_ADMIN_GETLEADERPROCESSOR_H_ + +#include "common/base/Base.h" +#include "kvstore/NebulaStore.h" +#include "storage/BaseProcessor.h" +#include "storage/StorageFlags.h" + +namespace nebula { +namespace storage { + +class GetLeaderProcessor { + public: + static GetLeaderProcessor* instance(StorageEnv* env) { + return new GetLeaderProcessor(env); + } + + void process(const cpp2::GetLeaderReq& req); + + folly::Future getFuture() { + return promise_.getFuture(); + } + + private: + explicit GetLeaderProcessor(StorageEnv* env) : env_(env) {} + + void onFinished(); + + StorageEnv* env_{nullptr}; + folly::Promise promise_; + cpp2::GetLeaderPartsResp resp_; +}; + +} // namespace storage +} // namespace nebula +#endif // STORAGE_ADMIN_GETLEADERPROCESSOR_H_ diff --git a/src/storage/admin/SendBlockSignProcessor.cpp b/src/storage/admin/SendBlockSignProcessor.cpp index 0ee2e764324..5d9ffdded2d 100644 --- a/src/storage/admin/SendBlockSignProcessor.cpp +++ b/src/storage/admin/SendBlockSignProcessor.cpp @@ -16,17 +16,21 @@ void SendBlockSignProcessor::process(const cpp2::BlockingSignRequest& req) { LOG(INFO) << "Receive block sign for space " << spaceId << ", block: " << sign; auto code = env_->kvstore_->setWriteBlocking(spaceId, sign); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - cpp2::PartitionResult thriftRet; - thriftRet.code_ref() = code; - codes_.emplace_back(std::move(thriftRet)); LOG(INFO) << "set block sign failed, error: " << apache::thrift::util::enumNameSafe(code); + resp_.code_ref() = code; onFinished(); return; } } + resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; onFinished(); } +void SendBlockSignProcessor::onFinished() { + this->promise_.setValue(std::move(resp_)); + delete this; +} + } // namespace storage } // namespace nebula diff --git a/src/storage/admin/SendBlockSignProcessor.h b/src/storage/admin/SendBlockSignProcessor.h index 895667e2372..82d4b5175b5 100644 --- a/src/storage/admin/SendBlockSignProcessor.h +++ b/src/storage/admin/SendBlockSignProcessor.h @@ -14,7 +14,7 @@ namespace nebula { namespace storage { -class SendBlockSignProcessor : public BaseProcessor { +class SendBlockSignProcessor { public: static SendBlockSignProcessor* instance(StorageEnv* env) { return new SendBlockSignProcessor(env); @@ -22,8 +22,18 @@ class SendBlockSignProcessor : public BaseProcessor { void process(const cpp2::BlockingSignRequest& req); + folly::Future getFuture() { + return promise_.getFuture(); + } + private: - explicit SendBlockSignProcessor(StorageEnv* env) : BaseProcessor(env) {} + explicit SendBlockSignProcessor(StorageEnv* env) : env_(env) {} + + void onFinished(); + + StorageEnv* env_{nullptr}; + folly::Promise promise_; + cpp2::BlockingSignResp resp_; }; } // namespace storage } // namespace nebula diff --git a/src/storage/admin/StopAdminTaskProcessor.cpp b/src/storage/admin/StopAdminTaskProcessor.cpp index 686391487cc..fa73a5e2f7f 100644 --- a/src/storage/admin/StopAdminTaskProcessor.cpp +++ b/src/storage/admin/StopAdminTaskProcessor.cpp @@ -11,18 +11,24 @@ namespace nebula { namespace storage { -void StopAdminTaskProcessor::process(const cpp2::StopAdminTaskRequest& req) { +void StopAdminTaskProcessor::process(const cpp2::StopTaskRequest& req) { auto taskManager = AdminTaskManager::instance(); auto rc = taskManager->cancelJob(req.get_job_id()); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { - cpp2::PartitionResult thriftRet; - thriftRet.code_ref() = rc; - codes_.emplace_back(std::move(thriftRet)); + resp_.code_ref() = rc; + onFinished(); + return; } + resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; onFinished(); } +void StopAdminTaskProcessor::onFinished() { + this->promise_.setValue(std::move(resp_)); + delete this; +} + } // namespace storage } // namespace nebula diff --git a/src/storage/admin/StopAdminTaskProcessor.h b/src/storage/admin/StopAdminTaskProcessor.h index 445403ce70c..34a6d24f535 100644 --- a/src/storage/admin/StopAdminTaskProcessor.h +++ b/src/storage/admin/StopAdminTaskProcessor.h @@ -15,16 +15,28 @@ namespace nebula { namespace storage { -class StopAdminTaskProcessor : public BaseProcessor { +class StopAdminTaskProcessor { public: static StopAdminTaskProcessor* instance(StorageEnv* env) { return new StopAdminTaskProcessor(env); } - void process(const cpp2::StopAdminTaskRequest& req); + void process(const cpp2::StopTaskRequest& req); + + folly::Future getFuture() { + return promise_.getFuture(); + } private: - explicit StopAdminTaskProcessor(StorageEnv* env) : BaseProcessor(env) {} + explicit StopAdminTaskProcessor(StorageEnv* env) : env_(env) { + UNUSED(env_); + } + + void onFinished(); + + StorageEnv* env_{nullptr}; + folly::Promise promise_; + cpp2::StopTaskResp resp_; }; } // namespace storage diff --git a/src/storage/test/CheckpointTest.cpp b/src/storage/test/CheckpointTest.cpp index c2a61f3d22a..04322ca184a 100644 --- a/src/storage/test/CheckpointTest.cpp +++ b/src/storage/test/CheckpointTest.cpp @@ -43,7 +43,7 @@ TEST(CheckpointTest, simpleTest) { auto fut = processor->getFuture(); processor->process(req); auto resp = std::move(fut).get(); - EXPECT_EQ(0, resp.result.failed_parts.size()); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); auto checkpoint1 = folly::stringPrintf("%s/disk1/nebula/1/checkpoints/checkpoint_test/data", dataPath.path()); auto files = fs::FileUtils::listAllFilesInDir(checkpoint1.data()); diff --git a/src/storage/test/IndexWithTTLTest.cpp b/src/storage/test/IndexWithTTLTest.cpp index 30b525e247d..b82eb4c7fa8 100644 --- a/src/storage/test/IndexWithTTLTest.cpp +++ b/src/storage/test/IndexWithTTLTest.cpp @@ -430,7 +430,7 @@ TEST(IndexWithTTLTest, RebuildTagIndexWithTTL) { parameter.parts_ref() = parts; parameter.task_specific_paras_ref() = {"2021002"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; @@ -499,7 +499,7 @@ TEST(IndexWithTTLTest, RebuildEdgeIndexWithTTL) { parameter.parts_ref() = parts; parameter.task_specific_paras_ref() = {"2021002"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; @@ -570,7 +570,7 @@ TEST(IndexWithTTLTest, RebuildTagIndexWithTTLExpired) { parameter.parts_ref() = parts; parameter.task_specific_paras_ref() = {"2021002"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; @@ -641,7 +641,7 @@ TEST(IndexWithTTLTest, RebuildEdgeIndexWithTTLExpired) { parameter.parts_ref() = parts; parameter.task_specific_paras_ref() = {"2021002"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 15; diff --git a/src/storage/test/RebuildIndexTest.cpp b/src/storage/test/RebuildIndexTest.cpp index c31d7318a0e..5b2c456b7d0 100644 --- a/src/storage/test/RebuildIndexTest.cpp +++ b/src/storage/test/RebuildIndexTest.cpp @@ -77,7 +77,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexCheckALLData) { parameter.parts_ref() = parts; parameter.task_specific_paras_ref() = {"4", "5"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; @@ -164,7 +164,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexCheckALLData) { parameter.parts_ref() = parts; parameter.task_specific_paras_ref() = {"103", "104"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 16; @@ -261,7 +261,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithDelete) { parameter.parts_ref() = std::move(parts); parameter.task_specific_paras_ref() = {"4", "5"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 11; @@ -322,7 +322,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithAppend) { parameter.parts_ref() = std::move(parts); parameter.task_specific_paras_ref() = {"4", "5"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 12; @@ -366,7 +366,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndex) { std::vector parts = {1, 2, 3, 4, 5, 6}; parameter.parts_ref() = std::move(parts); - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; @@ -422,7 +422,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithDelete) { parameter.parts_ref() = std::move(parts); parameter.task_specific_paras_ref() = {"103", "104"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 14; @@ -484,7 +484,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithAppend) { parameter.parts_ref() = std::move(parts); parameter.task_specific_paras_ref() = {"103", "104"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 15; @@ -528,7 +528,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndex) { parameter.parts_ref() = std::move(parts); parameter.task_specific_paras_ref() = {"103", "104"}; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 16; diff --git a/src/storage/test/StatsTaskTest.cpp b/src/storage/test/StatsTaskTest.cpp index 50bd936180d..2c98e30825b 100644 --- a/src/storage/test/StatsTaskTest.cpp +++ b/src/storage/test/StatsTaskTest.cpp @@ -68,7 +68,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) { parameter.space_id_ref() = spaceId; parameter.parts_ref() = parts; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::STATS; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; @@ -133,7 +133,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) { parameter.space_id_ref() = spaceId; parameter.parts_ref() = parts; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::STATS; request.job_id_ref() = ++gJobId; request.task_id_ref() = 14; @@ -204,7 +204,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) { parameter.space_id_ref() = spaceId; parameter.parts_ref() = parts; - cpp2::AddAdminTaskRequest request; + cpp2::AddTaskRequest request; request.cmd_ref() = meta::cpp2::AdminCmd::STATS; request.job_id_ref() = ++gJobId; request.task_id_ref() = 15;