Skip to content

Commit

Permalink
Revert "curve_ops_tool: concurrent check copysets on server"
Browse files Browse the repository at this point in the history
This reverts commit c16e963.
  • Loading branch information
SeanHai authored and YunhuiChen committed Jul 6, 2021
1 parent c16e963 commit b844f4f
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 75 deletions.
2 changes: 0 additions & 2 deletions conf/tools.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ mdsDummyPort=6667
rpcTimeout=500
# rpc重试次数
rpcRetryTimes=5
# the rpc concurrency to chunkserver
rpcConcurrentNum=10
# etcd地址
etcdAddr=127.0.0.1:2379
# snapshot clone server 地址
Expand Down
1 change: 0 additions & 1 deletion curve-ansible/roles/generate_config/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ s3_throttle_bpsWriteLimit: 1280
# 运维工具默认值
tool_rpc_timeout: 500
tool_rpc_retry_times: 5
tool_rpc_concurrent_num: 10

# snapshotclone_nginx配置
nginx_docker_internal_port: 80
Expand Down
2 changes: 0 additions & 2 deletions curve-ansible/roles/generate_config/templates/tools.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ mdsDummyPort={{ hostvars[groups.mds[0]].mds_dummy_port }}
rpcTimeout={{ tool_rpc_timeout }}
# rpc重试次数
rpcRetryTimes={{ tool_rpc_retry_times }}
# the rpc concurrency to chunkserver
rpcConcurrentNum={{ tool_rpc_concurrent_num }}
# etcd地址
{% set etcd_address=[] -%}
{% for host in groups.etcd -%}
Expand Down
61 changes: 17 additions & 44 deletions src/tools/copyset_check_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer(
}

bool CopysetCheckCore::CheckCopysetsNoLeader(const std::string& csAddr,
const std::map<std::string,
std::vector<std::string>>&
copysetsPeers) {
const std::map<std::string,
std::vector<std::string>>&
copysetsPeers) {
if (copysetsPeers.empty()) {
return true;
}
Expand Down Expand Up @@ -351,35 +351,9 @@ int CopysetCheckCore::CheckCopysetsOnServer(const std::string& serverIp,
return CheckCopysetsOnServer(0, serverIp, true, unhealthyChunkServers);
}

void CopysetCheckCore::ConcurrentCheckCopysetsOnServer(
const std::vector<ChunkServerInfo> &chunkservers,
uint32_t *index, bool queryLeader, bool *isHealthy,
std::vector<std::string>* unhealthyChunkServers) {
while (1) {
indexMutex.lock();
if (*index > chunkservers.size() - 1) {
indexMutex.unlock();
break;
}
auto info = chunkservers[*index];
(*index)++;
indexMutex.unlock();
std::string csAddr = info.hostip() + ":" + std::to_string(info.port());
ChunkServerHealthStatus res = CheckCopysetsOnChunkServer(csAddr,
{}, queryLeader);
if (res != ChunkServerHealthStatus::kHealthy) {
vectorMutex.lock();
*isHealthy = false;
if (unhealthyChunkServers) {
unhealthyChunkServers->emplace_back(csAddr);
}
vectorMutex.unlock();
}
}
}

int CopysetCheckCore::CheckCopysetsOnServer(const ServerIdType& serverId,
const std::string& serverIp, bool queryLeader,
const std::string& serverIp,
bool queryLeader,
std::vector<std::string>* unhealthyChunkServers) {
bool isHealthy = true;
// 向mds发送RPC
Expand All @@ -394,20 +368,19 @@ int CopysetCheckCore::CheckCopysetsOnServer(const ServerIdType& serverId,
std::cout << "ListChunkServersOnServer fail!" << std::endl;
return -1;
}

std::vector<Thread> threadpool;
uint32_t index = 0;
for (int i = 0; i < FLAGS_rpcConcurrentNum; i++) {
threadpool.emplace_back(Thread(
&CopysetCheckCore::ConcurrentCheckCopysetsOnServer,
this, std::ref(chunkservers), &index,
queryLeader, &isHealthy,
unhealthyChunkServers));
}
for (auto &thread : threadpool) {
thread.join();
for (const auto& info : chunkservers) {
std::string ip = info.hostip();
uint64_t port = info.port();
std::string csAddr = ip + ":" + std::to_string(port);
ChunkServerHealthStatus res = CheckCopysetsOnChunkServer(csAddr,
{}, queryLeader);
if (res != ChunkServerHealthStatus::kHealthy) {
isHealthy = false;
if (unhealthyChunkServers) {
unhealthyChunkServers->emplace_back(csAddr);
}
}
}

if (isHealthy) {
return 0;
} else {
Expand Down
20 changes: 0 additions & 20 deletions src/tools/copyset_check_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include "src/tools/metric_name.h"
#include "src/tools/curve_tool_define.h"
#include "include/chunkserver/chunkserver_common.h"
#include "src/common/concurrent/concurrent.h"

using curve::mds::topology::PoolIdType;
using curve::mds::topology::CopySetIdType;
Expand All @@ -55,8 +54,6 @@ using curve::mds::topology::ChunkServerStatus;
using curve::chunkserver::ToGroupId;
using curve::chunkserver::GetPoolID;
using curve::chunkserver::GetCopysetID;
using curve::common::Mutex;
using curve::common::Thread;

namespace curve {
namespace tool {
Expand Down Expand Up @@ -314,19 +311,6 @@ class CopysetCheckCore {
bool queryLeader = true,
std::vector<std::string>* unhealthyChunkServers = nullptr);

/**
* @brief concurrent check copyset on server
* @param[in] chunkservers: chunkservers on server
* @param[in] index: the deal index of chunkserver
* @param[in] queryLeader: whether send rpc to server which leader on
* @param[in] isHealthy: check result
* @param[in] unhealthyChunkServers: store the unhealthy chunkserver list
*/
void ConcurrentCheckCopysetsOnServer(
const std::vector<ChunkServerInfo> &chunkservers,
uint32_t *index, bool queryLeader, bool *isHealthy,
std::vector<std::string>* unhealthyChunkServers);

/**
* @brief 根据leader的map里面的copyset信息分析出copyset是否健康,健康返回0,否则
* 否则返回错误码
Expand Down Expand Up @@ -444,10 +428,6 @@ class CopysetCheckCore {
std::string copysetsDetail_;

const std::string kEmptyAddr = "0.0.0.0:0:0";

// mutex for concurrent rpc to chunkserver
Mutex indexMutex;
Mutex vectorMutex;
};

} // namespace tool
Expand Down
1 change: 0 additions & 1 deletion src/tools/curve_tool_define.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ DEFINE_string(mdsDummyPort, "6667", "dummy port of mds, "
DEFINE_string(etcdAddr, "127.0.0.1:2379", "etcd addr");
DEFINE_uint64(rpcTimeout, 3000, "millisecond for rpc timeout");
DEFINE_uint64(rpcRetryTimes, 5, "rpc retry times");
DEFINE_uint64(rpcConcurrentNum, 10, "rpc concurrent number to chunkserver");
DEFINE_string(snapshotCloneAddr, "127.0.0.1:5555", "snapshot clone addr");
DEFINE_string(snapshotCloneDummyPort, "8081", "dummy port of snapshot clone, "
"can specify one or several. "
Expand Down
1 change: 0 additions & 1 deletion src/tools/curve_tool_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ DECLARE_string(mdsDummyPort);
DECLARE_string(etcdAddr);
DECLARE_uint64(rpcTimeout);
DECLARE_uint64(rpcRetryTimes);
DECLARE_uint64(rpcConcurrentNum);
DECLARE_string(snapshotCloneAddr);
DECLARE_string(snapshotCloneDummyPort);
DECLARE_uint64(chunkSize);
Expand Down
4 changes: 0 additions & 4 deletions src/tools/curve_tool_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ void UpdateFlagsFromConf(curve::common::Configuration* conf) {
if (GetCommandLineFlagInfo("rpcRetryTimes", &info) && info.is_default) {
conf->GetUInt64Value("rpcRetryTimes", &FLAGS_rpcRetryTimes);
}
if (GetCommandLineFlagInfo("rpcConcurrentNum", &info) &&
info.is_default) {
conf->GetUInt64Value("rpcConcurrentNum", &FLAGS_rpcConcurrentNum);
}
if (GetCommandLineFlagInfo("snapshotCloneAddr", &info) &&
info.is_default) {
conf->GetStringValue("snapshotCloneAddr", &FLAGS_snapshotCloneAddr);
Expand Down

0 comments on commit b844f4f

Please sign in to comment.