From b784af2ae83ac023f850ec005283858f95add586 Mon Sep 17 00:00:00 2001 From: sean Date: Fri, 25 Jun 2021 16:17:58 +0800 Subject: [PATCH] curve_ops_tool: fix concurrent check copysets on server --- conf/tools.conf | 2 + .../roles/generate_config/defaults/main.yml | 1 + .../generate_config/templates/tools.conf.j2 | 2 + src/tools/copyset_check_core.cpp | 68 ++++++++++++++----- src/tools/copyset_check_core.h | 21 ++++++ src/tools/curve_tool_define.cpp | 1 + src/tools/curve_tool_define.h | 1 + src/tools/curve_tool_main.cpp | 4 ++ 8 files changed, 83 insertions(+), 17 deletions(-) diff --git a/conf/tools.conf b/conf/tools.conf index 3fe3ff6a66..fcd9884e77 100644 --- a/conf/tools.conf +++ b/conf/tools.conf @@ -6,6 +6,8 @@ mdsDummyPort=6667 rpcTimeout=500 # rpc重试次数 rpcRetryTimes=5 +# the rpc concurrency to chunkserver +rpcConcurrentNum=10 # etcd地址 etcdAddr=127.0.0.1:2379 # snapshot clone server 地址 diff --git a/curve-ansible/roles/generate_config/defaults/main.yml b/curve-ansible/roles/generate_config/defaults/main.yml index 068c35ed59..201db36204 100644 --- a/curve-ansible/roles/generate_config/defaults/main.yml +++ b/curve-ansible/roles/generate_config/defaults/main.yml @@ -274,6 +274,7 @@ s3_throttle_bpsWriteLimit: 1280 # 运维工具默认值 tool_rpc_timeout: 500 tool_rpc_retry_times: 5 +tool_rpc_concurrent_num: 10 # snapshotclone_nginx配置 nginx_docker_internal_port: 80 diff --git a/curve-ansible/roles/generate_config/templates/tools.conf.j2 b/curve-ansible/roles/generate_config/templates/tools.conf.j2 index 38399df1d2..985eef27d9 100644 --- a/curve-ansible/roles/generate_config/templates/tools.conf.j2 +++ b/curve-ansible/roles/generate_config/templates/tools.conf.j2 @@ -12,6 +12,8 @@ mdsDummyPort={{ hostvars[groups.mds[0]].mds_dummy_port }} rpcTimeout={{ tool_rpc_timeout }} # rpc重试次数 rpcRetryTimes={{ tool_rpc_retry_times }} +# the rpc concurrency to chunkserver +rpcConcurrentNum={{ tool_rpc_concurrent_num }} # etcd地址 {% set etcd_address=[] -%} {% for host in groups.etcd -%} diff --git a/src/tools/copyset_check_core.cpp b/src/tools/copyset_check_core.cpp index 269c2eb73c..77d93be23d 100644 --- a/src/tools/copyset_check_core.cpp +++ b/src/tools/copyset_check_core.cpp @@ -154,12 +154,14 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer( bool isHealthy = true; butil::IOBuf iobuf; int res = QueryChunkServer(chunkserverAddr, &iobuf); + statisticsMutest.lock(); if (res != 0) { // 如果查询chunkserver失败,认为不在线,把它上面所有的 // copyset都添加到peerNotOnlineCopysets_里面 UpdatePeerNotOnlineCopysets(chunkserverAddr); serviceExceptionChunkServers_.emplace(chunkserverAddr); chunkserverCopysets_[chunkserverAddr] = {}; + statisticsMutest.unlock(); return ChunkServerHealthStatus::kNotOnline; } // 存储每一个copyset的详细信息 @@ -176,6 +178,7 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer( (!groupIds.empty() && copysetInfos.size() != groupIds.size())) { std::cout << "Some copysets not found on chunkserver, may be tranfered" << std::endl; + statisticsMutest.unlock(); return ChunkServerHealthStatus::kNotHealthy; } // 存储需要发送消息的chunkserver的地址和对应的groupId @@ -244,6 +247,8 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer( isHealthy = false; } } + statisticsMutest.unlock(); + // 遍历没有leader的copyset bool health = CheckCopysetsNoLeader(chunkserverAddr, noLeaderCopysetsPeers); @@ -266,9 +271,9 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer( } bool CopysetCheckCore::CheckCopysetsNoLeader(const std::string& csAddr, - const std::map>& - copysetsPeers) { + const std::map>& + copysetsPeers) { if (copysetsPeers.empty()) { return true; } @@ -286,6 +291,7 @@ bool CopysetCheckCore::CheckCopysetsNoLeader(const std::string& csAddr, for (const auto& item : result) { // 如果在配置组中,检查是否是majority offline if (item.second) { + statisticsMutest.lock(); isHealthy = false; std::string groupId = item.first; CheckResult checkRes = CheckPeerOnlineStatus( @@ -296,6 +302,7 @@ bool CopysetCheckCore::CheckCopysetsNoLeader(const std::string& csAddr, continue; } copysets_[kNoLeader].emplace(groupId); + statisticsMutest.unlock(); } } return isHealthy; @@ -351,9 +358,35 @@ int CopysetCheckCore::CheckCopysetsOnServer(const std::string& serverIp, return CheckCopysetsOnServer(0, serverIp, true, unhealthyChunkServers); } +void CopysetCheckCore::ConcurrentCheckCopysetsOnServer( + const std::vector &chunkservers, + uint32_t *index, bool queryLeader, bool *isHealthy, + std::vector* unhealthyChunkServers) { + while (1) { + indexMutex.lock(); + if (*index > chunkservers.size() - 1) { + indexMutex.unlock(); + break; + } + auto info = chunkservers[*index]; + (*index)++; + indexMutex.unlock(); + std::string csAddr = info.hostip() + ":" + std::to_string(info.port()); + ChunkServerHealthStatus res = CheckCopysetsOnChunkServer(csAddr, + {}, queryLeader); + if (res != ChunkServerHealthStatus::kHealthy) { + vectorMutex.lock(); + *isHealthy = false; + if (unhealthyChunkServers) { + unhealthyChunkServers->emplace_back(csAddr); + } + vectorMutex.unlock(); + } + } +} + int CopysetCheckCore::CheckCopysetsOnServer(const ServerIdType& serverId, - const std::string& serverIp, - bool queryLeader, + const std::string& serverIp, bool queryLeader, std::vector* unhealthyChunkServers) { bool isHealthy = true; // 向mds发送RPC @@ -368,19 +401,20 @@ int CopysetCheckCore::CheckCopysetsOnServer(const ServerIdType& serverId, std::cout << "ListChunkServersOnServer fail!" << std::endl; return -1; } - for (const auto& info : chunkservers) { - std::string ip = info.hostip(); - uint64_t port = info.port(); - std::string csAddr = ip + ":" + std::to_string(port); - ChunkServerHealthStatus res = CheckCopysetsOnChunkServer(csAddr, - {}, queryLeader); - if (res != ChunkServerHealthStatus::kHealthy) { - isHealthy = false; - if (unhealthyChunkServers) { - unhealthyChunkServers->emplace_back(csAddr); - } - } + + std::vector threadpool; + uint32_t index = 0; + for (int i = 0; i < FLAGS_rpcConcurrentNum; i++) { + threadpool.emplace_back(Thread( + &CopysetCheckCore::ConcurrentCheckCopysetsOnServer, + this, std::ref(chunkservers), &index, + queryLeader, &isHealthy, + unhealthyChunkServers)); + } + for (auto &thread : threadpool) { + thread.join(); } + if (isHealthy) { return 0; } else { diff --git a/src/tools/copyset_check_core.h b/src/tools/copyset_check_core.h index ad6e1b0e5d..d45635f5a2 100644 --- a/src/tools/copyset_check_core.h +++ b/src/tools/copyset_check_core.h @@ -43,6 +43,7 @@ #include "src/tools/metric_name.h" #include "src/tools/curve_tool_define.h" #include "include/chunkserver/chunkserver_common.h" +#include "src/common/concurrent/concurrent.h" using curve::mds::topology::PoolIdType; using curve::mds::topology::CopySetIdType; @@ -54,6 +55,8 @@ using curve::mds::topology::ChunkServerStatus; using curve::chunkserver::ToGroupId; using curve::chunkserver::GetPoolID; using curve::chunkserver::GetCopysetID; +using curve::common::Mutex; +using curve::common::Thread; namespace curve { namespace tool { @@ -311,6 +314,19 @@ class CopysetCheckCore { bool queryLeader = true, std::vector* unhealthyChunkServers = nullptr); + /** + * @brief concurrent check copyset on server + * @param[in] chunkservers: chunkservers on server + * @param[in] index: the deal index of chunkserver + * @param[in] queryLeader: whether send rpc to server which leader on + * @param[in] isHealthy: check result + * @param[in] unhealthyChunkServers: store the unhealthy chunkserver list + */ + void ConcurrentCheckCopysetsOnServer( + const std::vector &chunkservers, + uint32_t *index, bool queryLeader, bool *isHealthy, + std::vector* unhealthyChunkServers); + /** * @brief 根据leader的map里面的copyset信息分析出copyset是否健康,健康返回0,否则 * 否则返回错误码 @@ -428,6 +444,11 @@ class CopysetCheckCore { std::string copysetsDetail_; const std::string kEmptyAddr = "0.0.0.0:0:0"; + + // mutex for concurrent rpc to chunkserver + Mutex indexMutex; + Mutex vectorMutex; + Mutex statisticsMutest; }; } // namespace tool diff --git a/src/tools/curve_tool_define.cpp b/src/tools/curve_tool_define.cpp index cee0781080..643d3980be 100644 --- a/src/tools/curve_tool_define.cpp +++ b/src/tools/curve_tool_define.cpp @@ -31,6 +31,7 @@ DEFINE_string(mdsDummyPort, "6667", "dummy port of mds, " DEFINE_string(etcdAddr, "127.0.0.1:2379", "etcd addr"); DEFINE_uint64(rpcTimeout, 3000, "millisecond for rpc timeout"); DEFINE_uint64(rpcRetryTimes, 5, "rpc retry times"); +DEFINE_uint64(rpcConcurrentNum, 10, "rpc concurrent number to chunkserver"); DEFINE_string(snapshotCloneAddr, "127.0.0.1:5555", "snapshot clone addr"); DEFINE_string(snapshotCloneDummyPort, "8081", "dummy port of snapshot clone, " "can specify one or several. " diff --git a/src/tools/curve_tool_define.h b/src/tools/curve_tool_define.h index 5bd9984857..4850474915 100644 --- a/src/tools/curve_tool_define.h +++ b/src/tools/curve_tool_define.h @@ -31,6 +31,7 @@ DECLARE_string(mdsDummyPort); DECLARE_string(etcdAddr); DECLARE_uint64(rpcTimeout); DECLARE_uint64(rpcRetryTimes); +DECLARE_uint64(rpcConcurrentNum); DECLARE_string(snapshotCloneAddr); DECLARE_string(snapshotCloneDummyPort); DECLARE_uint64(chunkSize); diff --git a/src/tools/curve_tool_main.cpp b/src/tools/curve_tool_main.cpp index 0a1eea6307..c42e556079 100644 --- a/src/tools/curve_tool_main.cpp +++ b/src/tools/curve_tool_main.cpp @@ -87,6 +87,10 @@ void UpdateFlagsFromConf(curve::common::Configuration* conf) { if (GetCommandLineFlagInfo("rpcRetryTimes", &info) && info.is_default) { conf->GetUInt64Value("rpcRetryTimes", &FLAGS_rpcRetryTimes); } + if (GetCommandLineFlagInfo("rpcConcurrentNum", &info) && + info.is_default) { + conf->GetUInt64Value("rpcConcurrentNum", &FLAGS_rpcConcurrentNum); + } if (GetCommandLineFlagInfo("snapshotCloneAddr", &info) && info.is_default) { conf->GetStringValue("snapshotCloneAddr", &FLAGS_snapshotCloneAddr);