Skip to content

Commit

Permalink
【chunkserver】split read thread and write thread in concurrentApplyModel
Browse files Browse the repository at this point in the history
  • Loading branch information
lixiaocuicoding committed Sep 21, 2020
1 parent 93d206a commit c425458
Show file tree
Hide file tree
Showing 27 changed files with 618 additions and 632 deletions.
13 changes: 8 additions & 5 deletions conf/chunkserver.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,14 @@ storeng.sync_write=false

#
# Concurrent apply module
#
# 并发模块的并发度,一般是10
concurrentapply.size=10
# 并发模块线程的队列深度
concurrentapply.queuedepth=1
# 并发模块写线程的并发度,一般是10
wconcurrentapply.size=10
# 并发模块写线程的队列深度
wconcurrentapply.queuedepth=1
# 并发模块读线程的并发度,一般是5
rconcurrentapply.size=5
# 并发模块读线程的队列深度
rconcurrentapply.queuedepth=1

#
# Chunkfile pool
Expand Down
2 changes: 0 additions & 2 deletions coverage/test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#!/usr/bin/env python

def lls(s):
d = {}
start = 0
Expand Down
6 changes: 4 additions & 2 deletions curve-ansible/roles/generate_config/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ chunkserver_s3_config_path: /etc/curve/cs_s3.conf
chunkserver_fs_enable_renameat2: true
chunkserver_metric_onoff: true
chunkserver_storeng_sync_write: false
chunkserver_concurrentapply_size: 10
chunkserver_concurrentapply_queuedepth: 1
chunkserver_wconcurrentapply_size: 10
chunkserver_wconcurrentapply_queuedepth: 1
chunkserver_rconcurrentapply_size: 5
chunkserver_rconcurrentapply_queuedepth: 1
chunkserver_chunkfilepool_enable_get_chunk_from_pool: true
chunkserver_chunkfilepool_chunk_file_pool_dir: ./0/
chunkserver_chunkfilepool_cpmeta_file_size: 4096
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,13 @@ storeng.sync_write={{ chunkserver_storeng_sync_write }}
# Concurrent apply module
#
# 并发模块的并发度,一般是10
concurrentapply.size={{ chunkserver_concurrentapply_size }}
wconcurrentapply.size={{ chunkserver_wconcurrentapply_size }}
# 并发模块线程的队列深度
concurrentapply.queuedepth={{ chunkserver_concurrentapply_queuedepth }}
wconcurrentapply.queuedepth={{ chunkserver_wconcurrentapply_queuedepth }}
# 并发模块读线程的并发度,一般是5
rconcurrentapply.size={{ chunkserver_rconcurrentapply_size }}
# 并发模块读线程的队列深度
rconcurrentapply.queuedepth={{ chunkserver_rconcurrentapply_queuedepth }}

#
# Chunkfile pool
Expand Down
5 changes: 5 additions & 0 deletions deploy/local/chunkserver/conf/chunkserver.conf.0
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ storeng.sync_write=false
#
concurrentapply.size=10
concurrentapply.queuedepth=1
wconcurrentapply.size=10
wconcurrentapply.queuedepth=1
rconcurrentapply.size=5
rconcurrentapply.queuedepth=1


#
# Chunkfile pool
Expand Down
6 changes: 4 additions & 2 deletions deploy/local/chunkserver/conf/chunkserver.conf.1
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ storeng.sync_write=false
#
# Concurrent apply module
#
concurrentapply.size=10
concurrentapply.queuedepth=1
wconcurrentapply.size=10
wconcurrentapply.queuedepth=1
rconcurrentapply.size=5
rconcurrentapply.queuedepth=1

#
# Chunkfile pool
Expand Down
6 changes: 4 additions & 2 deletions deploy/local/chunkserver/conf/chunkserver.conf.2
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ storeng.sync_write=false
#
# Concurrent apply module
#
concurrentapply.size=10
concurrentapply.queuedepth=1
wconcurrentapply.size=10
wconcurrentapply.queuedepth=1
rconcurrentapply.size=5
rconcurrentapply.queuedepth=1

#
# Chunkfile pool
Expand Down
3 changes: 3 additions & 0 deletions src/chunkserver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ cc_library(
"//proto:chunkserver-cc-protos",
"//proto:topology_cc_proto",
"//src/chunkserver/datastore:chunkserver_datastore",
"//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply",
"//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
Expand Down Expand Up @@ -108,6 +109,7 @@ cc_library(
"//proto:chunkserver-cc-protos",
"//proto:topology_cc_proto",
"//src/chunkserver/datastore:chunkserver_datastore",
"//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply",
"//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
Expand Down Expand Up @@ -144,6 +146,7 @@ cc_binary(
"//proto:chunkserver-cc-protos",
"//src/chunkserver:chunkserver-lib",
"//src/chunkserver/datastore:chunkserver_datastore",
"//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply",
"//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
Expand Down
23 changes: 18 additions & 5 deletions src/chunkserver/chunkserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ using ::curve::fs::LocalFileSystem;
using ::curve::fs::LocalFileSystemOption;
using ::curve::fs::LocalFsFactory;
using ::curve::fs::FileSystemType;
using ::curve::chunkserver::concurrent::ConcurrentApplyModule;

DEFINE_string(conf, "ChunkServer.conf", "Path of configuration file");
DEFINE_string(chunkServerIp, "127.0.0.1", "chunkserver ip");
Expand Down Expand Up @@ -104,11 +105,9 @@ int ChunkServer::Run(int argc, char** argv) {

// 初始化并发持久模块
ConcurrentApplyModule concurrentapply;
int size;
LOG_IF(FATAL, !conf.GetIntValue("concurrentapply.size", &size));
int qdepth;
LOG_IF(FATAL, !conf.GetIntValue("concurrentapply.queuedepth", &qdepth));
LOG_IF(FATAL, false == concurrentapply.Init(size, qdepth))
ConcurrentApplyOption concurrentApplyOptions;
InitConcurrentApplyOptions(&conf, &concurrentApplyOptions);
LOG_IF(FATAL, false == concurrentapply.Init(concurrentApplyOptions))
<< "Failed to initialize concurrentapply module!";

// 初始化本地文件系统
Expand Down Expand Up @@ -380,6 +379,8 @@ void ChunkServer::Stop() {
brpc::AskToQuit();
}



void ChunkServer::InitChunkFilePoolOptions(
common::Configuration *conf, ChunkfilePoolOptions *chunkFilePoolOptions) {
LOG_IF(FATAL, !conf->GetUInt32Value("global.chunk_size",
Expand Down Expand Up @@ -408,6 +409,18 @@ void ChunkServer::InitChunkFilePoolOptions(
}
}

void ChunkServer::InitConcurrentApplyOptions(common::Configuration *conf,
ConcurrentApplyOption *concurrentApplyOptions) {
LOG_IF(FATAL, !conf->GetIntValue(
"rconcurrentapply.size", &concurrentApplyOptions->rconcurrentsize));
LOG_IF(FATAL, !conf->GetIntValue(
"wconcurrentapply.size", &concurrentApplyOptions->wconcurrentsize));
LOG_IF(FATAL, !conf->GetIntValue(
"rconcurrentapply.queuedepth", &concurrentApplyOptions->rqueuedepth));
LOG_IF(FATAL, !conf->GetIntValue(
"wconcurrentapply.queuedepth", &concurrentApplyOptions->wqueuedepth));
}

void ChunkServer::InitCopysetNodeOptions(
common::Configuration *conf, CopysetNodeOptions *copysetNodeOptions) {
LOG_IF(FATAL, !conf->GetStringValue("global.ip", &copysetNodeOptions->ip));
Expand Down
6 changes: 6 additions & 0 deletions src/chunkserver/chunkserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#include "src/chunkserver/register.h"
#include "src/chunkserver/trash.h"
#include "src/chunkserver/chunkserver_metrics.h"
#include "src/chunkserver/concurrent_apply/concurrent_apply.h"

using ::curve::chunkserver::concurrent::ConcurrentApplyOption;

namespace curve {
namespace chunkserver {
Expand All @@ -56,6 +59,9 @@ class ChunkServer {
void InitChunkFilePoolOptions(common::Configuration *conf,
ChunkfilePoolOptions *chunkFilePoolOptions);

void InitConcurrentApplyOptions(common::Configuration *conf,
ConcurrentApplyOption *concurrentApplyOption);

void InitCopysetNodeOptions(common::Configuration *conf,
CopysetNodeOptions *copysetNodeOptions);

Expand Down
160 changes: 0 additions & 160 deletions src/chunkserver/concurrent_apply.cpp

This file was deleted.

Loading

0 comments on commit c425458

Please sign in to comment.