Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: support scan-status command and imporve status command #420

Merged
merged 1 commit into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions proto/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ proto_library(
"chunkserver.proto",
"curve_storage.proto",
]),
deps = [":common_proto",
":scan_proto"],
deps = [":common_proto", ":scan_proto"],
visibility = ["//visibility:public"],
)

Expand Down Expand Up @@ -98,10 +97,10 @@ proto_library(
cc_proto_library(
name = "scan_cc_proto",
visibility = ["//visibility:public"],
deps = [":scan_proto"],
deps = [":scan_proto"],
)

proto_library(
name = "scan_proto",
srcs = ["scan.proto"],
srcs = ["scan.proto"],
)
4 changes: 4 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ message Peer {
message CopysetInfo {
required uint32 logicalPoolId = 1;
required uint32 copysetId = 2;
optional bool scaning = 3;
optional uint64 lastScanSec = 4;
optional bool lastScanConsistent = 5;
}

4 changes: 2 additions & 2 deletions proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ message CopySetInfo {
optional bool scaning = 8;
// timestamp for last success scan (seconds)
optional uint64 lastScanSec = 9;
// failed crc check scanmap
// the detail information for inconsistent copyset
repeated chunkserver.ScanMap scanMap = 10;
};

Expand Down Expand Up @@ -118,7 +118,7 @@ enum ConfigChangeType {
NONE = 4;
// 配置变更命令:change复制组一个成员
CHANGE_PEER = 5;
// start scan on the peer
// start scan on the peer
START_SCAN_PEER = 6;
// cancel scan on the peer
CANCEL_SCAN_PEER = 7;
Expand Down
16 changes: 15 additions & 1 deletion proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ message CopysetData {
repeated uint32 chunkserverIds = 4;
optional bool availFlag = 5;
optional uint64 lastScanSec = 6;
optional bool lastScanConsistent = 7;
}

// rpc struct defination
Expand Down Expand Up @@ -455,13 +456,25 @@ message GetClusterInfoResponse {
}

message GetCopySetsInClusterRequest {
// Filter copysets which in scaning
optional bool filterScaning = 1;
}

message GetCopySetsInClusterResponse {
required sint32 statusCode = 1;
repeated common.CopysetInfo copysetInfos = 2;
}

message GetCopysetRequest {
required uint32 logicalPoolId = 1;
required uint32 copysetId = 2;
}

message GetCopysetResponse {
required sint32 statusCode = 1;
optional common.CopysetInfo copysetInfo = 2;
}

message SetCopysetsAvailFlagRequest {
required bool availFlag = 1;
repeated common.CopysetInfo copysets = 2;
Expand Down Expand Up @@ -508,11 +521,12 @@ service TopologyService {
rpc GetLogicalPool(GetLogicalPoolRequest) returns (GetLogicalPoolResponse);
rpc ListLogicalPool(ListLogicalPoolRequest) returns (ListLogicalPoolResponse);
rpc SetLogicalPool(SetLogicalPoolRequest) returns(SetLogicalPoolResponse);
rpc SetLogicalPoolScanState(SetLogicalPoolScanStateRequest) returns(SetLogicalPoolScanStateResponse);
rpc SetLogicalPoolScanState(SetLogicalPoolScanStateRequest) returns (SetLogicalPoolScanStateResponse);

rpc GetChunkServerListInCopySets(GetChunkServerListInCopySetsRequest) returns (GetChunkServerListInCopySetsResponse);
rpc GetCopySetsInChunkServer(GetCopySetsInChunkServerRequest) returns (GetCopySetsInChunkServerResponse);
rpc GetCopySetsInCluster(GetCopySetsInClusterRequest) returns (GetCopySetsInClusterResponse);
rpc GetCopyset(GetCopysetRequest) returns (GetCopysetResponse);
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc SetCopysetsAvailFlag(SetCopysetsAvailFlagRequest) returns (SetCopysetsAvailFlagResponse);
rpc ListUnAvailCopySets(ListUnAvailCopySetsRequest) returns (ListUnAvailCopySetsResponse);
Expand Down
3 changes: 2 additions & 1 deletion src/mds/heartbeat/heartbeat_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,10 @@ bool HeartbeatManager::FromHeartbeatCopySetInfoToTopologyOne(
topoCopysetInfo.SetScaning(info.scaning());
}

// set last scan
// set last scan and last scan consistent
if (info.has_lastscansec()) {
topoCopysetInfo.SetLastScanSec(info.lastscansec());
topoCopysetInfo.SetLastScanConsistent(info.scanmap_size() == 0);
}

*out = topoCopysetInfo;
Expand Down
4 changes: 3 additions & 1 deletion src/mds/heartbeat/topo_updater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ void TopoUpdater::UpdateTopo(const CopySetInfo &reportCopySetInfo) {
<< reportCopySetInfo.GetLogicalPoolId()
<< "," << reportCopySetInfo.GetId()
<< "): scaning=" << reportCopySetInfo.GetScaning()
<< ", lastScanSec=" << reportCopySetInfo.GetLastScanSec();
<< ", lastScanSec=" << reportCopySetInfo.GetLastScanSec()
<< ", lastScanConsistent="
<< reportCopySetInfo.GetLastScanConsistent();
}
} else if (recordCopySetInfo.GetEpoch() > reportCopySetInfo.GetEpoch()) {
if (recordCopySetInfo.GetLeader() != reportCopySetInfo.GetLeader()) {
Expand Down
1 change: 1 addition & 0 deletions src/mds/topology/topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,7 @@ int TopologyImpl::UpdateCopySetTopo(const CopySetInfo &data) {

if (data.IsLatestLastScanSec(it->second.GetLastScanSec())) {
it->second.SetLastScanSec(data.GetLastScanSec());
it->second.SetLastScanConsistent(data.GetLastScanConsistent());
}

it->second.SetDirtyFlag(true);
Expand Down
3 changes: 3 additions & 0 deletions src/mds/topology/topology_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ bool CopySetInfo::SerializeToString(std::string *value) const {
}
data.set_availflag(available_);
data.set_lastscansec(lastScanSec_);
data.set_lastscanconsistent(lastScanConsistent_);
return data.SerializeToString(value);
}

Expand All @@ -354,6 +355,8 @@ bool CopySetInfo::ParseFromString(const std::string &value) {
peers_.insert(data.chunkserverids(i));
}
lastScanSec_ = data.has_lastscansec() ? data.lastscansec() : 0;
lastScanConsistent_ = data.has_lastscanconsistent() ?
data.lastscanconsistent() : true;
return ret;
}

Expand Down
15 changes: 15 additions & 0 deletions src/mds/topology/topology_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ class CopySetInfo {
hasScaning_(false),
lastScanSec_(0),
hasLastScanSec_(false),
lastScanConsistent_(true),
dirty_(false),
available_(true) {}

Expand All @@ -702,6 +703,7 @@ class CopySetInfo {
hasScaning_(false),
lastScanSec_(0),
hasLastScanSec_(false),
lastScanConsistent_(true),
dirty_(false),
available_(true) {}

Expand All @@ -717,6 +719,7 @@ class CopySetInfo {
hasScaning_(v.hasScaning_),
lastScanSec_(v.lastScanSec_),
hasLastScanSec_(v.hasLastScanSec_),
lastScanConsistent_(v.lastScanConsistent_),
dirty_(v.dirty_),
available_(v.available_) {}

Expand All @@ -735,6 +738,7 @@ class CopySetInfo {
hasScaning_ = v.hasScaning_;
lastScanSec_ = v.lastScanSec_;
hasLastScanSec_ = v.hasLastScanSec_;
lastScanConsistent_ = v.lastScanConsistent_;
dirty_ = v.dirty_;
available_ = v.available_;
return *this;
Expand Down Expand Up @@ -827,6 +831,14 @@ class CopySetInfo {
return lastScanSec_;
}

bool SetLastScanConsistent(bool lastScanConsistent) {
lastScanConsistent_ = lastScanConsistent;
}

bool GetLastScanConsistent() const {
return lastScanConsistent_;
}

void ClearCandidate() {
hasCandidate_ = false;
}
Expand Down Expand Up @@ -876,6 +888,9 @@ class CopySetInfo {
// whether the lastScanSec_ has been set
bool hasLastScanSec_;

// whether the data of copies is consistent for last scan
bool lastScanConsistent_;

/**
* @brief to mark whether data is dirty, for writing to storage regularly
*/
Expand Down
71 changes: 50 additions & 21 deletions src/mds/topology/topology_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,31 +839,60 @@ void TopologyServiceImpl::GetCopySetsInChunkServer(
}

void TopologyServiceImpl::GetCopySetsInCluster(
google::protobuf::RpcController* cntl_base,
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response,
google::protobuf::Closure* done) {
google::protobuf::RpcController* cntl_base,
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);

auto localAddr = cntl->local_side();
auto remoteAddr = cntl->remote_side();
LOG(INFO)
<< "Received request[log_id=" << cntl->log_id()
<< "] from " << remoteAddr << " to " << localAddr
<< ". [GetCopySetsInCluster] " << request->DebugString();

LOG(INFO) << "Received request[log_id=" << cntl->log_id()
<< "] from " << cntl->remote_side()
<< " to " << cntl->local_side()
<< ". [GetCopySetsInClusterRequest]";
topology_->GetCopySetsInCluster(request, response);
if (kTopoErrCodeSuccess != response->statuscode()) {
LOG(ERROR) << "Send response[log_id=" << cntl->log_id()
<< "] from " << cntl->local_side()
<< " to " << cntl->remote_side()
<< ". [GetCopySetsInClusterResponse] "
<< response->DebugString();

std::ostringstream errMsg;
errMsg << "Send response[log_id=" << cntl->log_id()
<< "] from " << localAddr << " to " << remoteAddr
<< ". [GetCopySetsInCluster] " << response->DebugString();

if (response->statuscode() == kTopoErrCodeSuccess) {
LOG(INFO) << errMsg.str();
} else {
LOG(INFO) << "Send response[log_id=" << cntl->log_id()
<< "] from " << cntl->local_side()
<< " to " << cntl->remote_side()
<< ". [GetCopySetsInClusterResponse] copyset num: "
<< response->copysetinfos_size();
LOG(ERROR) << errMsg.str();
}
}

void TopologyServiceImpl::GetCopyset(
google::protobuf::RpcController* cntl_base,
const GetCopysetRequest* request,
GetCopysetResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);

auto localAddr = cntl->local_side();
auto remoteAddr = cntl->remote_side();
LOG(INFO)
<< "Received request[log_id=" << cntl->log_id()
<< "] from " << remoteAddr << " to " << localAddr
<< ". [GetCopyset] " << request->DebugString();

topology_->GetCopyset(request, response);

std::ostringstream errMsg;
errMsg << "Send response[log_id=" << cntl->log_id()
<< "] from " << localAddr << " to " << remoteAddr
<< ". [GetCopyset] " << response->DebugString();

if (response->statuscode() == kTopoErrCodeSuccess) {
LOG(INFO) << errMsg.str();
} else {
LOG(ERROR) << errMsg.str();
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/mds/topology/topology_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,15 @@ class TopologyServiceImpl : public TopologyService {
google::protobuf::Closure* done);

virtual void GetCopySetsInCluster(
google::protobuf::RpcController* cntl_base,
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response,
google::protobuf::Closure* done);
google::protobuf::RpcController* cntl_base,
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response,
google::protobuf::Closure* done);

virtual void GetCopyset(google::protobuf::RpcController* cntl_base,
const GetCopysetRequest* request,
GetCopysetResponse* response,
google::protobuf::Closure* done);

virtual void GetClusterInfo(
google::protobuf::RpcController* cntl_base,
Expand Down
55 changes: 47 additions & 8 deletions src/mds/topology/topology_service_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <vector>
#include <chrono> //NOLINT
#include <thread> //NOLINT
#include <algorithm>

#include "brpc/channel.h"
#include "brpc/controller.h"
Expand Down Expand Up @@ -1399,16 +1400,45 @@ void TopologyServiceManager::GetCopySetsInChunkServer(
}

void TopologyServiceManager::GetCopySetsInCluster(
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response) {
const GetCopySetsInClusterRequest* request,
GetCopySetsInClusterResponse* response) {
auto filter = [&](const CopySetInfo& copysetInfo) {
if (request->has_filterscaning() && !copysetInfo.GetScaning()) {
return false;
}

return true;
};

auto logicalPoolIds = topology_->GetLogicalPoolInCluster();
std::sort(logicalPoolIds.begin(), logicalPoolIds.end());
for (const auto& lpid : logicalPoolIds) {
auto copysetInfos =
topology_-> GetCopySetInfosInLogicalPool(lpid, filter);
for (auto& copysetInfo : copysetInfos) {
CopysetInfo* info = response->add_copysetinfos();
ConvertCopyset(copysetInfo, info);
}
}

response->set_statuscode(kTopoErrCodeSuccess);
std::vector<CopySetKey> copysets =
topology_->GetCopySetsInCluster();
for (const CopySetKey& copyset : copysets) {
CopysetInfo *info = response->add_copysetinfos();
info->set_logicalpoolid(copyset.first);
info->set_copysetid(copyset.second);
}

void TopologyServiceManager::GetCopyset(const GetCopysetRequest* request,
GetCopysetResponse* response) {
CopySetInfo copysetInfo;
auto lpid = request->logicalpoolid();
auto copysetId = request->copysetid();
CopySetKey copysetKey(lpid, copysetId);

if (!topology_->GetCopySet(copysetKey, &copysetInfo)) {
response->set_statuscode(kTopoErrCodeCopySetNotFound);
return;
}

response->set_statuscode(kTopoErrCodeSuccess);
auto info = response->mutable_copysetinfo();
ConvertCopyset(copysetInfo, info);
}

void TopologyServiceManager::GetClusterInfo(
Expand Down Expand Up @@ -1458,6 +1488,15 @@ void TopologyServiceManager::ListUnAvailCopySets(
response->set_statuscode(kTopoErrCodeSuccess);
}

void TopologyServiceManager::ConvertCopyset(const CopySetInfo& in,
::curve::common::CopysetInfo* out) {
out->set_logicalpoolid(in.GetLogicalPoolId());
out->set_copysetid(in.GetId());
out->set_scaning(in.GetScaning());
out->set_lastscansec(in.GetLastScanSec());
out->set_lastscanconsistent(in.GetLastScanConsistent());
}

} // namespace topology
} // namespace mds
} // namespace curve
Loading