Skip to content

Commit

Permalink
make callback of s3async request async to avoid deadlock, issue #1854
Browse files Browse the repository at this point in the history
Signed-off-by: h0hmj <[email protected]>
  • Loading branch information
h0hmj committed Sep 7, 2022
1 parent 5c178fa commit 76bb43d
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 13 deletions.
2 changes: 1 addition & 1 deletion conf/s3.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ s3.request_timeout=10000
# Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6
s3.loglevel=4
s3.logPrefix=/data/log/curve/aws_
s3.async_thread_num=64
s3.async_thread_num=1
# throttle
s3.throttle.iopsTotalLimit=5000
s3.throttle.iopsReadLimit=5000
Expand Down
2 changes: 1 addition & 1 deletion curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ s3.request_timeout=10000
# Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6
s3.loglevel=4
s3.logPrefix=/data/logs/curvefs/aws_ # __CURVEADM_TEMPLATE__ /curvefs/client/logs/aws_ __CURVEADM_TEMPLATE__
s3.async_thread_num=30
s3.async_thread_num=1
# limit all inflight async requests' bytes, |0| means not limited
s3.max_async_request_inflight_bytes=104857600
s3.chunkFlushThreads=5
Expand Down
2 changes: 1 addition & 1 deletion curvefs/conf/mds.conf
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ s3.request_timeout=10000
# Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6
s3.loglevel=4
s3.logPrefix=/data/logs/curvefs/aws_ # __CURVEADM_TEMPLATE__ /curvefs/client/logs/aws_ __CURVEADM_TEMPLATE__
s3.async_thread_num=30
s3.async_thread_num=1
# limit all inflight async requests' bytes, |0| means not limited
s3.max_async_request_inflight_bytes=104857600
# throttle
Expand Down
2 changes: 1 addition & 1 deletion curvefs/conf/metaserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ s3.request_timeout=10000
# Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6
s3.loglevel=4
s3.logPrefix=/tmp/curvefs/metaserver/aws_
s3.async_thread_num=10
s3.async_thread_num=1
# throttle
s3.throttle.iopsTotalLimit=0
s3.throttle.iopsReadLimit=0
Expand Down
1 change: 1 addition & 0 deletions src/common/concurrent/task_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class TaskThreadPool : public Uncopyable {
}

if (!running_.exchange(true, std::memory_order_acq_rel)) {
threads_.clear();
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i) {
threads_.emplace_back(new std::thread(
Expand Down
21 changes: 12 additions & 9 deletions src/common/s3_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,12 @@ void S3Adapter::Init(const S3AdapterOption& option) {
option.maxAsyncRequestInflightBytes == 0
? UINT64_MAX
: option.maxAsyncRequestInflightBytes));
asyncCallbackTP_.Start(option.asyncThreadNum);
}

void S3Adapter::Deinit() {
// wait for all previous async requests finished
asyncCallbackTP_.Stop();
// delete s3client in s3adapter
if (clientCfg_ != nullptr) {
Aws::Delete<Aws::S3Crt::ClientConfiguration>(clientCfg_);
Expand Down Expand Up @@ -340,16 +343,16 @@ void S3Adapter::PutObjectAsync(std::shared_ptr<PutObjectAsyncContext> context) {
AWS_ALLOCATE_TAG, context->buffer, context->bufferSize));

auto originCallback = context->cb;
auto wrapperCallback =
auto throttledCallback =
[this,
originCallback](const std::shared_ptr<PutObjectAsyncContext>& ctx) {
originCallback](const std::shared_ptr<PutObjectAsyncContext> ctx) {
inflightBytesThrottle_->OnComplete(ctx->bufferSize);
ctx->cb = originCallback;
ctx->cb(ctx);
};

Aws::S3Crt::PutObjectResponseReceivedHandler handler =
[context](
[this, context](
const Aws::S3Crt::S3CrtClient * /*client*/,
const Aws::S3Crt::Model::PutObjectRequest & /*request*/,
const Aws::S3Crt::Model::PutObjectOutcome &response,
Expand All @@ -367,15 +370,15 @@ void S3Adapter::PutObjectAsync(std::shared_ptr<PutObjectAsyncContext> context) {
<< "resend: " << ctx->key;

ctx->retCode = (response.IsSuccess() ? 0 : -1);
ctx->cb(ctx);
asyncCallbackTP_.Enqueue(ctx->cb, ctx);
};

if (throttle_) {
throttle_->Add(false, context->bufferSize);
}

inflightBytesThrottle_->OnStart(context->bufferSize);
context->cb = std::move(wrapperCallback);
context->cb = std::move(throttledCallback);
s3Client_->PutObjectAsync(request, handler, context);
}

Expand Down Expand Up @@ -440,10 +443,10 @@ void S3Adapter::GetObjectAsync(std::shared_ptr<GetObjectAsyncContext> context) {
});

auto originCallback = context->cb;
auto wrapperCallback =
auto throttledCallback =
[this, originCallback](
const S3Adapter* /*adapter*/,
const std::shared_ptr<GetObjectAsyncContext>& ctx) {
const std::shared_ptr<GetObjectAsyncContext> ctx) {
inflightBytesThrottle_->OnComplete(ctx->len);
ctx->cb = originCallback;
ctx->cb(this, ctx);
Expand All @@ -466,15 +469,15 @@ void S3Adapter::GetObjectAsync(std::shared_ptr<GetObjectAsyncContext> context) {
<< response.GetError().GetMessage();

ctx->retCode = (response.IsSuccess() ? 0 : -1);
ctx->cb(this, ctx);
asyncCallbackTP_.Enqueue(ctx->cb, this, ctx);
};

if (throttle_) {
throttle_->Add(true, context->len);
}

inflightBytesThrottle_->OnStart(context->len);
context->cb = std::move(wrapperCallback);
context->cb = std::move(throttledCallback);
s3Client_->GetObjectAsync(request, handler, context);
}

Expand Down
15 changes: 15 additions & 0 deletions src/common/s3_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

#include "src/common/configuration.h"
#include "src/common/throttle.h"
#include "src/common/concurrent/task_thread_pool.h"

namespace curve {
namespace common {
Expand Down Expand Up @@ -332,6 +333,19 @@ class S3Adapter {
std::condition_variable cond_;
};

class AsyncCallbackThreadPool : public TaskThreadPool<> {
protected:
void ThreadFunc() override {
while (running_.load(std::memory_order_acquire) ||
!queue_.empty()) {
Task task(Take());
if (task) {
task();
}
}
}
};

private:
// S3服务器地址
Aws::String s3Address_;
Expand All @@ -348,6 +362,7 @@ class S3Adapter {
Throttle *throttle_;

std::unique_ptr<AsyncRequestInflightBytesThrottle> inflightBytesThrottle_;
AsyncCallbackThreadPool asyncCallbackTP_;
};

class FakeS3Adapter : public S3Adapter {
Expand Down

0 comments on commit 76bb43d

Please sign in to comment.