Skip to content

Commit

Permalink
curvefs/client: fix the data iteration error when rpc retry.
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai committed May 23, 2022
1 parent 91bc587 commit 01e3745
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 156 deletions.
4 changes: 2 additions & 2 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttr(
return CURVEFS_ERROR::OK;
}

MetaStatusCode ret = metaClient_->BatchGetInodeAttr(fsId_, inodeIds, attr);
MetaStatusCode ret = metaClient_->BatchGetInodeAttr(fsId_, *inodeIds, attr);
if (MetaStatusCode::OK != ret) {
LOG(ERROR) << "metaClient BatchGetInodeAttr failed, MetaStatusCode = "
<< ret << ", MetaStatusCode_Name = "
Expand Down Expand Up @@ -150,7 +150,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetXAttr(
return CURVEFS_ERROR::OK;
}

MetaStatusCode ret = metaClient_->BatchGetXAttr(fsId_, inodeIds, xattr);
MetaStatusCode ret = metaClient_->BatchGetXAttr(fsId_, *inodeIds, xattr);
if (MetaStatusCode::OK != ret) {
LOG(ERROR) << "metaClient BatchGetXAttr failed, MetaStatusCode = "
<< ret << ", MetaStatusCode_Name = "
Expand Down
295 changes: 157 additions & 138 deletions curvefs/src/client/rpcclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,14 @@ MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid,
bool GroupInodeIdByPartition(
uint32_t fsId,
std::shared_ptr<MetaCache> metaCache,
std::set<uint64_t> *inodeIds,
std::unordered_map<uint32_t, std::list<uint64_t>> *inodeGroups) {
for (const auto &it : *inodeIds) {
const std::set<uint64_t> &inodeIds,
std::unordered_map<uint32_t, std::vector<uint64_t>> *inodeGroups) {
for (const auto &it : inodeIds) {
uint32_t pId = 0;
if (metaCache->GetPartitionIdByInodeId(fsId, it, &pId)) {
auto iter = inodeGroups->find(pId);
if (iter == inodeGroups->end()) {
inodeGroups->emplace(pId, std::list<uint64_t>({it}));
inodeGroups->emplace(pId, std::vector<uint64_t>({it}));
} else {
iter->second.push_back(it);
}
Expand All @@ -497,163 +497,182 @@ bool GroupInodeIdByPartition(
return true;
}

// Splits the requested inode ids into batches that can each be sent in a
// single RPC.
//
// The ids are first grouped by partition via GroupInodeIdByPartition(); each
// partition's ids are then cut, in order, into consecutive chunks of at most
// opt_.batchLimit ids. Every chunk is appended to *inodeGroups, so a batch
// never mixes ids from different partitions and is never empty.
//
// @param fsId         filesystem id forwarded to GroupInodeIdByPartition()
// @param inodeIds     inode ids to split
// @param inodeGroups  output; batches are appended, existing entries are kept
// @return false if GroupInodeIdByPartition() fails (nothing is appended in
//         that case), true otherwise
bool MetaServerClientImpl::SplitRequestInodes(
    uint32_t fsId,
    const std::set<uint64_t> &inodeIds,
    std::vector<std::vector<uint64_t>> *inodeGroups) {
    std::unordered_map<uint32_t, std::vector<uint64_t>> groups;
    if (!GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &groups)) {
        return false;
    }

    // A limit of 0 would never consume an id, so the split below would keep
    // appending empty batches forever. Fall back to one id per batch so the
    // loop always makes progress.
    const uint32_t batchLimit = opt_.batchLimit > 0 ? opt_.batchLimit : 1;

    for (const auto &group : groups) {
        const std::vector<uint64_t> &ids = group.second;
        size_t offset = 0;
        while (offset < ids.size()) {
            const size_t remaining = ids.size() - offset;
            const size_t count =
                remaining < batchLimit ? remaining : batchLimit;
            // Build the batch directly from the [offset, offset + count)
            // slice of this partition's ids.
            inodeGroups->emplace_back(ids.begin() + offset,
                                      ids.begin() + offset + count);
            offset += count;
        }
    }
    return true;
}


MetaStatusCode MetaServerClientImpl::BatchGetInodeAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::list<InodeAttr> *attr) {
uint32_t limit = opt_.batchLimit;
// group inodeid by partition
std::unordered_map<uint32_t, std::list<uint64_t>> inodeGroups;
if (!GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &inodeGroups)) {
// group inodeid by partition and batchlimit
std::vector<std::vector<uint64_t>> inodeGroups;
if (!SplitRequestInodes(fsId, inodeIds, &inodeGroups)) {
return MetaStatusCode::NOT_FOUND;
}

// send rpc
// TODO(wanghai): send rpc in parallel
for (const auto &it : inodeGroups) {
auto iter = it.second.begin();
while (iter != it.second.end()) {
uint64_t inodeId = *iter;
auto task = RPCTask {
metaserverClientMetric_->batchGetInodeAttr.qps.count << 1;
LatencyUpdater updater(
&metaserverClientMetric_->batchGetInodeAttr.latency);
BatchGetInodeAttrRequest request;
BatchGetInodeAttrResponse response;
request.set_poolid(poolID);
request.set_copysetid(copysetID);
request.set_partitionid(partitionID);
request.set_fsid(fsId);
request.set_appliedindex(
metaCache_->GetApplyIndex(CopysetGroupID(poolID,
copysetID)));
uint32_t batchLimit = limit;
while (iter != it.second.end() && batchLimit > 0) {
request.add_inodeid(*iter);
iter++;
batchLimit--;
}
curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.BatchGetInodeAttr(cntl, &request, &response, nullptr);

if (cntl->Failed()) {
metaserverClientMetric_->batchGetInodeAttr.eps.count << 1;
LOG(WARNING) << "BatchGetInodeAttr Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}
if (it.empty()) {
LOG(WARNING) << "BatchGetInodeAttr request empty.";
return MetaStatusCode::PARAM_ERROR;
}
uint64_t inodeId = *it.begin();
auto task = RPCTask {
metaserverClientMetric_->batchGetInodeAttr.qps.count << 1;
LatencyUpdater updater(
&metaserverClientMetric_->batchGetInodeAttr.latency);
BatchGetInodeAttrRequest request;
BatchGetInodeAttrResponse response;
request.set_poolid(poolID);
request.set_copysetid(copysetID);
request.set_partitionid(partitionID);
request.set_fsid(fsId);
request.set_appliedindex(
metaCache_->GetApplyIndex(CopysetGroupID(poolID,
copysetID)));
*request.mutable_inodeid() = { it.begin(), it.end() };

curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.BatchGetInodeAttr(cntl, &request, &response, nullptr);

if (cntl->Failed()) {
metaserverClientMetric_->batchGetInodeAttr.eps.count << 1;
LOG(WARNING) << "BatchGetInodeAttr Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}

MetaStatusCode ret = response.statuscode();
if (ret != MetaStatusCode::OK) {
LOG_IF(WARNING, ret != MetaStatusCode::NOT_FOUND)
<< "BatchGetInodeAttr failed, errcode = " << ret
<< ", errmsg = " << MetaStatusCode_Name(ret);
} else if (response.attr_size() > 0 &&
response.has_appliedindex()) {
auto retAttr = response.attr();
for_each(retAttr.begin(), retAttr.end(),
[&](InodeAttr &a) { attr->push_back(a); });
metaCache_->UpdateApplyIndex(
CopysetGroupID(poolID, copysetID),
response.appliedindex());
} else {
LOG(WARNING) << "BatchGetInodeAttr ok, but"
<< " applyIndex or attr not set in response: "
<< response.DebugString();
return -1;
}
return ret;
};
auto taskCtx = std::make_shared<TaskContext>(
MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId);
BatchGetInodeAttrExcutor excutor(
opt_, metaCache_, channelManager_, taskCtx);
auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask());
MetaStatusCode ret = response.statuscode();
if (ret != MetaStatusCode::OK) {
attr->clear();
return ret;
LOG(ERROR) << "BatchGetInodeAttr failed, errcode = " << ret
<< ", errmsg = " << MetaStatusCode_Name(ret);
} else if (response.attr_size() > 0 &&
response.has_appliedindex()) {
auto *attrs = response.mutable_attr();
attr->insert(attr->end(),
std::make_move_iterator(attrs->begin()),
std::make_move_iterator(attrs->end()));
metaCache_->UpdateApplyIndex(
CopysetGroupID(poolID, copysetID),
response.appliedindex());
} else {
LOG(WARNING) << "BatchGetInodeAttr ok, but"
<< " applyIndex or attr not set in response: "
<< response.DebugString();
return -1;
}
return ret;
};
auto taskCtx = std::make_shared<TaskContext>(
MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId);
BatchGetInodeAttrExcutor excutor(
opt_, metaCache_, channelManager_, taskCtx);
auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask());
if (ret != MetaStatusCode::OK) {
attr->clear();
return ret;
}
}
return MetaStatusCode::OK;
}

MetaStatusCode MetaServerClientImpl::BatchGetXAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
const std::set<uint64_t> &inodeIds,
std::list<XAttr> *xattr) {
uint32_t limit = opt_.batchLimit;
// group inodeid by partition
std::unordered_map<uint32_t, std::list<uint64_t>> inodeGroups;
if (!GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &inodeGroups)) {
// group inodeid by partition and batchlimit
std::vector<std::vector<uint64_t>> inodeGroups;
if (!SplitRequestInodes(fsId, inodeIds, &inodeGroups)) {
return MetaStatusCode::NOT_FOUND;
}

// send rpc
// TODO(wanghai): send rpc in parallel
for (const auto &it : inodeGroups) {
auto iter = it.second.begin();
while (iter != it.second.end()) {
uint64_t inodeId = *iter;
auto task = RPCTask {
metaserverClientMetric_->batchGetXattr.qps.count << 1;
LatencyUpdater updater(
&metaserverClientMetric_->batchGetXattr.latency);
BatchGetXAttrRequest request;
BatchGetXAttrResponse response;
request.set_poolid(poolID);
request.set_copysetid(copysetID);
request.set_partitionid(partitionID);
request.set_fsid(fsId);
request.set_appliedindex(
metaCache_->GetApplyIndex(
CopysetGroupID(poolID, copysetID)));
uint32_t batchLimit = limit;
while (iter != it.second.end() && batchLimit > 0) {
request.add_inodeid(*iter);
iter++;
batchLimit--;
}
curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.BatchGetXAttr(cntl, &request, &response, nullptr);

if (cntl->Failed()) {
metaserverClientMetric_->batchGetXattr.eps.count << 1;
LOG(WARNING) << "BatchGetXAttr Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}
if (it.empty()) {
LOG(WARNING) << "BatchGetInodeXAttr request empty.";
return MetaStatusCode::PARAM_ERROR;
}

MetaStatusCode ret = response.statuscode();
if (ret != MetaStatusCode::OK) {
LOG_IF(WARNING, ret != MetaStatusCode::NOT_FOUND)
<< "BatchGetXAttr failed, errcode = " << ret
<< ", errmsg = " << MetaStatusCode_Name(ret);
} else if (response.xattr_size() > 0 &&
response.has_appliedindex()) {
auto retXattr = response.xattr();
for_each(retXattr.begin(), retXattr.end(),
[&](XAttr &a) { xattr->push_back(a); });
metaCache_->UpdateApplyIndex(
CopysetGroupID(poolID, copysetID),
response.appliedindex());
} else {
LOG(WARNING) << "BatchGetXAttr ok, but"
<< " applyIndex or attr not set in response: "
<< response.DebugString();
return -1;
}
return ret;
};
auto taskCtx = std::make_shared<TaskContext>(
MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId);
BatchGetInodeAttrExcutor excutor(
opt_, metaCache_, channelManager_, taskCtx);
auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask());
uint64_t inodeId = *it.begin();
auto task = RPCTask {
metaserverClientMetric_->batchGetXattr.qps.count << 1;
LatencyUpdater updater(
&metaserverClientMetric_->batchGetXattr.latency);
BatchGetXAttrRequest request;
BatchGetXAttrResponse response;
request.set_poolid(poolID);
request.set_copysetid(copysetID);
request.set_partitionid(partitionID);
request.set_fsid(fsId);
request.set_appliedindex(
metaCache_->GetApplyIndex(
CopysetGroupID(poolID, copysetID)));
*request.mutable_inodeid() = { it.begin(), it.end() };

curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.BatchGetXAttr(cntl, &request, &response, nullptr);

if (cntl->Failed()) {
metaserverClientMetric_->batchGetXattr.eps.count << 1;
LOG(WARNING) << "BatchGetXAttr Failed, errorcode = "
<< cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}

MetaStatusCode ret = response.statuscode();
if (ret != MetaStatusCode::OK) {
xattr->clear();
return ret;
LOG(ERROR) << "BatchGetXAttr failed, errcode = " << ret
<< ", errmsg = " << MetaStatusCode_Name(ret);
} else if (response.xattr_size() > 0 &&
response.has_appliedindex()) {
auto *xattrs = response.mutable_xattr();
xattr->insert(xattr->end(),
std::make_move_iterator(xattrs->begin()),
std::make_move_iterator(xattrs->end()));
metaCache_->UpdateApplyIndex(
CopysetGroupID(poolID, copysetID),
response.appliedindex());
} else {
LOG(WARNING) << "BatchGetXAttr ok, but"
<< " applyIndex or attr not set in response: "
<< response.DebugString();
return -1;
}
return ret;
};
auto taskCtx = std::make_shared<TaskContext>(
MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId);
BatchGetInodeAttrExcutor excutor(
opt_, metaCache_, channelManager_, taskCtx);
auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask());
if (ret != MetaStatusCode::OK) {
xattr->clear();
return ret;
}
}
return MetaStatusCode::OK;
Expand Down
Loading

0 comments on commit 01e3745

Please sign in to comment.