Skip to content

Commit

Permalink
tools: support scan-status command and imporve status command
Browse files Browse the repository at this point in the history
  • Loading branch information
Wine93 authored and ilixiaocui committed Jul 9, 2021
1 parent dc58284 commit 3ef185b
Show file tree
Hide file tree
Showing 29 changed files with 523 additions and 106 deletions.
7 changes: 3 additions & 4 deletions proto/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ proto_library(
"chunkserver.proto",
"curve_storage.proto",
]),
deps = [":common_proto",
":scan_proto"],
deps = [":common_proto", ":scan_proto"],
visibility = ["//visibility:public"],
)

Expand Down Expand Up @@ -98,10 +97,10 @@ proto_library(
cc_proto_library(
name = "scan_cc_proto",
visibility = ["//visibility:public"],
deps = [":scan_proto"],
deps = [":scan_proto"],
)

proto_library(
name = "scan_proto",
srcs = ["scan.proto"],
srcs = ["scan.proto"],
)
4 changes: 4 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ message Peer {
message CopysetInfo {
required uint32 logicalPoolId = 1;
required uint32 copysetId = 2;
optional bool scaning = 3;
optional uint64 lastScanSec = 4;
optional bool lastScanConsistent = 5;
}

4 changes: 2 additions & 2 deletions proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ message CopySetInfo {
optional bool scaning = 8;
// timestamp for last success scan (seconds)
optional uint64 lastScanSec = 9;
// failed crc check scanmap
// the detail information for inconsistent copyset
repeated chunkserver.ScanMap scanMap = 10;
};

Expand Down Expand Up @@ -118,7 +118,7 @@ enum ConfigChangeType {
NONE = 4;
// 配置变更命令:change复制组一个成员
CHANGE_PEER = 5;
// start scan on the peer
// start scan on the peer
START_SCAN_PEER = 6;
// cancel scan on the peer
CANCEL_SCAN_PEER = 7;
Expand Down
16 changes: 15 additions & 1 deletion proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ message CopysetData {
repeated uint32 chunkserverIds = 4;
optional bool availFlag = 5;
optional uint64 lastScanSec = 6;
optional bool lastScanConsistent = 7;
}

// rpc struct defination
Expand Down Expand Up @@ -455,13 +456,25 @@ message GetClusterInfoResponse {
}

message GetCopySetsInClusterRequest {
// Filter copysets which in scaning
optional bool filterScaning = 1;
}

message GetCopySetsInClusterResponse {
required sint32 statusCode = 1;
repeated common.CopysetInfo copysetInfos = 2;
}

message GetCopysetRequest {
required uint32 logicalPoolId = 1;
required uint32 copysetId = 2;
}

message GetCopysetResponse {
required sint32 statusCode = 1;
optional common.CopysetInfo copysetInfo = 2;
}

message SetCopysetsAvailFlagRequest {
required bool availFlag = 1;
repeated common.CopysetInfo copysets = 2;
Expand Down Expand Up @@ -508,11 +521,12 @@ service TopologyService {
rpc GetLogicalPool(GetLogicalPoolRequest) returns (GetLogicalPoolResponse);
rpc ListLogicalPool(ListLogicalPoolRequest) returns (ListLogicalPoolResponse);
rpc SetLogicalPool(SetLogicalPoolRequest) returns(SetLogicalPoolResponse);
rpc SetLogicalPoolScanState(SetLogicalPoolScanStateRequest) returns(SetLogicalPoolScanStateResponse);
rpc SetLogicalPoolScanState(SetLogicalPoolScanStateRequest) returns (SetLogicalPoolScanStateResponse);

rpc GetChunkServerListInCopySets(GetChunkServerListInCopySetsRequest) returns (GetChunkServerListInCopySetsResponse);
rpc GetCopySetsInChunkServer(GetCopySetsInChunkServerRequest) returns (GetCopySetsInChunkServerResponse);
rpc GetCopySetsInCluster(GetCopySetsInClusterRequest) returns (GetCopySetsInClusterResponse);
rpc GetCopyset(GetCopysetRequest) returns (GetCopysetResponse);
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc SetCopysetsAvailFlag(SetCopysetsAvailFlagRequest) returns (SetCopysetsAvailFlagResponse);
rpc ListUnAvailCopySets(ListUnAvailCopySetsRequest) returns (ListUnAvailCopySetsResponse);
Expand Down
3 changes: 2 additions & 1 deletion src/mds/heartbeat/heartbeat_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,10 @@ bool HeartbeatManager::FromHeartbeatCopySetInfoToTopologyOne(
topoCopysetInfo.SetScaning(info.scaning());
}

// set last scan
// set last scan and last scan consistent
if (info.has_lastscansec()) {
topoCopysetInfo.SetLastScanSec(info.lastscansec());
topoCopysetInfo.SetLastScanConsistent(info.scanmap_size() == 0);
}

*out = topoCopysetInfo;
Expand Down
4 changes: 3 additions & 1 deletion src/mds/heartbeat/topo_updater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ void TopoUpdater::UpdateTopo(const CopySetInfo &reportCopySetInfo) {
<< reportCopySetInfo.GetLogicalPoolId()
<< "," << reportCopySetInfo.GetId()
<< "): scaning=" << reportCopySetInfo.GetScaning()
<< ", lastScanSec=" << reportCopySetInfo.GetLastScanSec();
<< ", lastScanSec=" << reportCopySetInfo.GetLastScanSec()
<< ", lastScanConsistent="
<< reportCopySetInfo.GetLastScanConsistent();
}
} else if (recordCopySetInfo.GetEpoch() > reportCopySetInfo.GetEpoch()) {
if (recordCopySetInfo.GetLeader() != reportCopySetInfo.GetLeader()) {
Expand Down
1 change: 1 addition & 0 deletions src/mds/topology/topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,7 @@ int TopologyImpl::UpdateCopySetTopo(const CopySetInfo &data) {

if (data.IsLatestLastScanSec(it->second.GetLastScanSec())) {
it->second.SetLastScanSec(data.GetLastScanSec());
it->second.SetLastScanConsistent(data.GetLastScanConsistent());
}

it->second.SetDirtyFlag(true);
Expand Down
3 changes: 3 additions & 0 deletions src/mds/topology/topology_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ bool CopySetInfo::SerializeToString(std::string *value) const {
}
data.set_availflag(available_);
data.set_lastscansec(lastScanSec_);
data.set_lastscanconsistent(lastScanConsistent_);
return data.SerializeToString(value);
}

Expand All @@ -354,6 +355,8 @@ bool CopySetInfo::ParseFromString(const std::string &value) {
peers_.insert(data.chunkserverids(i));
}
lastScanSec_ = data.has_lastscansec() ? data.lastscansec() : 0;
lastScanConsistent_ = data.has_lastscanconsistent() ?
data.lastscanconsistent() : true;
return ret;
}

Expand Down
15 changes: 15 additions & 0 deletions src/mds/topology/topology_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ class CopySetInfo {
hasScaning_(false),
lastScanSec_(0),
hasLastScanSec_(false),
lastScanConsistent_(true),
dirty_(false),
available_(true) {}

Expand All @@ -702,6 +703,7 @@ class CopySetInfo {
hasScaning_(false),
lastScanSec_(0),
hasLastScanSec_(false),
lastScanConsistent_(true),
dirty_(false),
available_(true) {}

Expand All @@ -717,6 +719,7 @@ class CopySetInfo {
hasScaning_(v.hasScaning_),
lastScanSec_(v.lastScanSec_),
hasLastScanSec_(v.hasLastScanSec_),
lastScanConsistent_(v.lastScanConsistent_),
dirty_(v.dirty_),
available_(v.available_) {}

Expand All @@ -735,6 +738,7 @@ class CopySetInfo {
hasScaning_ = v.hasScaning_;
lastScanSec_ = v.lastScanSec_;
hasLastScanSec_ = v.hasLastScanSec_;
lastScanConsistent_ = v.lastScanConsistent_;
dirty_ = v.dirty_;
available_ = v.available_;
return *this;
Expand Down Expand Up @@ -827,6 +831,14 @@ class CopySetInfo {
return lastScanSec_;
}

bool SetLastScanConsistent(bool lastScanConsistent) {
lastScanConsistent_ = lastScanConsistent;
}

bool GetLastScanConsistent() const {
return lastScanConsistent_;
}

void ClearCandidate() {
hasCandidate_ = false;
}
Expand Down Expand Up @@ -876,6 +888,9 @@ class CopySetInfo {
// whether the lastScanSec_ has been set
bool hasLastScanSec_;

// whether the data of copies is consistent for last scan
bool lastScanConsistent_;

/**
* @brief to mark whether data is dirty, for writing to storage regularly
*/
Expand Down
71 changes: 50 additions & 21 deletions src/mds/topology/topology_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,31 +839,60 @@ void TopologyServiceImpl::GetCopySetsInChunkServer(
}

void TopologyServiceImpl::GetCopySetsInCluster(
google::protobuf::RpcController* cntl_base,
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response,
google::protobuf::Closure* done) {
google::protobuf::RpcController* cntl_base,
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);

auto localAddr = cntl->local_side();
auto remoteAddr = cntl->remote_side();
LOG(INFO)
<< "Received request[log_id=" << cntl->log_id()
<< "] from " << remoteAddr << " to " << localAddr
<< ". [GetCopySetsInCluster] " << request->DebugString();

LOG(INFO) << "Received request[log_id=" << cntl->log_id()
<< "] from " << cntl->remote_side()
<< " to " << cntl->local_side()
<< ". [GetCopySetsInClusterRequest]";
topology_->GetCopySetsInCluster(request, response);
if (kTopoErrCodeSuccess != response->statuscode()) {
LOG(ERROR) << "Send response[log_id=" << cntl->log_id()
<< "] from " << cntl->local_side()
<< " to " << cntl->remote_side()
<< ". [GetCopySetsInClusterResponse] "
<< response->DebugString();

std::ostringstream errMsg;
errMsg << "Send response[log_id=" << cntl->log_id()
<< "] from " << localAddr << " to " << remoteAddr
<< ". [GetCopySetsInCluster] " << response->DebugString();

if (response->statuscode() == kTopoErrCodeSuccess) {
LOG(INFO) << errMsg.str();
} else {
LOG(INFO) << "Send response[log_id=" << cntl->log_id()
<< "] from " << cntl->local_side()
<< " to " << cntl->remote_side()
<< ". [GetCopySetsInClusterResponse] copyset num: "
<< response->copysetinfos_size();
LOG(ERROR) << errMsg.str();
}
}

void TopologyServiceImpl::GetCopyset(
google::protobuf::RpcController* cntl_base,
const GetCopysetRequest* request,
GetCopysetResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);

auto localAddr = cntl->local_side();
auto remoteAddr = cntl->remote_side();
LOG(INFO)
<< "Received request[log_id=" << cntl->log_id()
<< "] from " << remoteAddr << " to " << localAddr
<< ". [GetCopyset] " << request->DebugString();

topology_->GetCopyset(request, response);

std::ostringstream errMsg;
errMsg << "Send response[log_id=" << cntl->log_id()
<< "] from " << localAddr << " to " << remoteAddr
<< ". [GetCopyset] " << response->DebugString();

if (response->statuscode() == kTopoErrCodeSuccess) {
LOG(INFO) << errMsg.str();
} else {
LOG(ERROR) << errMsg.str();
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/mds/topology/topology_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,15 @@ class TopologyServiceImpl : public TopologyService {
google::protobuf::Closure* done);

virtual void GetCopySetsInCluster(
google::protobuf::RpcController* cntl_base,
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response,
google::protobuf::Closure* done);
google::protobuf::RpcController* cntl_base,
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response,
google::protobuf::Closure* done);

virtual void GetCopyset(google::protobuf::RpcController* cntl_base,
const GetCopysetRequest* request,
GetCopysetResponse* response,
google::protobuf::Closure* done);

virtual void GetClusterInfo(
google::protobuf::RpcController* cntl_base,
Expand Down
55 changes: 47 additions & 8 deletions src/mds/topology/topology_service_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <vector>
#include <chrono> //NOLINT
#include <thread> //NOLINT
#include <algorithm>

#include "brpc/channel.h"
#include "brpc/controller.h"
Expand Down Expand Up @@ -1399,16 +1400,45 @@ void TopologyServiceManager::GetCopySetsInChunkServer(
}

void TopologyServiceManager::GetCopySetsInCluster(
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response) {
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response) {
auto filter = [&](const CopySetInfo& copysetInfo) {
if (request->has_filterscaning() && !copysetInfo.GetScaning()) {
return false;
}

return true;
};

auto logicalPoolIds = topology_->GetLogicalPoolInCluster();
std::sort(logicalPoolIds.begin(), logicalPoolIds.end());
for (const auto& lpid : logicalPoolIds) {
auto copysetInfos =
topology_-> GetCopySetInfosInLogicalPool(lpid, filter);
for (auto& copysetInfo : copysetInfos) {
CopysetInfo* info = response->add_copysetinfos();
ConvertCopyset(copysetInfo, info);
}
}

response->set_statuscode(kTopoErrCodeSuccess);
std::vector<CopySetKey> copysets =
topology_->GetCopySetsInCluster();
for (const CopySetKey& copyset : copysets) {
CopysetInfo *info = response->add_copysetinfos();
info->set_logicalpoolid(copyset.first);
info->set_copysetid(copyset.second);
}

void TopologyServiceManager::GetCopyset(const GetCopysetRequest* request,
GetCopysetResponse* response) {
CopySetInfo copysetInfo;
auto lpid = request->logicalpoolid();
auto copysetId = request->copysetid();
CopySetKey copysetKey(lpid, copysetId);

if (!topology_->GetCopySet(copysetKey, &copysetInfo)) {
response->set_statuscode(kTopoErrCodeCopySetNotFound);
return;
}

response->set_statuscode(kTopoErrCodeSuccess);
auto info = response->mutable_copysetinfo();
ConvertCopyset(copysetInfo, info);
}

void TopologyServiceManager::GetClusterInfo(
Expand Down Expand Up @@ -1458,6 +1488,15 @@ void TopologyServiceManager::ListUnAvailCopySets(
response->set_statuscode(kTopoErrCodeSuccess);
}

void TopologyServiceManager::ConvertCopyset(const CopySetInfo& in,
::curve::common::CopysetInfo* out) {
out->set_logicalpoolid(in.GetLogicalPoolId());
out->set_copysetid(in.GetId());
out->set_scaning(in.GetScaning());
out->set_lastscansec(in.GetLastScanSec());
out->set_lastscanconsistent(in.GetLastScanConsistent());
}

} // namespace topology
} // namespace mds
} // namespace curve
Loading

0 comments on commit 3ef185b

Please sign in to comment.