From 76bb43d5078925c2052ed48ccdf5726614151928 Mon Sep 17 00:00:00 2001 From: h0hmj Date: Tue, 30 Aug 2022 14:34:14 +0800 Subject: [PATCH] make callback of s3async request async to avoid deadlock, issue #1854 Signed-off-by: h0hmj --- conf/s3.conf | 2 +- curvefs/conf/client.conf | 2 +- curvefs/conf/mds.conf | 2 +- curvefs/conf/metaserver.conf | 2 +- src/common/concurrent/task_thread_pool.h | 1 + src/common/s3_adapter.cpp | 21 ++++++++++++--------- src/common/s3_adapter.h | 15 +++++++++++++++ 7 files changed, 32 insertions(+), 13 deletions(-) diff --git a/conf/s3.conf b/conf/s3.conf index e44d543254..214cab6369 100644 --- a/conf/s3.conf +++ b/conf/s3.conf @@ -19,7 +19,7 @@ s3.request_timeout=10000 # Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6 s3.loglevel=4 s3.logPrefix=/data/log/curve/aws_ -s3.async_thread_num=64 +s3.async_thread_num=1 # throttle s3.throttle.iopsTotalLimit=5000 s3.throttle.iopsReadLimit=5000 diff --git a/curvefs/conf/client.conf b/curvefs/conf/client.conf index 5e7a2ab471..ff83857c47 100644 --- a/curvefs/conf/client.conf +++ b/curvefs/conf/client.conf @@ -153,7 +153,7 @@ s3.request_timeout=10000 # Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6 s3.loglevel=4 s3.logPrefix=/data/logs/curvefs/aws_ # __CURVEADM_TEMPLATE__ /curvefs/client/logs/aws_ __CURVEADM_TEMPLATE__ -s3.async_thread_num=30 +s3.async_thread_num=1 # limit all inflight async requests' bytes, |0| means not limited s3.max_async_request_inflight_bytes=104857600 s3.chunkFlushThreads=5 diff --git a/curvefs/conf/mds.conf b/curvefs/conf/mds.conf index 245c518313..08ccc98d5d 100644 --- a/curvefs/conf/mds.conf +++ b/curvefs/conf/mds.conf @@ -122,7 +122,7 @@ s3.request_timeout=10000 # Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6 s3.loglevel=4 s3.logPrefix=/data/logs/curvefs/aws_ # __CURVEADM_TEMPLATE__ /curvefs/client/logs/aws_ __CURVEADM_TEMPLATE__ -s3.async_thread_num=30 +s3.async_thread_num=1 # limit all inflight async requests' bytes, |0| means not limited s3.max_async_request_inflight_bytes=104857600 # throttle diff --git a/curvefs/conf/metaserver.conf b/curvefs/conf/metaserver.conf index 9f03254621..6a0e5b1dae 100644 --- a/curvefs/conf/metaserver.conf +++ b/curvefs/conf/metaserver.conf @@ -18,7 +18,7 @@ s3.request_timeout=10000 # Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6 s3.loglevel=4 s3.logPrefix=/tmp/curvefs/metaserver/aws_ -s3.async_thread_num=10 +s3.async_thread_num=1 # throttle s3.throttle.iopsTotalLimit=0 s3.throttle.iopsReadLimit=0 diff --git a/src/common/concurrent/task_thread_pool.h b/src/common/concurrent/task_thread_pool.h index faace7d3c1..176cf5ef99 100644 --- a/src/common/concurrent/task_thread_pool.h +++ b/src/common/concurrent/task_thread_pool.h @@ -73,6 +73,7 @@ class TaskThreadPool : public Uncopyable { } if (!running_.exchange(true, std::memory_order_acq_rel)) { + threads_.clear(); threads_.reserve(numThreads); for (int i = 0; i < numThreads; ++i) { threads_.emplace_back(new std::thread( diff --git a/src/common/s3_adapter.cpp b/src/common/s3_adapter.cpp index 64cff7b418..e0c9a2509a 100644 --- a/src/common/s3_adapter.cpp +++ b/src/common/s3_adapter.cpp @@ -180,9 +180,12 @@ void S3Adapter::Init(const S3AdapterOption& option) { option.maxAsyncRequestInflightBytes == 0 ? UINT64_MAX : option.maxAsyncRequestInflightBytes)); + asyncCallbackTP_.Start(option.asyncThreadNum); } void S3Adapter::Deinit() { + // wait for all previous async requests finished + asyncCallbackTP_.Stop(); // delete s3client in s3adapter if (clientCfg_ != nullptr) { Aws::Delete(clientCfg_); @@ -340,16 +343,16 @@ void S3Adapter::PutObjectAsync(std::shared_ptr context) { AWS_ALLOCATE_TAG, context->buffer, context->bufferSize)); auto originCallback = context->cb; - auto wrapperCallback = + auto throttledCallback = [this, - originCallback](const std::shared_ptr& ctx) { + originCallback](const std::shared_ptr ctx) { inflightBytesThrottle_->OnComplete(ctx->bufferSize); ctx->cb = originCallback; ctx->cb(ctx); }; Aws::S3Crt::PutObjectResponseReceivedHandler handler = - [context]( + [this, context]( const Aws::S3Crt::S3CrtClient * /*client*/, const Aws::S3Crt::Model::PutObjectRequest & /*request*/, const Aws::S3Crt::Model::PutObjectOutcome &response, @@ -367,7 +370,7 @@ void S3Adapter::PutObjectAsync(std::shared_ptr context) { << "resend: " << ctx->key; ctx->retCode = (response.IsSuccess() ? 0 : -1); - ctx->cb(ctx); + asyncCallbackTP_.Enqueue(ctx->cb, ctx); }; if (throttle_) { @@ -375,7 +378,7 @@ void S3Adapter::PutObjectAsync(std::shared_ptr context) { } inflightBytesThrottle_->OnStart(context->bufferSize); - context->cb = std::move(wrapperCallback); + context->cb = std::move(throttledCallback); s3Client_->PutObjectAsync(request, handler, context); } @@ -440,10 +443,10 @@ void S3Adapter::GetObjectAsync(std::shared_ptr context) { }); auto originCallback = context->cb; - auto wrapperCallback = + auto throttledCallback = [this, originCallback]( const S3Adapter* /*adapter*/, - const std::shared_ptr& ctx) { + const std::shared_ptr ctx) { inflightBytesThrottle_->OnComplete(ctx->len); ctx->cb = originCallback; ctx->cb(this, ctx); @@ -466,7 +469,7 @@ void S3Adapter::GetObjectAsync(std::shared_ptr context) { << response.GetError().GetMessage(); ctx->retCode = (response.IsSuccess() ? 0 : -1); - ctx->cb(this, ctx); + asyncCallbackTP_.Enqueue(ctx->cb, this, ctx); }; if (throttle_) { @@ -474,7 +477,7 @@ void S3Adapter::GetObjectAsync(std::shared_ptr context) { } inflightBytesThrottle_->OnStart(context->len); - context->cb = std::move(wrapperCallback); + context->cb = std::move(throttledCallback); s3Client_->GetObjectAsync(request, handler, context); } diff --git a/src/common/s3_adapter.h b/src/common/s3_adapter.h index c30eff37d4..6532591179 100644 --- a/src/common/s3_adapter.h +++ b/src/common/s3_adapter.h @@ -59,6 +59,7 @@ #include "src/common/configuration.h" #include "src/common/throttle.h" +#include "src/common/concurrent/task_thread_pool.h" namespace curve { namespace common { @@ -332,6 +333,19 @@ class S3Adapter { std::condition_variable cond_; }; + class AsyncCallbackThreadPool : public TaskThreadPool<> { + protected: + void ThreadFunc() override { + while (running_.load(std::memory_order_acquire) || + !queue_.empty()) { + Task task(Take()); + if (task) { + task(); + } + } + } + }; + private: // S3服务器地址 Aws::String s3Address_; @@ -348,6 +362,7 @@ class S3Adapter { Throttle *throttle_; std::unique_ptr inflightBytesThrottle_; + AsyncCallbackThreadPool asyncCallbackTP_; }; class FakeS3Adapter : public S3Adapter {