Skip to content

Commit

Permalink
optimize curve-client
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-hanqing committed Sep 15, 2020
1 parent 764001e commit 0a4f69d
Show file tree
Hide file tree
Showing 87 changed files with 2,388 additions and 2,351 deletions.
4 changes: 2 additions & 2 deletions conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ schedule.queueCapacity=1000000
# 队列取出到发送完rpc请求大概在(20us-100us),20us是正常情况下不需要获取leader的时候
# 如果在发送的时候需要获取leader,时间会在100us左右,一个线程的吞吐在10w-50w
# 性能已经满足需求
schedule.threadpoolSize=1
schedule.threadpoolSize=2

# 为隔离qemu侧线程引入的任务队列,因为qemu一侧只有一个IO线程
# 当qemu一侧调用aio接口的时候直接将调用push到任务队列就返回,
Expand Down Expand Up @@ -112,7 +112,7 @@ chunkserver.maxRetryTimesBeforeConsiderSuspend=20
################# 文件级别配置项 #############
#
# libcurve底层rpc调度允许最大的未返回rpc数量,每个文件的inflight RPC独立
global.fileMaxInFlightRPCNum=64
global.fileMaxInFlightRPCNum=128

# 文件IO下发到底层chunkserver最大的分片KB
global.fileIOSplitMaxSizeKB=64
Expand Down
9 changes: 7 additions & 2 deletions curvesnapshot_python/libcurveSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@
#include "src/client/client_config.h"
#include "include/client/libcurve.h"
#include "src/client/client_common.h"
#include "src/common/concurrent/concurrent.h"

using curve::client::UserInfo;
using curve::client::ClientConfig;
using curve::client::SnapshotClient;
using curve::client::SnapCloneClosure;
using curve::client::FileServiceOption;
using curve::client::ClientConfigOption;
using curve::common::Mutex;
using curve::common::ConditionVariable;

class TaskTracker {
public:
Expand Down Expand Up @@ -128,8 +133,8 @@ int Init(const char* path) {
return -LIBCURVE_ERROR::FAILED;
}

FileServiceOption_t fileopt = cc.GetFileServiceOption();
ClientConfigOption_t copt;
FileServiceOption fileopt = cc.GetFileServiceOption();
ClientConfigOption copt;
copt.loginfo = fileopt.loginfo;
copt.ioOpt = fileopt.ioOpt;
copt.metaServerOpt = fileopt.metaServerOpt;
Expand Down
2 changes: 2 additions & 0 deletions nebd/etc/nebd/nebd-client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ request.rpcHostDownRetryIntervalUs=10000
request.rpcHealthCheckIntervalS=1
# brpc从rpc失败到进行健康检查的最大时间间隔,单位ms
request.rpcMaxDelayHealthCheckIntervalMs=100
# rpc发送执行队列个数
request.rpcSendExecQueueNum=2

# heartbeat间隔
heartbeat.intervalS=5
Expand Down
1 change: 1 addition & 0 deletions nebd/src/part1/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ cc_library(
"//external:brpc",
"//nebd/src/common:nebd_common",
"//nebd/proto:client_cc_proto",
"//include:include-common"
],
copts = COPTS,
linkopts = ["-Wl,-rpath=/usr/lib/nebd"],
Expand Down
143 changes: 100 additions & 43 deletions nebd/src/part1/nebd_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,31 @@ int NebdClient::Init(const char* confpath) {

heartbeatMgr_->Run();

// init rpc send exec-queue
rpcTaskQueues_.resize(option_.requestOption.rpcSendExecQueueNum);
for (auto& q : rpcTaskQueues_) {
int rc = bthread::execution_queue_start(
&q, nullptr, &NebdClient::ExecAsyncRpcTask, this);
if (rc != 0) {
LOG(ERROR) << "Init AsyncRpcQueues failed";
return -1;
}
}

return 0;
}

void NebdClient::Uninit() {
if (heartbeatMgr_ != nullptr) {
heartbeatMgr_->Stop();
}

// stop exec queue
for (auto& q : rpcTaskQueues_) {
bthread::execution_queue_stop(q);
bthread::execution_queue_join(q);
}

LOG(INFO) << "NebdClient uninit success.";
google::ShutdownGoogleLogging();
}
Expand Down Expand Up @@ -289,67 +307,85 @@ int64_t NebdClient::GetFileSize(int fd) {
}

int NebdClient::Discard(int fd, NebdClientAioContext* aioctx) {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::DiscardRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioDiscardClosure* done = new(std::nothrow) AioDiscardClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Discard(&done->cntl, &request, &done->response, done);
auto task = [this, fd, aioctx]() {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::DiscardRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioDiscardClosure* done = new(std::nothrow) AioDiscardClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Discard(&done->cntl, &request, &done->response, done);
};

PushAsyncTask(task);

return 0;
}

int NebdClient::AioRead(int fd, NebdClientAioContext* aioctx) {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::ReadRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioReadClosure* done = new(std::nothrow) AioReadClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Read(&done->cntl, &request, &done->response, done);
auto task = [this, fd, aioctx]() {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::ReadRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioReadClosure* done = new(std::nothrow) AioReadClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Read(&done->cntl, &request, &done->response, done);
};

PushAsyncTask(task);

return 0;
}

static void EmptyDeleter(void* m) {}

int NebdClient::AioWrite(int fd, NebdClientAioContext* aioctx) {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::WriteRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);
auto task = [this, fd, aioctx]() {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::WriteRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioWriteClosure* done = new(std::nothrow) AioWriteClosure(
fd, aioctx, option_.requestOption);

AioWriteClosure* done = new(std::nothrow) AioWriteClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
done->cntl.request_attachment().append_user_data(
aioctx->buf, aioctx->length, EmptyDeleter);
stub.Write(&done->cntl, &request, &done->response, done);
};

done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
done->cntl.request_attachment().append_user_data(
aioctx->buf, aioctx->length, EmptyDeleter);
stub.Write(&done->cntl, &request, &done->response, done);
PushAsyncTask(task);

return 0;
}

int NebdClient::Flush(int fd, NebdClientAioContext* aioctx) {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::FlushRequest request;
request.set_fd(fd);

AioFlushClosure* done = new(std::nothrow) AioFlushClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Flush(&done->cntl, &request, &done->response, done);
auto task = [this, fd, aioctx]() {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::FlushRequest request;
request.set_fd(fd);

AioFlushClosure* done = new(std::nothrow) AioFlushClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Flush(&done->cntl, &request, &done->response, done);
};

PushAsyncTask(task);

return 0;
}

Expand Down Expand Up @@ -473,6 +509,13 @@ int NebdClient::InitNebdClientOption(Configuration* conf) {
LOG_IF(ERROR, ret != true) << "Load request.rpcMaxDelayHealthCheckIntervalMs failed"; // NOLINT
RETURN_IF_FALSE(ret);

ret = conf->GetUInt32Value("request.rpcSendExecQueueNum",
&requestOption.rpcSendExecQueueNum);
LOG_IF(WARNING, ret != true)
<< "Load request.rpcSendExecQueueNum from config file failed, current "
"value is "
<< requestOption.rpcSendExecQueueNum;

option_.requestOption = requestOption;

ret = conf->GetStringValue("log.path", &option_.logOption.logPath);
Expand Down Expand Up @@ -564,5 +607,19 @@ void NebdClient::InitLogger(const LogOption& logOption) {
google::InitGoogleLogging(kProcessName);
}

int NebdClient::ExecAsyncRpcTask(void* meta,
bthread::TaskIterator<AsyncRpcTask>& iter) { // NOLINT
if (iter.is_queue_stopped()) {
return 0;
}

for (; iter; ++iter) {
auto& task = *iter;
task();
}

return 0;
}

} // namespace client
} // namespace nebd
25 changes: 25 additions & 0 deletions nebd/src/part1/nebd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
#define NEBD_SRC_PART1_NEBD_CLIENT_H_

#include <brpc/channel.h>
#include <bthread/execution_queue.h>

#include <functional>
#include <string>
#include <memory>
#include <vector>
#include <random>

#include "nebd/src/part1/nebd_common.h"
#include "nebd/src/common/configuration.h"
Expand All @@ -36,6 +39,8 @@
#include "nebd/src/part1/heartbeat_manager.h"
#include "nebd/src/part1/nebd_metacache.h"

#include "include/curve_compiler_specific.h"

namespace nebd {
namespace client {

Expand Down Expand Up @@ -171,6 +176,26 @@ class NebdClient {
brpc::Channel channel_;

std::atomic<uint64_t> logId_{1};

private:
using AsyncRpcTask = std::function<void()>;

std::vector<bthread::ExecutionQueueId<AsyncRpcTask>> rpcTaskQueues_;

static int ExecAsyncRpcTask(void* meta, bthread::TaskIterator<AsyncRpcTask>& iter); // NOLINT

void PushAsyncTask(const AsyncRpcTask& task) {
static std::random_device rd;
static std::mt19937 gen(rd());
static std::uniform_int_distribution<int> dist(
0, rpcTaskQueues_.size() - 1);

int rc =
bthread::execution_queue_execute(rpcTaskQueues_[dist(gen)], task);
if (CURVE_UNLIKELY(rc != 0)) {
task();
}
}
};

extern NebdClient &nebdClient;
Expand Down
2 changes: 2 additions & 0 deletions nebd/src/part1/nebd_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ struct RequestOption {
int64_t rpcHealthCheckIntervalS;
// brpc从rpc失败到进行健康检查的最大时间间隔
int64_t rpcMaxDelayHealthCheckIntervalMs;
// rpc发送执行队列个数
uint32_t rpcSendExecQueueNum = 2;
};

// 日志配置项
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/clone_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ int CloneManager::Run() {
return 0;
// 启动线程池
LOG(INFO) << "Begin to run clone manager.";
tp_ = std::make_shared<TaskThreadPool>();
tp_ = std::make_shared<TaskThreadPool<>>();
int ret = tp_->Start(options_.threadNum, options_.queueCapacity);
if (ret < 0) {
LOG(ERROR) << "clone manager start error."
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/clone_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class CloneManager {
// 克隆任务管理相关的选项,调Init的时候初始化
CloneOptions options_;
// 处理克隆任务的异步线程池
std::shared_ptr<TaskThreadPool> tp_;
std::shared_ptr<TaskThreadPool<>> tp_;
// 当前线程池是否处于工作状态
std::atomic<bool> isRunning_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ std::once_flag addServiceFlag;
int CopysetNodeManager::Init(const CopysetNodeOptions &copysetNodeOptions) {
copysetNodeOptions_ = copysetNodeOptions;
if (copysetNodeOptions_.loadConcurrency > 0) {
copysetLoader_ = std::make_shared<TaskThreadPool>();
copysetLoader_ = std::make_shared<TaskThreadPool<>>();
} else {
copysetLoader_ = nullptr;
}
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/copyset_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class CopysetNodeManager : public curve::common::Uncopyable {
// 复制组配置选项
CopysetNodeOptions copysetNodeOptions_;
// 控制copyset并发启动的数量
std::shared_ptr<TaskThreadPool> copysetLoader_;
std::shared_ptr<TaskThreadPool<>> copysetLoader_;
// 表示copyset node manager当前是否正在运行
Atomic<bool> running_;
// 表示copyset node manager当前是否已经完成加载
Expand Down
Loading

0 comments on commit 0a4f69d

Please sign in to comment.