Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs client : fix core dump of DataCache::Flush & fix rpc timeout … #1193

Merged
merged 1 commit into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 13 additions & 31 deletions curvefs/src/client/rpcclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ MetaStatusCode MetaServerClientImpl::GetDentry(uint32_t fsId, uint64_t inodeid,
return -1;
}

VLOG(6) << "GetDentry success, request: " << request.DebugString()
VLOG(6) << "GetDentry done, request: " << request.DebugString()
<< "response: " << response.ShortDebugString();
return ret;
};
Expand Down Expand Up @@ -208,7 +208,7 @@ MetaStatusCode MetaServerClientImpl::ListDentry(uint32_t fsId, uint64_t inodeid,
return -1;
}

VLOG(6) << "ListDentry success, request: " << request.DebugString()
VLOG(6) << "ListDentry done, request: " << request.DebugString()
<< "response: " << response.DebugString();
return ret;
};
Expand Down Expand Up @@ -328,7 +328,7 @@ MetaStatusCode MetaServerClientImpl::DeleteDentry(uint32_t fsId,
return -1;
}

VLOG(6) << "DeleteDentry success, request: " << request.DebugString()
VLOG(6) << "DeleteDentry done, request: " << request.DebugString()
<< "response: " << response.DebugString();
return ret;
};
Expand Down Expand Up @@ -377,7 +377,7 @@ MetaServerClientImpl::PrepareRenameTx(const std::vector<Dentry> &dentrys) {
return -1;
}

VLOG(6) << "PrepareRenameTx success, request: " << request.DebugString()
VLOG(6) << "PrepareRenameTx done, request: " << request.DebugString()
<< "response: " << response.DebugString();
return rc;
};
Expand Down Expand Up @@ -681,7 +681,7 @@ MetaServerClientImpl::UpdateInode(const Inode &inode,
return -1;
}

VLOG(6) << "UpdateInode success, request: " << request.DebugString()
VLOG(6) << "UpdateInode done, request: " << request.DebugString()
<< "response: " << response.DebugString();
return ret;
};
Expand Down Expand Up @@ -736,7 +736,7 @@ void UpdateInodeRpcDone::Run() {
return;
}

VLOG(6) << "UpdateInode success, "
VLOG(6) << "UpdateInode done, "
<< "response: " << response.DebugString();
done_->SetRetCode(ret);
return;
Expand Down Expand Up @@ -784,13 +784,7 @@ void MetaServerClientImpl::UpdateInodeAsync(
metaCache_, channelManager_, taskCtx);
TaskExecutorDone *taskDone = new TaskExecutorDone(
excutor, done);
brpc::ClosureGuard taskDone_guard(taskDone);
int ret = excutor->DoAsyncRPCTask(taskDone);
if (ret < 0) {
taskDone->SetRetCode(ret);
return;
}
taskDone_guard.release();
excutor->DoAsyncRPCTask(taskDone);
}

void MetaServerClientImpl::UpdateXattrAsync(const Inode &inode,
Expand Down Expand Up @@ -822,13 +816,7 @@ void MetaServerClientImpl::UpdateXattrAsync(const Inode &inode,
metaCache_, channelManager_, taskCtx);
TaskExecutorDone *taskDone = new TaskExecutorDone(
excutor, done);
brpc::ClosureGuard taskDone_guard(taskDone);
int ret = excutor->DoAsyncRPCTask(taskDone);
if (ret < 0) {
taskDone->SetRetCode(ret);
return;
}
taskDone_guard.release();
excutor->DoAsyncRPCTask(taskDone);
}

MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo(
Expand Down Expand Up @@ -884,7 +872,7 @@ MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo(
<< response.DebugString();
return -1;
}
VLOG(6) << "GetOrModifyS3ChunkInfo success, request: "
VLOG(6) << "GetOrModifyS3ChunkInfo done, request: "
<< request.DebugString()
<< "response: " << response.DebugString();
return ret;
Expand Down Expand Up @@ -946,7 +934,7 @@ void GetOrModifyS3ChunkInfoRpcDone::Run() {
done_->SetRetCode(-1);
return;
}
VLOG(6) << "GetOrModifyS3ChunkInfo success, response: "
VLOG(6) << "GetOrModifyS3ChunkInfo done, response: "
<< response.DebugString();
done_->SetRetCode(ret);
return;
Expand Down Expand Up @@ -984,13 +972,7 @@ void MetaServerClientImpl::GetOrModifyS3ChunkInfoAsync(
metaCache_, channelManager_, taskCtx);
TaskExecutorDone *taskDone = new TaskExecutorDone(
excutor, done);
brpc::ClosureGuard taskDone_guard(taskDone);
int ret = excutor->DoAsyncRPCTask(taskDone);
if (ret < 0) {
taskDone->SetRetCode(ret);
return;
}
taskDone_guard.release();
excutor->DoAsyncRPCTask(taskDone);
}

MetaStatusCode MetaServerClientImpl::CreateInode(const InodeParam &param,
Expand Down Expand Up @@ -1044,7 +1026,7 @@ MetaStatusCode MetaServerClientImpl::CreateInode(const InodeParam &param,
return -1;
}

VLOG(6) << "CreateInode success, request: " << request.DebugString()
VLOG(6) << "CreateInode done, request: " << request.DebugString()
<< "response: " << response.DebugString();
return ret;
};
Expand Down Expand Up @@ -1094,7 +1076,7 @@ MetaStatusCode MetaServerClientImpl::DeleteInode(uint32_t fsId,
return -1;
}

VLOG(6) << "DeleteInode success, request: " << request.DebugString()
VLOG(6) << "DeleteInode done, request: " << request.DebugString()
<< "response: " << response.DebugString();
return ret;
};
Expand Down
55 changes: 19 additions & 36 deletions curvefs/src/client/rpcclient/task_excutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <algorithm>
#include <utility>

#include "curvefs/src/client/rpcclient/task_excutor.h"
#include "curvefs/proto/metaserver.pb.h"

Expand All @@ -41,7 +42,23 @@ MetaStatusCode ConvertToMetaStatusCode(int retcode) {

int TaskExecutor::DoRPCTask() {
task_->rpcTimeoutMs = opt_.rpcTimeoutMS;
return DoRPCTaskInner(nullptr);
}

void TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) {
brpc::ClosureGuard done_guard(done);

task_->rpcTimeoutMs = opt_.rpcTimeoutMS;
int ret = DoRPCTaskInner(done);
if (ret < 0) {
done->SetRetCode(ret);
return;
}
done_guard.release();
return;
}

int TaskExecutor::DoRPCTaskInner(TaskExecutorDone *done) {
int retCode = -1;
bool needRetry = true;

Expand All @@ -68,7 +85,7 @@ int TaskExecutor::DoRPCTask() {
continue;
}

retCode = ExcuteTask(channel.get(), nullptr);
retCode = ExcuteTask(channel.get(), done);
needRetry = OnReturn(retCode);

if (needRetry) {
Expand All @@ -79,40 +96,6 @@ int TaskExecutor::DoRPCTask() {
return retCode;
}

int TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) {
task_->rpcTimeoutMs = opt_.rpcTimeoutMS;

int retCode = -1;

do {
if (task_->retryTimes++ > opt_.maxRetry) {
LOG(ERROR) << task_->TaskContextStr()
<< " retry times exceeds the limit";
break;
}

if (!HasValidTarget() && !GetTarget()) {
LOG(WARNING) << "get target fail for " << task_->TaskContextStr()
<< ", sleep and retry";
bthread_usleep(opt_.retryIntervalUS);
continue;
}

auto channel = channelManager_->GetOrCreateChannel(
task_->target.metaServerID, task_->target.endPoint);
if (!channel) {
LOG(WARNING) << "GetOrCreateChannel fail for "
<< task_->TaskContextStr() << ", sleep and retry";
bthread_usleep(opt_.retryIntervalUS);
continue;
}
retCode = ExcuteTask(channel.get(), done);
break;
} while (true);

return retCode;
}

bool TaskExecutor::OnReturn(int retCode) {
bool needRetry = false;

Expand Down Expand Up @@ -304,7 +287,7 @@ void TaskExecutorDone::Run() {
needRetry = excutor_->OnReturn(code_);
if (needRetry) {
excutor_->PreProcessBeforeRetry(code_);
code_ = excutor_->DoAsyncRPCTask(this);
code_ = excutor_->DoRPCTaskInner(this);
if (code_ < 0) {
done_->SetMetaStatusCode(ConvertToMetaStatusCode(code_));
return;
Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/client/rpcclient/task_excutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ class TaskExecutor {
}

int DoRPCTask();
int DoAsyncRPCTask(TaskExecutorDone *done);
void DoAsyncRPCTask(TaskExecutorDone *done);
int DoRPCTaskInner(TaskExecutorDone *done);

bool OnReturn(int retCode);
void PreProcessBeforeRetry(int retCode);
Expand Down
4 changes: 2 additions & 2 deletions curvefs/src/client/s3/disk_cache_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ int DiskCacheWrite::UploadFile(const std::string &name,
}
VLOG(9) << "async upload start, file = " << name;
PutObjectAsyncCallBack cb =
[&, buffer,
syncTask](const std::shared_ptr<PutObjectAsyncContext> &context) {
[&, buffer, syncTask, name]
(const std::shared_ptr<PutObjectAsyncContext> &context) {
if (context->retCode == 0) {
if (metric_.get() != nullptr) {
metric_->writeS3.bps.count << context->bufferSize;
Expand Down