optimize curve-client #88

Merged 1 commit on Sep 24, 2020
4 changes: 2 additions & 2 deletions conf/client.conf
@@ -52,7 +52,7 @@ schedule.queueCapacity=1000000
# From dequeuing a request to finishing the RPC send takes roughly 20us-100us; 20us is the normal case where the leader does not need to be fetched
# If the leader has to be fetched during the send, it takes around 100us; a single thread can sustain a throughput of roughly 100k-500k
# This performance already meets the requirement
schedule.threadpoolSize=1
schedule.threadpoolSize=2

# Task queue introduced to isolate the qemu-side thread, since there is only one IO thread on the qemu side
# When the qemu side calls the aio interface, the call is simply pushed onto the task queue and returns immediately,
@@ -112,7 +112,7 @@ chunkserver.maxRetryTimesBeforeConsiderSuspend=20
################# File-level configuration options #############
#
# Maximum number of outstanding RPCs allowed by libcurve's underlying RPC scheduling; each file's in-flight RPCs are counted independently
global.fileMaxInFlightRPCNum=64
global.fileMaxInFlightRPCNum=128

# Maximum split size, in KB, when file IO is dispatched to the underlying chunkservers
global.fileIOSplitMaxSizeKB=64
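
Note (not part of the patch): the raised global.fileMaxInFlightRPCNum together with global.fileIOSplitMaxSizeKB bounds how much data a single file can have in flight, since each in-flight RPC carries at most one split. A quick back-of-the-envelope check in C++, with hypothetical constant names:

    #include <cstdint>

    // Mirrors global.fileMaxInFlightRPCNum and global.fileIOSplitMaxSizeKB above.
    constexpr uint32_t kFileMaxInFlightRPCNum = 128;
    constexpr uint32_t kFileIOSplitMaxSizeKB = 64;

    // 128 RPCs * 64 KB per split = 8192 KB, i.e. at most an 8 MB window per file.
    constexpr uint32_t kMaxInFlightKBPerFile =
        kFileMaxInFlightRPCNum * kFileIOSplitMaxSizeKB;
    static_assert(kMaxInFlightKBPerFile == 8192, "8 MB per-file in-flight window");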
5 changes: 3 additions & 2 deletions curve-ansible/roles/generate_config/defaults/main.yml
@@ -169,7 +169,7 @@ client_metacache_get_leader_timeout_ms: 500
client_metacache_get_leader_retry: 5
client_metacache_rpc_retry_interval_us: 100000
client_schedule_queue_capacity: 1000000
client_schedule_threadpool_size: 1
client_schedule_threadpool_size: 2
client_isolation_task_queue_capacity: 1000000
client_isolation_task_thread_pool_size: 1
client_chunkserver_op_retry_interval_us: 100000
@@ -183,7 +183,7 @@ client_chunkserver_check_health_timeout_ms: 100
client_chunkserver_server_stable_threshold: 3
client_chunkserver_min_retry_times_force_timeout_backoff: 5
client_chunkserver_max_retry_times_before_consider_suspend: 20
client_file_max_inflight_rpc_num: 64
client_file_max_inflight_rpc_num: 128
client_file_io_split_max_size_kb: 64
client_log_level: 0
client_log_path: /data/log/curve/
@@ -198,6 +198,7 @@ nebd_client_rpc_retry_max_inverval_us: 64000000
nebd_client_rpc_hostdown_retry_inverval_us: 10000
nebd_client_health_check_internal_s: 1
nebd_client_delay_health_check_internal_ms: 100
nebd_client_rpc_send_exec_queue_num: 2
nebd_client_heartbeat_inverval_s: 5
nebd_client_heartbeat_rpc_timeout_ms: 500
nebd_server_heartbeat_timeout_s: 30
@@ -16,6 +16,8 @@ request.rpcHostDownRetryIntervalUs={{ nebd_client_rpc_hostdown_retry_inverval_us
request.rpcHealthCheckIntervalS={{ nebd_client_health_check_internal_s }}
# Maximum interval, in ms, between an RPC failure and brpc performing a health check
request.rpcMaxDelayHealthCheckIntervalMs={{ nebd_client_delay_health_check_internal_ms }}
# Number of RPC send execution queues
request.rpcSendExecQueueNum={{ nebd_client_rpc_send_exec_queue_num }}

# heartbeat interval
heartbeat.intervalS={{ nebd_client_heartbeat_inverval_s }}
9 changes: 7 additions & 2 deletions curvesnapshot_python/libcurveSnapshot.cpp
@@ -30,11 +30,16 @@
#include "src/client/client_config.h"
#include "include/client/libcurve.h"
#include "src/client/client_common.h"
#include "src/common/concurrent/concurrent.h"

using curve::client::UserInfo;
using curve::client::ClientConfig;
using curve::client::SnapshotClient;
using curve::client::SnapCloneClosure;
using curve::client::FileServiceOption;
using curve::client::ClientConfigOption;
using curve::common::Mutex;
using curve::common::ConditionVariable;

class TaskTracker {
public:
@@ -128,8 +133,8 @@ int Init(const char* path) {
return -LIBCURVE_ERROR::FAILED;
}

FileServiceOption_t fileopt = cc.GetFileServiceOption();
ClientConfigOption_t copt;
FileServiceOption fileopt = cc.GetFileServiceOption();
ClientConfigOption copt;
copt.loginfo = fileopt.loginfo;
copt.ioOpt = fileopt.ioOpt;
copt.metaServerOpt = fileopt.metaServerOpt;
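
The using-declarations for curve::common::Mutex and curve::common::ConditionVariable added above are the usual primitives for waiting on outstanding asynchronous work. The TaskTracker body itself is collapsed in this diff, so the following is only a generic sketch of the pattern with a hypothetical name, not the actual class:

    #include <condition_variable>
    #include <mutex>

    // Hypothetical helper: counts in-flight async tasks and blocks until all finish.
    class PendingCounter {
     public:
        void OnStart() {
            std::unique_lock<std::mutex> lk(mtx_);
            ++pending_;
        }
        void OnDone() {
            std::unique_lock<std::mutex> lk(mtx_);
            if (--pending_ == 0) {
                cv_.notify_all();
            }
        }
        void Wait() {
            std::unique_lock<std::mutex> lk(mtx_);
            cv_.wait(lk, [this] { return pending_ == 0; });
        }

     private:
        std::mutex mtx_;
        std::condition_variable cv_;
        int pending_ = 0;
    };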
2 changes: 2 additions & 0 deletions nebd/etc/nebd/nebd-client.conf
@@ -16,6 +16,8 @@ request.rpcHostDownRetryIntervalUs=10000
request.rpcHealthCheckIntervalS=1
# Maximum interval, in ms, between an RPC failure and brpc performing a health check
request.rpcMaxDelayHealthCheckIntervalMs=100
# Number of RPC send execution queues
request.rpcSendExecQueueNum=2

# heartbeat interval
heartbeat.intervalS=5
1 change: 1 addition & 0 deletions nebd/src/part1/BUILD
@@ -51,6 +51,7 @@ cc_library(
"//external:brpc",
"//nebd/src/common:nebd_common",
"//nebd/proto:client_cc_proto",
"//include:include-common"
],
copts = COPTS,
linkopts = ["-Wl,-rpath=/usr/lib/nebd"],
143 changes: 100 additions & 43 deletions nebd/src/part1/nebd_client.cpp
@@ -102,13 +102,31 @@ int NebdClient::Init(const char* confpath) {

heartbeatMgr_->Run();

// init rpc send exec-queue
rpcTaskQueues_.resize(option_.requestOption.rpcSendExecQueueNum);
for (auto& q : rpcTaskQueues_) {
int rc = bthread::execution_queue_start(
&q, nullptr, &NebdClient::ExecAsyncRpcTask, this);
if (rc != 0) {
LOG(ERROR) << "Init AsyncRpcQueues failed";
return -1;
}
}

return 0;
}

void NebdClient::Uninit() {
if (heartbeatMgr_ != nullptr) {
heartbeatMgr_->Stop();
}

// stop exec queue
for (auto& q : rpcTaskQueues_) {
bthread::execution_queue_stop(q);
bthread::execution_queue_join(q);
}

LOG(INFO) << "NebdClient uninit success.";
google::ShutdownGoogleLogging();
}
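
Init() now starts request.rpcSendExecQueueNum bthread execution queues and Uninit() stops and joins them. A minimal, self-contained sketch of that queue lifecycle (illustrative only, not taken from the patch) looks like this:

    #include <bthread/execution_queue.h>

    #include <cstdio>
    #include <functional>

    using AsyncTask = std::function<void()>;

    // Consumer callback: drains queued closures on a bthread worker.
    static int Consume(void* /*meta*/, bthread::TaskIterator<AsyncTask>& iter) {
        if (iter.is_queue_stopped()) {
            return 0;  // the queue is shutting down
        }
        for (; iter; ++iter) {
            (*iter)();  // run each queued closure in order
        }
        return 0;
    }

    int main() {
        bthread::ExecutionQueueId<AsyncTask> queue;
        if (bthread::execution_queue_start(&queue, nullptr, Consume, nullptr) != 0) {
            return 1;
        }
        bthread::execution_queue_execute(
            queue, AsyncTask([] { printf("task executed on a bthread\n"); }));
        bthread::execution_queue_stop(queue);  // stop accepting new tasks
        bthread::execution_queue_join(queue);  // wait for queued tasks to drain
        return 0;
    }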
@@ -289,67 +307,85 @@ int64_t NebdClient::GetFileSize(int fd) {
}

int NebdClient::Discard(int fd, NebdClientAioContext* aioctx) {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::DiscardRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioDiscardClosure* done = new(std::nothrow) AioDiscardClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Discard(&done->cntl, &request, &done->response, done);
auto task = [this, fd, aioctx]() {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::DiscardRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioDiscardClosure* done = new(std::nothrow) AioDiscardClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Discard(&done->cntl, &request, &done->response, done);
};

PushAsyncTask(task);

return 0;
}
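
The same restructuring is applied to AioRead, AioWrite, and Flush below: building the protobuf request and issuing the brpc stub call are captured in a closure and handed to an execution queue via PushAsyncTask, so the caller (typically qemu's single IO thread, as the client.conf comments above note) returns as soon as the task is enqueued rather than paying the cost of serializing and sending the RPC itself.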

int NebdClient::AioRead(int fd, NebdClientAioContext* aioctx) {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::ReadRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioReadClosure* done = new(std::nothrow) AioReadClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Read(&done->cntl, &request, &done->response, done);
auto task = [this, fd, aioctx]() {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::ReadRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioReadClosure* done = new(std::nothrow) AioReadClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Read(&done->cntl, &request, &done->response, done);
};

PushAsyncTask(task);

return 0;
}

static void EmptyDeleter(void* m) {}

int NebdClient::AioWrite(int fd, NebdClientAioContext* aioctx) {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::WriteRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);
auto task = [this, fd, aioctx]() {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::WriteRequest request;
request.set_fd(fd);
request.set_offset(aioctx->offset);
request.set_size(aioctx->length);

AioWriteClosure* done = new(std::nothrow) AioWriteClosure(
fd, aioctx, option_.requestOption);

AioWriteClosure* done = new(std::nothrow) AioWriteClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
done->cntl.request_attachment().append_user_data(
aioctx->buf, aioctx->length, EmptyDeleter);
stub.Write(&done->cntl, &request, &done->response, done);
};

done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
done->cntl.request_attachment().append_user_data(
aioctx->buf, aioctx->length, EmptyDeleter);
stub.Write(&done->cntl, &request, &done->response, done);
PushAsyncTask(task);

return 0;
}
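
AioWrite attaches the caller's buffer with append_user_data and an EmptyDeleter instead of copying it into the request attachment. A standalone sketch of that zero-copy idea (illustrative, not taken from the patch):

    #include <butil/iobuf.h>

    #include <cstddef>

    // No-op deleter: the IOBuf must not free memory it does not own; the buffer is
    // assumed to stay valid until the RPC closure completes and is then released
    // by its original owner.
    static void NoopDeleter(void*) {}

    void AttachWithoutCopy(butil::IOBuf* attachment, void* buf, size_t len) {
        // Reference the caller-owned buffer directly rather than copying len bytes.
        attachment->append_user_data(buf, len, NoopDeleter);
    }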

int NebdClient::Flush(int fd, NebdClientAioContext* aioctx) {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::FlushRequest request;
request.set_fd(fd);

AioFlushClosure* done = new(std::nothrow) AioFlushClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Flush(&done->cntl, &request, &done->response, done);
auto task = [this, fd, aioctx]() {
nebd::client::NebdFileService_Stub stub(&channel_);
nebd::client::FlushRequest request;
request.set_fd(fd);

AioFlushClosure* done = new(std::nothrow) AioFlushClosure(
fd, aioctx, option_.requestOption);
done->cntl.set_timeout_ms(-1);
done->cntl.set_log_id(logId_.fetch_add(1, std::memory_order_relaxed));
stub.Flush(&done->cntl, &request, &done->response, done);
};

PushAsyncTask(task);

return 0;
}

@@ -473,6 +509,13 @@ int NebdClient::InitNebdClientOption(Configuration* conf) {
LOG_IF(ERROR, ret != true) << "Load request.rpcMaxDelayHealthCheckIntervalMs failed"; // NOLINT
RETURN_IF_FALSE(ret);

ret = conf->GetUInt32Value("request.rpcSendExecQueueNum",
&requestOption.rpcSendExecQueueNum);
LOG_IF(ERROR, ret != true)
<< "Load request.rpcSendExecQueueNum from config file failed, current "
"value is "
<< requestOption.rpcSendExecQueueNum;
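
Note that, unlike the options above, a failure to read request.rpcSendExecQueueNum is only logged and is not fatal (there is no RETURN_IF_FALSE); in that case the default of 2 defined in nebd_common.h below is used.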

option_.requestOption = requestOption;

ret = conf->GetStringValue("log.path", &option_.logOption.logPath);
@@ -564,5 +607,19 @@ void NebdClient::InitLogger(const LogOption& logOption) {
google::InitGoogleLogging(kProcessName);
}

int NebdClient::ExecAsyncRpcTask(void* meta,
bthread::TaskIterator<AsyncRpcTask>& iter) { // NOLINT
if (iter.is_queue_stopped()) {
return 0;
}

for (; iter; ++iter) {
auto& task = *iter;
task();
}

return 0;
}

} // namespace client
} // namespace nebd
22 changes: 22 additions & 0 deletions nebd/src/part1/nebd_client.h
@@ -24,10 +24,12 @@
#define NEBD_SRC_PART1_NEBD_CLIENT_H_

#include <brpc/channel.h>
#include <bthread/execution_queue.h>

#include <functional>
#include <string>
#include <memory>
#include <vector>

#include "nebd/src/part1/nebd_common.h"
#include "nebd/src/common/configuration.h"
@@ -36,6 +38,8 @@
#include "nebd/src/part1/heartbeat_manager.h"
#include "nebd/src/part1/nebd_metacache.h"

#include "include/curve_compiler_specific.h"

namespace nebd {
namespace client {

@@ -171,6 +175,24 @@ class NebdClient {
brpc::Channel channel_;

std::atomic<uint64_t> logId_{1};

private:
using AsyncRpcTask = std::function<void()>;

std::vector<bthread::ExecutionQueueId<AsyncRpcTask>> rpcTaskQueues_;

static int ExecAsyncRpcTask(void* meta, bthread::TaskIterator<AsyncRpcTask>& iter); // NOLINT

void PushAsyncTask(const AsyncRpcTask& task) {
static thread_local unsigned int seed = time(nullptr);

int idx = rand_r(&seed) % rpcTaskQueues_.size();
int rc = bthread::execution_queue_execute(rpcTaskQueues_[idx], task);

if (CURVE_UNLIKELY(rc != 0)) {
task();
}
}
};

extern NebdClient &nebdClient;
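
PushAsyncTask above picks a queue with rand_r and a thread_local seed, which spreads tasks across the rpcSendExecQueueNum queues without any shared state or locking; if execution_queue_execute fails (the CURVE_UNLIKELY branch), the task is executed inline on the calling thread so the IO request is never dropped.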
2 changes: 2 additions & 0 deletions nebd/src/part1/nebd_common.h
@@ -39,6 +39,8 @@ struct RequestOption {
int64_t rpcHealthCheckIntervalS;
// Maximum interval between an RPC failure and brpc performing a health check
int64_t rpcMaxDelayHealthCheckIntervalMs;
// Number of RPC send execution queues
uint32_t rpcSendExecQueueNum = 2;
};

// 日志配置项
2 changes: 1 addition & 1 deletion src/chunkserver/clone_manager.cpp
@@ -42,7 +42,7 @@ int CloneManager::Run() {
return 0;
// Start the thread pool
LOG(INFO) << "Begin to run clone manager.";
tp_ = std::make_shared<TaskThreadPool>();
tp_ = std::make_shared<TaskThreadPool<>>();
int ret = tp_->Start(options_.threadNum, options_.queueCapacity);
if (ret < 0) {
LOG(ERROR) << "clone manager start error."
2 changes: 1 addition & 1 deletion src/chunkserver/clone_manager.h
@@ -105,7 +105,7 @@ class CloneManager {
// Options for clone task management, initialized when Init is called
CloneOptions options_;
// Asynchronous thread pool that handles clone tasks
std::shared_ptr<TaskThreadPool> tp_;
std::shared_ptr<TaskThreadPool<>> tp_;
// Whether the thread pool is currently working
std::atomic<bool> isRunning_;
};
2 changes: 1 addition & 1 deletion src/chunkserver/copyset_node_manager.cpp
@@ -51,7 +51,7 @@ std::once_flag addServiceFlag;
int CopysetNodeManager::Init(const CopysetNodeOptions &copysetNodeOptions) {
copysetNodeOptions_ = copysetNodeOptions;
if (copysetNodeOptions_.loadConcurrency > 0) {
copysetLoader_ = std::make_shared<TaskThreadPool>();
copysetLoader_ = std::make_shared<TaskThreadPool<>>();
} else {
copysetLoader_ = nullptr;
}
2 changes: 1 addition & 1 deletion src/chunkserver/copyset_node_manager.h
@@ -205,7 +205,7 @@ class CopysetNodeManager : public curve::common::Uncopyable {
// Copyset (replication group) configuration options
CopysetNodeOptions copysetNodeOptions_;
// Controls how many copysets are started concurrently
std::shared_ptr<TaskThreadPool> copysetLoader_;
std::shared_ptr<TaskThreadPool<>> copysetLoader_;
// Whether the copyset node manager is currently running
Atomic<bool> running_;
// Whether the copyset node manager has finished loading
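
The remaining changes replace TaskThreadPool with TaskThreadPool<>, which implies the class is now a template with defaulted parameters. The actual template parameters are not visible in this diff; a plausible shape, assuming the parameters select the synchronization primitives (so TaskThreadPool<> keeps std::mutex and std::condition_variable while other instantiations could substitute bthread primitives), is sketched below:

    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    // Hypothetical sketch only; not the curve implementation.
    template <typename MutexT = std::mutex,
              typename CondVarT = std::condition_variable>
    class TaskThreadPool {
     public:
        using Task = std::function<void()>;

        int Start(int threadNum, int queueCapacity) {
            capacity_ = queueCapacity;
            running_ = true;
            for (int i = 0; i < threadNum; ++i) {
                threads_.emplace_back([this] { Worker(); });
            }
            return 0;
        }

        // Returns false when the pool is stopped or the queue is full.
        bool Enqueue(Task task) {
            {
                std::unique_lock<MutexT> lk(mtx_);
                if (!running_ || static_cast<int>(tasks_.size()) >= capacity_) {
                    return false;
                }
                tasks_.push(std::move(task));
            }
            notEmpty_.notify_one();
            return true;
        }

        void Stop() {
            {
                std::unique_lock<MutexT> lk(mtx_);
                running_ = false;
            }
            notEmpty_.notify_all();
            for (auto& t : threads_) {
                t.join();
            }
        }

     private:
        void Worker() {
            while (true) {
                Task task;
                {
                    std::unique_lock<MutexT> lk(mtx_);
                    notEmpty_.wait(lk, [this] { return !running_ || !tasks_.empty(); });
                    if (!running_ && tasks_.empty()) {
                        return;
                    }
                    task = std::move(tasks_.front());
                    tasks_.pop();
                }
                task();  // run outside the lock
            }
        }

        MutexT mtx_;
        CondVarT notEmpty_;
        std::queue<Task> tasks_;
        std::vector<std::thread> threads_;
        int capacity_ = 0;
        bool running_ = false;
    };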