From 23a74c97fe375508155f20171ff56d8d8d42425d Mon Sep 17 00:00:00 2001 From: xuchaojie Date: Wed, 16 Mar 2022 19:15:41 +0800 Subject: [PATCH] curvefs client : fix core dump of DataCache::Flush & fix rpc timeout time not backoff --- .../client/rpcclient/metaserver_client.cpp | 44 +++++---------- curvefs/src/client/rpcclient/task_excutor.cpp | 55 +++++++------------ curvefs/src/client/rpcclient/task_excutor.h | 3 +- curvefs/src/client/s3/disk_cache_write.cpp | 4 +- 4 files changed, 36 insertions(+), 70 deletions(-) diff --git a/curvefs/src/client/rpcclient/metaserver_client.cpp b/curvefs/src/client/rpcclient/metaserver_client.cpp index c66ea81a5e..798cd99513 100644 --- a/curvefs/src/client/rpcclient/metaserver_client.cpp +++ b/curvefs/src/client/rpcclient/metaserver_client.cpp @@ -140,7 +140,7 @@ MetaStatusCode MetaServerClientImpl::GetDentry(uint32_t fsId, uint64_t inodeid, return -1; } - VLOG(6) << "GetDentry success, request: " << request.DebugString() + VLOG(6) << "GetDentry done, request: " << request.DebugString() << "response: " << response.ShortDebugString(); return ret; }; @@ -208,7 +208,7 @@ MetaStatusCode MetaServerClientImpl::ListDentry(uint32_t fsId, uint64_t inodeid, return -1; } - VLOG(6) << "ListDentry success, request: " << request.DebugString() + VLOG(6) << "ListDentry done, request: " << request.DebugString() << "response: " << response.DebugString(); return ret; }; @@ -328,7 +328,7 @@ MetaStatusCode MetaServerClientImpl::DeleteDentry(uint32_t fsId, return -1; } - VLOG(6) << "DeleteDentry success, request: " << request.DebugString() + VLOG(6) << "DeleteDentry done, request: " << request.DebugString() << "response: " << response.DebugString(); return ret; }; @@ -377,7 +377,7 @@ MetaServerClientImpl::PrepareRenameTx(const std::vector &dentrys) { return -1; } - VLOG(6) << "PrepareRenameTx success, request: " << request.DebugString() + VLOG(6) << "PrepareRenameTx done, request: " << request.DebugString() << "response: " << response.DebugString(); 
return rc; }; @@ -681,7 +681,7 @@ MetaServerClientImpl::UpdateInode(const Inode &inode, return -1; } - VLOG(6) << "UpdateInode success, request: " << request.DebugString() + VLOG(6) << "UpdateInode done, request: " << request.DebugString() << "response: " << response.DebugString(); return ret; }; @@ -736,7 +736,7 @@ void UpdateInodeRpcDone::Run() { return; } - VLOG(6) << "UpdateInode success, " + VLOG(6) << "UpdateInode done, " << "response: " << response.DebugString(); done_->SetRetCode(ret); return; @@ -784,13 +784,7 @@ void MetaServerClientImpl::UpdateInodeAsync( metaCache_, channelManager_, taskCtx); TaskExecutorDone *taskDone = new TaskExecutorDone( excutor, done); - brpc::ClosureGuard taskDone_guard(taskDone); - int ret = excutor->DoAsyncRPCTask(taskDone); - if (ret < 0) { - taskDone->SetRetCode(ret); - return; - } - taskDone_guard.release(); + excutor->DoAsyncRPCTask(taskDone); } void MetaServerClientImpl::UpdateXattrAsync(const Inode &inode, @@ -822,13 +816,7 @@ void MetaServerClientImpl::UpdateXattrAsync(const Inode &inode, metaCache_, channelManager_, taskCtx); TaskExecutorDone *taskDone = new TaskExecutorDone( excutor, done); - brpc::ClosureGuard taskDone_guard(taskDone); - int ret = excutor->DoAsyncRPCTask(taskDone); - if (ret < 0) { - taskDone->SetRetCode(ret); - return; - } - taskDone_guard.release(); + excutor->DoAsyncRPCTask(taskDone); } MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo( @@ -884,7 +872,7 @@ MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo( << response.DebugString(); return -1; } - VLOG(6) << "GetOrModifyS3ChunkInfo success, request: " + VLOG(6) << "GetOrModifyS3ChunkInfo done, request: " << request.DebugString() << "response: " << response.DebugString(); return ret; @@ -946,7 +934,7 @@ void GetOrModifyS3ChunkInfoRpcDone::Run() { done_->SetRetCode(-1); return; } - VLOG(6) << "GetOrModifyS3ChunkInfo success, response: " + VLOG(6) << "GetOrModifyS3ChunkInfo done, response: " << response.DebugString(); 
done_->SetRetCode(ret); return; @@ -984,13 +972,7 @@ void MetaServerClientImpl::GetOrModifyS3ChunkInfoAsync( metaCache_, channelManager_, taskCtx); TaskExecutorDone *taskDone = new TaskExecutorDone( excutor, done); - brpc::ClosureGuard taskDone_guard(taskDone); - int ret = excutor->DoAsyncRPCTask(taskDone); - if (ret < 0) { - taskDone->SetRetCode(ret); - return; - } - taskDone_guard.release(); + excutor->DoAsyncRPCTask(taskDone); } MetaStatusCode MetaServerClientImpl::CreateInode(const InodeParam &param, @@ -1044,7 +1026,7 @@ MetaStatusCode MetaServerClientImpl::CreateInode(const InodeParam &param, return -1; } - VLOG(6) << "CreateInode success, request: " << request.DebugString() + VLOG(6) << "CreateInode done, request: " << request.DebugString() << "response: " << response.DebugString(); return ret; }; @@ -1094,7 +1076,7 @@ MetaStatusCode MetaServerClientImpl::DeleteInode(uint32_t fsId, return -1; } - VLOG(6) << "DeleteInode success, request: " << request.DebugString() + VLOG(6) << "DeleteInode done, request: " << request.DebugString() << "response: " << response.DebugString(); return ret; }; diff --git a/curvefs/src/client/rpcclient/task_excutor.cpp b/curvefs/src/client/rpcclient/task_excutor.cpp index f91e780048..0d3f9c7c7e 100644 --- a/curvefs/src/client/rpcclient/task_excutor.cpp +++ b/curvefs/src/client/rpcclient/task_excutor.cpp @@ -23,6 +23,7 @@ #include #include + #include "curvefs/src/client/rpcclient/task_excutor.h" #include "curvefs/proto/metaserver.pb.h" @@ -41,7 +42,23 @@ MetaStatusCode ConvertToMetaStatusCode(int retcode) { int TaskExecutor::DoRPCTask() { task_->rpcTimeoutMs = opt_.rpcTimeoutMS; + return DoRPCTaskInner(nullptr); +} + +void TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) { + brpc::ClosureGuard done_guard(done); + + task_->rpcTimeoutMs = opt_.rpcTimeoutMS; + int ret = DoRPCTaskInner(done); + if (ret < 0) { + done->SetRetCode(ret); + return; + } + done_guard.release(); + return; +} +int TaskExecutor::DoRPCTaskInner(TaskExecutorDone *done) 
{ int retCode = -1; bool needRetry = true; @@ -68,7 +85,7 @@ int TaskExecutor::DoRPCTask() { continue; } - retCode = ExcuteTask(channel.get(), nullptr); + retCode = ExcuteTask(channel.get(), done); needRetry = OnReturn(retCode); if (needRetry) { @@ -79,40 +96,6 @@ int TaskExecutor::DoRPCTask() { return retCode; } -int TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) { - task_->rpcTimeoutMs = opt_.rpcTimeoutMS; - - int retCode = -1; - - do { - if (task_->retryTimes++ > opt_.maxRetry) { - LOG(ERROR) << task_->TaskContextStr() - << " retry times exceeds the limit"; - break; - } - - if (!HasValidTarget() && !GetTarget()) { - LOG(WARNING) << "get target fail for " << task_->TaskContextStr() - << ", sleep and retry"; - bthread_usleep(opt_.retryIntervalUS); - continue; - } - - auto channel = channelManager_->GetOrCreateChannel( - task_->target.metaServerID, task_->target.endPoint); - if (!channel) { - LOG(WARNING) << "GetOrCreateChannel fail for " - << task_->TaskContextStr() << ", sleep and retry"; - bthread_usleep(opt_.retryIntervalUS); - continue; - } - retCode = ExcuteTask(channel.get(), done); - break; - } while (true); - - return retCode; -} - bool TaskExecutor::OnReturn(int retCode) { bool needRetry = false; @@ -304,7 +287,7 @@ void TaskExecutorDone::Run() { needRetry = excutor_->OnReturn(code_); if (needRetry) { excutor_->PreProcessBeforeRetry(code_); - code_ = excutor_->DoAsyncRPCTask(this); + code_ = excutor_->DoRPCTaskInner(this); if (code_ < 0) { done_->SetMetaStatusCode(ConvertToMetaStatusCode(code_)); return; diff --git a/curvefs/src/client/rpcclient/task_excutor.h b/curvefs/src/client/rpcclient/task_excutor.h index 0849f95356..872d23a22d 100644 --- a/curvefs/src/client/rpcclient/task_excutor.h +++ b/curvefs/src/client/rpcclient/task_excutor.h @@ -106,7 +106,8 @@ class TaskExecutor { } int DoRPCTask(); - int DoAsyncRPCTask(TaskExecutorDone *done); + void DoAsyncRPCTask(TaskExecutorDone *done); + int DoRPCTaskInner(TaskExecutorDone *done); bool 
OnReturn(int retCode); void PreProcessBeforeRetry(int retCode); diff --git a/curvefs/src/client/s3/disk_cache_write.cpp b/curvefs/src/client/s3/disk_cache_write.cpp index ea2aa966d0..9237e24374 100644 --- a/curvefs/src/client/s3/disk_cache_write.cpp +++ b/curvefs/src/client/s3/disk_cache_write.cpp @@ -133,8 +133,8 @@ int DiskCacheWrite::UploadFile(const std::string &name, } VLOG(9) << "async upload start, file = " << name; PutObjectAsyncCallBack cb = - [&, buffer, - syncTask](const std::shared_ptr &context) { + [&, buffer, syncTask, name] + (const std::shared_ptr &context) { if (context->retCode == 0) { if (metric_.get() != nullptr) { metric_->writeS3.bps.count << context->bufferSize;