diff --git a/conf/chunkserver.conf.example b/conf/chunkserver.conf.example index a6a1595d1f..b8a339e877 100644 --- a/conf/chunkserver.conf.example +++ b/conf/chunkserver.conf.example @@ -141,11 +141,14 @@ storeng.sync_write=false # # Concurrent apply module -# -# 并发模块的并发度,一般是10 -concurrentapply.size=10 -# 并发模块线程的队列深度 -concurrentapply.queuedepth=1 +# 并发模块写线程的并发度,一般是10 +wconcurrentapply.size=10 +# 并发模块写线程的队列深度 +wconcurrentapply.queuedepth=1 +# 并发模块读线程的并发度,一般是5 +rconcurrentapply.size=5 +# 并发模块读线程的队列深度 +rconcurrentapply.queuedepth=1 # # Chunkfile pool diff --git a/coverage/test.py b/coverage/test.py deleted file mode 100755 index 7c457e795f..0000000000 --- a/coverage/test.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python - -def lls(s): - d = {} - start = 0 - ans = 0 - for i,c in enumerate(s): - print "i,c %s %s" %(i,c) - if c in d: - print "c=%s" %c - start = max(start, d[c] + 1) - print "start=%s" %start - d[c] = i - print "d = %s" %d - ans = max(ans, i - start + 1) - print "ans=%s" %ans - print d - print ans - -lls("afal") diff --git a/coverage/tmp.py b/coverage/tmp.py deleted file mode 100755 index 78bfda2a3b..0000000000 --- a/coverage/tmp.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -a = ['src/chunkserver/datastore/chunkfile_pool.cpp', 'src/fs/ext4_filesystem_impl.cpp', 'src/fs/wrap_posix.cpp'] - -a = [c.split('/', 1)[1] for c in a] -print a - - diff --git a/curve-ansible/roles/generate_config/defaults/main.yml b/curve-ansible/roles/generate_config/defaults/main.yml index 01f4145b15..5825145750 100644 --- a/curve-ansible/roles/generate_config/defaults/main.yml +++ b/curve-ansible/roles/generate_config/defaults/main.yml @@ -114,8 +114,10 @@ chunkserver_s3_config_path: /etc/curve/cs_s3.conf chunkserver_fs_enable_renameat2: true chunkserver_metric_onoff: true chunkserver_storeng_sync_write: false -chunkserver_concurrentapply_size: 10 -chunkserver_concurrentapply_queuedepth: 1 +chunkserver_wconcurrentapply_size: 10 +chunkserver_wconcurrentapply_queuedepth: 1 +chunkserver_rconcurrentapply_size: 5 +chunkserver_rconcurrentapply_queuedepth: 1 chunkserver_chunkfilepool_enable_get_chunk_from_pool: true chunkserver_chunkfilepool_chunk_file_pool_dir: ./0/ chunkserver_chunkfilepool_cpmeta_file_size: 4096 diff --git a/curve-ansible/roles/generate_config/templates/chunkserver.conf.j2 b/curve-ansible/roles/generate_config/templates/chunkserver.conf.j2 index 45cd9df750..616fa09f53 100644 --- a/curve-ansible/roles/generate_config/templates/chunkserver.conf.j2 +++ b/curve-ansible/roles/generate_config/templates/chunkserver.conf.j2 @@ -149,9 +149,13 @@ storeng.sync_write={{ chunkserver_storeng_sync_write }} # Concurrent apply module # # 并发模块的并发度,一般是10 -concurrentapply.size={{ chunkserver_concurrentapply_size }} +wconcurrentapply.size={{ chunkserver_wconcurrentapply_size }} # 并发模块线程的队列深度 -concurrentapply.queuedepth={{ chunkserver_concurrentapply_queuedepth }} +wconcurrentapply.queuedepth={{ chunkserver_wconcurrentapply_queuedepth }} +# 并发模块读线程的并发度,一般是5 +rconcurrentapply.size={{ chunkserver_rconcurrentapply_size }} +# 并发模块读线程的队列深度 +rconcurrentapply.queuedepth={{ chunkserver_rconcurrentapply_queuedepth }} # # Chunkfile pool diff --git a/deploy/local/chunkserver/conf/chunkserver.conf.0 b/deploy/local/chunkserver/conf/chunkserver.conf.0 index 771cc0a415..2c7789ab74 100644 --- a/deploy/local/chunkserver/conf/chunkserver.conf.0 +++ b/deploy/local/chunkserver/conf/chunkserver.conf.0 @@ -113,6 +113,11 @@ storeng.sync_write=false # concurrentapply.size=10 concurrentapply.queuedepth=1 +wconcurrentapply.size=10 +wconcurrentapply.queuedepth=1 +rconcurrentapply.size=5 +rconcurrentapply.queuedepth=1 + # # Chunkfile pool diff --git a/deploy/local/chunkserver/conf/chunkserver.conf.1 b/deploy/local/chunkserver/conf/chunkserver.conf.1 index 6150465c51..a59a953838 100644 --- a/deploy/local/chunkserver/conf/chunkserver.conf.1 +++ b/deploy/local/chunkserver/conf/chunkserver.conf.1 @@ -111,8 +111,10 @@ storeng.sync_write=false # # Concurrent apply module # -concurrentapply.size=10 -concurrentapply.queuedepth=1 +wconcurrentapply.size=10 +wconcurrentapply.queuedepth=1 +rconcurrentapply.size=5 +rconcurrentapply.queuedepth=1 # # Chunkfile pool diff --git a/deploy/local/chunkserver/conf/chunkserver.conf.2 b/deploy/local/chunkserver/conf/chunkserver.conf.2 index 0b4ab047b0..7fdc60f244 100644 --- a/deploy/local/chunkserver/conf/chunkserver.conf.2 +++ b/deploy/local/chunkserver/conf/chunkserver.conf.2 @@ -111,8 +111,10 @@ storeng.sync_write=false # # Concurrent apply module # -concurrentapply.size=10 -concurrentapply.queuedepth=1 +wconcurrentapply.size=10 +wconcurrentapply.queuedepth=1 +rconcurrentapply.size=5 +rconcurrentapply.queuedepth=1 # # Chunkfile pool diff --git a/src/chunkserver/BUILD b/src/chunkserver/BUILD index fdcd6408e5..8d7a2824a2 100644 --- a/src/chunkserver/BUILD +++ b/src/chunkserver/BUILD @@ -65,6 +65,7 @@ cc_library( "//proto:chunkserver-cc-protos", "//proto:topology_cc_proto", "//src/chunkserver/datastore:chunkserver_datastore", + "//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply", "//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot", "//src/common:curve_common", "//src/common:curve_s3_adapter", @@ -108,6 +109,7 @@ cc_library( "//proto:chunkserver-cc-protos", "//proto:topology_cc_proto", "//src/chunkserver/datastore:chunkserver_datastore", + "//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply", "//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot", "//src/common:curve_common", "//src/common:curve_s3_adapter", @@ -144,6 +146,7 @@ cc_binary( "//proto:chunkserver-cc-protos", "//src/chunkserver:chunkserver-lib", "//src/chunkserver/datastore:chunkserver_datastore", + "//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply", "//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot", "//src/common:curve_common", "//src/common:curve_s3_adapter", diff --git a/src/chunkserver/chunkserver.cpp b/src/chunkserver/chunkserver.cpp index 03b47b5776..535664888b 100644 --- a/src/chunkserver/chunkserver.cpp +++ b/src/chunkserver/chunkserver.cpp @@ -47,6 +47,7 @@ using ::curve::fs::LocalFileSystem; using ::curve::fs::LocalFileSystemOption; using ::curve::fs::LocalFsFactory; using ::curve::fs::FileSystemType; +using ::curve::chunkserver::concurrent::ConcurrentApplyModule; DEFINE_string(conf, "ChunkServer.conf", "Path of configuration file"); DEFINE_string(chunkServerIp, "127.0.0.1", "chunkserver ip"); @@ -104,11 +105,9 @@ int ChunkServer::Run(int argc, char** argv) { // 初始化并发持久模块 ConcurrentApplyModule concurrentapply; - int size; - LOG_IF(FATAL, !conf.GetIntValue("concurrentapply.size", &size)); - int qdepth; - LOG_IF(FATAL, !conf.GetIntValue("concurrentapply.queuedepth", &qdepth)); - LOG_IF(FATAL, false == concurrentapply.Init(size, qdepth)) + ConcurrentApplyOption concurrentApplyOptions; + InitConcurrentApplyOptions(&conf, &concurrentApplyOptions); + LOG_IF(FATAL, false == concurrentapply.Init(concurrentApplyOptions)) << "Failed to initialize concurrentapply module!"; // 初始化本地文件系统 @@ -380,6 +379,8 @@ void ChunkServer::Stop() { brpc::AskToQuit(); } + + void ChunkServer::InitChunkFilePoolOptions( common::Configuration *conf, ChunkfilePoolOptions *chunkFilePoolOptions) { LOG_IF(FATAL, !conf->GetUInt32Value("global.chunk_size", @@ -408,6 +409,18 @@ void ChunkServer::InitChunkFilePoolOptions( } } +void ChunkServer::InitConcurrentApplyOptions(common::Configuration *conf, + ConcurrentApplyOption *concurrentApplyOptions) { + LOG_IF(FATAL, !conf->GetIntValue( + "rconcurrentapply.size", &concurrentApplyOptions->rconcurrentsize)); + LOG_IF(FATAL, !conf->GetIntValue( + "wconcurrentapply.size", &concurrentApplyOptions->wconcurrentsize)); + LOG_IF(FATAL, !conf->GetIntValue( + "rconcurrentapply.queuedepth", &concurrentApplyOptions->rqueuedepth)); + LOG_IF(FATAL, !conf->GetIntValue( + "wconcurrentapply.queuedepth", &concurrentApplyOptions->wqueuedepth)); +} + void ChunkServer::InitCopysetNodeOptions( common::Configuration *conf, CopysetNodeOptions *copysetNodeOptions) { LOG_IF(FATAL, !conf->GetStringValue("global.ip", ©setNodeOptions->ip)); diff --git a/src/chunkserver/chunkserver.h b/src/chunkserver/chunkserver.h index c751e81643..19dcf56ba6 100644 --- a/src/chunkserver/chunkserver.h +++ b/src/chunkserver/chunkserver.h @@ -32,6 +32,9 @@ #include "src/chunkserver/register.h" #include "src/chunkserver/trash.h" #include "src/chunkserver/chunkserver_metrics.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" + +using ::curve::chunkserver::concurrent::ConcurrentApplyOption; namespace curve { namespace chunkserver { @@ -56,6 +59,9 @@ class ChunkServer { void InitChunkFilePoolOptions(common::Configuration *conf, ChunkfilePoolOptions *chunkFilePoolOptions); + void InitConcurrentApplyOptions(common::Configuration *conf, + ConcurrentApplyOption *concurrentApplyOption); + void InitCopysetNodeOptions(common::Configuration *conf, CopysetNodeOptions *copysetNodeOptions); diff --git a/src/chunkserver/concurrent_apply.cpp b/src/chunkserver/concurrent_apply.cpp deleted file mode 100644 index 65ee210154..0000000000 --- a/src/chunkserver/concurrent_apply.cpp +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (c) 2020 NetEase Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: curve - * File Created: Tuesday, 11th December 2018 6:26:14 pm - * Author: tongguangxun - */ - -#include - -#include -#include "src/chunkserver/concurrent_apply.h" - -namespace curve { -namespace chunkserver { - -#define DEFAULT_CONCURRENT_SIZE 10 -#define DEFAULT_QUEUEDEPTH 1 - -ConcurrentApplyModule::ConcurrentApplyModule(): - stop_(0), - isStarted_(false), - concurrentsize_(0), - queuedepth_(0), - cond_(0) { - applypoolMap_.clear(); -} - -ConcurrentApplyModule::~ConcurrentApplyModule() { -} - -bool ConcurrentApplyModule::Init(int concurrentsize, int queuedepth) { - if (isStarted_) { - LOG(WARNING) << "concurrent module already start!"; - return true; - } - - if (concurrentsize <= 0) { - concurrentsize_ = DEFAULT_CONCURRENT_SIZE; - } else { - concurrentsize_ = concurrentsize; - } - - if (queuedepth <= 0) { - queuedepth_ = DEFAULT_QUEUEDEPTH; - } else { - queuedepth_ = queuedepth; - } - - // 等待event事件数,等于线程数 - cond_.Reset(concurrentsize); - - /** - * 因为hash map并不是线程安全的,所以必须先将applyPoolMap_创建好 - * 然后插入所有元素之后,之后才能创建线程,这样对applypoolMap_的 - * read/write就不可能出现并发 - */ - for (int i = 0; i < concurrentsize_; i++) { - auto asyncth = new (std::nothrow) taskthread(queuedepth_); - CHECK(asyncth != nullptr) << "allocate failed!"; - applypoolMap_.insert(std::make_pair(i, asyncth)); - } - - for (int i = 0; i < concurrentsize_; i++) { - applypoolMap_[i]->th = std::move(std::thread(&ConcurrentApplyModule::Run, this, i)); // NOLINT - } - - /** - * 等待所有线程创建完成,默认等待5秒,后台线程还没有全部创建成功, - * 那么可以认为系统或者程序出现了问题,可以判定这次init失败了,直接退出 - */ - if (cond_.WaitFor(5000)) { - isStarted_ = true; - } else { - LOG(ERROR) << "init concurrent module's threads fail"; - isStarted_ = false; - } - - return isStarted_; -} - -void ConcurrentApplyModule::Run(int index) { - cond_.Signal(); - while (!stop_) { - auto t = applypoolMap_[index]->tq.Pop(); - t(); - } -} - -void ConcurrentApplyModule::Stop() { - LOG(INFO) << "stop ConcurrentApplyModule..."; - stop_ = true; - auto wakeup = []() {}; - for (auto iter : applypoolMap_) { - iter.second->tq.Push(wakeup); - iter.second->th.join(); - delete iter.second; - } - applypoolMap_.clear(); - - isStarted_ = false; - LOG(INFO) << "stop ConcurrentApplyModule ok."; -} - -void ConcurrentApplyModule::Flush() { - if (!isStarted_) { - LOG(WARNING) << "concurrent module not start!"; - return; - } - - std::atomic* signal = new (std::nothrow) std::atomic[concurrentsize_]; //NOLINT - std::mutex* mtx = new (std::nothrow) std::mutex[concurrentsize_]; - std::condition_variable* cv= new (std::nothrow) std::condition_variable[concurrentsize_]; //NOLINT - CHECK(signal != nullptr && mtx != nullptr && cv != nullptr) - << "allocate buffer failed!"; - - for (int i = 0; i < concurrentsize_; i++) { - signal[i].store(false); - } - - auto flushtask = [&mtx, &signal, &cv](int i) { - std::unique_lock lk(mtx[i]); - signal[i].store(true); - cv[i].notify_one(); - }; - - auto flushwait = [&mtx, &signal, &cv](int i) { - std::unique_lock lk(mtx[i]); - cv[i].wait(lk, [&]()->bool{return signal[i].load();}); - }; - - for (int i = 0; i < concurrentsize_; i++) { - applypoolMap_[i]->tq.Push(flushtask, i); - } - - for (int i = 0; i < concurrentsize_; i++) { - flushwait(i); - } - - delete[] signal; - delete[] mtx; - delete[] cv; -} - -} // namespace chunkserver -} // namespace curve diff --git a/src/chunkserver/concurrent_apply.h b/src/chunkserver/concurrent_apply.h deleted file mode 100644 index 5d3b3518e6..0000000000 --- a/src/chunkserver/concurrent_apply.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2020 NetEase Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: curve - * File Created: Tuesday, 11th December 2018 6:26:24 pm - * Author: tongguangxun - */ - -#ifndef SRC_CHUNKSERVER_CONCURRENT_APPLY_H_ -#define SRC_CHUNKSERVER_CONCURRENT_APPLY_H_ - -#include -#include -#include -#include // NOLINT -#include // NOLINT -#include -#include -#include // NOLINT - -#include "src/common/concurrent/task_queue.h" -#include "include/curve_compiler_specific.h" -#include "src/common/concurrent/count_down_event.h" - -using curve::common::TaskQueue; -using curve::common::CountDownEvent; -namespace curve { -namespace chunkserver { - -class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule { - public: - ConcurrentApplyModule(); - ~ConcurrentApplyModule(); - /** - * @param: concurrentsize是当前并发模块的并发大小 - * @param: queuedepth是当前并发模块每个队列的深度控制 - */ - bool Init(int concurrentsize, int queuedepth); - - /** - * raft apply线程会将task push到后台队列 - * @param: key用于将task哈希到指定队列 - * @param: f为要执行的task - * @param: args为执行task的参数 - */ - template - bool Push(uint64_t key, F&& f, Args&&... args) { - if (!isStarted_) { - LOG(WARNING) << "concurrent module not start!"; - return false; - } - - auto task = std::bind(std::forward(f), std::forward(args)...); - applypoolMap_[Hash(key)]->tq.Push(task); - return true; - }; // NOLINT - - // raft snapshot之前需要将队列中的IO全部落盘。 - void Flush(); - void Stop(); - - private: - void Run(int index); - inline int Hash(uint64_t key) { - return key % concurrentsize_; - } - - private: - typedef uint8_t threadIndex; - typedef struct taskthread { - std::thread th; - TaskQueue tq; - taskthread(size_t capacity):tq(capacity) {} - ~taskthread() = default; - } taskthread_t; - - // 常规的stop和start控制变量 - bool stop_; - bool isStarted_; - // 每个队列的深度 - int queuedepth_; - // 并发度 - int concurrentsize_; - // 用于统一启动后台线程完全创建完成的条件变量 - CountDownEvent cond_; - // 存储threadindex与taskthread的映射关系 - CURVE_CACHELINE_ALIGNMENT std::unordered_map applypoolMap_; // NOLINT -}; -} // namespace chunkserver -} // namespace curve - -#endif // SRC_CHUNKSERVER_CONCURRENT_APPLY_H_ diff --git a/src/chunkserver/concurrent_apply/BUILD b/src/chunkserver/concurrent_apply/BUILD new file mode 100644 index 0000000000..809fa68bab --- /dev/null +++ b/src/chunkserver/concurrent_apply/BUILD @@ -0,0 +1,33 @@ +# +# Copyright (c) 2020 NetEase Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +cc_library( + name = "chunkserver_concurrent_apply", + srcs = glob([ + "*.cpp", + "*.h" + ]), + linkopts = ([ + "-pthread", + "-std=c++11", + ]), + visibility = ["//visibility:public"], + deps = [ + "//external:glog", + "//src/common:curve_common", + "//proto:chunkserver-cc-protos" + ], +) diff --git a/src/chunkserver/concurrent_apply/concurrent_apply.cpp b/src/chunkserver/concurrent_apply/concurrent_apply.cpp new file mode 100644 index 0000000000..374879b1ab --- /dev/null +++ b/src/chunkserver/concurrent_apply/concurrent_apply.cpp @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2020 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * File Created: 20200813 + * Author: lixiaocui + */ + +#include + +#include +#include +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" +#include "src/common/concurrent/count_down_event.h" + +using ::curve::common::CountDownEvent; + +namespace curve { +namespace chunkserver { +namespace concurrent { +bool ConcurrentApplyModule::Init(const ConcurrentApplyOption &opt) { + if (start_) { + LOG(WARNING) << "concurrent module already start!"; + return true; + } + + if (false == checkOptAndInit(opt)) { + return false; + } + + start_ = true; + cond_.Reset(opt.rconcurrentsize + opt.wconcurrentsize); + InitThreadPool(ThreadPoolType::READ, rconcurrentsize_, rqueuedepth_); + InitThreadPool(ThreadPoolType::WRITE, wconcurrentsize_, wqueuedepth_); + + if (!cond_.WaitFor(5000)) { + LOG(ERROR) << "init concurrent module's threads fail"; + start_ = false; + } + + LOG(INFO) << "Init concurrent module's threads success"; + return start_; +} + +bool ConcurrentApplyModule::checkOptAndInit( + const ConcurrentApplyOption &opt) { + if (opt.rconcurrentsize <= 0 || opt.wconcurrentsize <= 0 || + opt.rqueuedepth <= 0 || opt.wqueuedepth <= 0) { + LOG(INFO) << "init concurrent module fail, params must >=0" + << ", rconcurrentsize=" << opt.rconcurrentsize + << ", wconcurrentsize=" << opt.wconcurrentsize + << ", rqueuedepth=" << opt.rqueuedepth + << ", wconcurrentsize=" << opt.wqueuedepth; + return false; + } + + wconcurrentsize_ = opt.wconcurrentsize; + wqueuedepth_ = opt.wqueuedepth; + rconcurrentsize_ = opt.rconcurrentsize; + rqueuedepth_ = opt.rqueuedepth; + + return true; +} + + +void ConcurrentApplyModule::InitThreadPool( + ThreadPoolType type, int concorrent, int depth) { + for (int i = 0; i < concorrent; i++) { + auto asyncth = new (std::nothrow) taskthread(depth); + CHECK(asyncth != nullptr) << "allocate failed!"; + + switch (type) { + case ThreadPoolType::READ: + rapplyMap_.insert(std::make_pair(i, asyncth)); + break; + + case ThreadPoolType::WRITE: + wapplyMap_.insert(std::make_pair(i, asyncth)); + break; + } + } + + for (int i = 0; i < concorrent; i++) { + switch (type) { + case ThreadPoolType::READ: + rapplyMap_[i]->th = std::move( + std::thread(&ConcurrentApplyModule::Run, this, type, i)); + break; + + case ThreadPoolType::WRITE: + wapplyMap_[i]->th = + std::thread(&ConcurrentApplyModule::Run, this, type, i); + break; + } + } +} + +void ConcurrentApplyModule::Run(ThreadPoolType type, int index) { + cond_.Signal(); + while (start_) { + switch (type) { + case ThreadPoolType::READ: + rapplyMap_[index]->tq.Pop()(); + break; + + case ThreadPoolType::WRITE: + wapplyMap_[index]->tq.Pop()(); + break; + } + } +} + +void ConcurrentApplyModule::Stop() { + LOG(INFO) << "stop ConcurrentApplyModule..."; + start_ = false; + auto wakeup = []() {}; + for (auto iter : rapplyMap_) { + iter.second->tq.Push(wakeup); + iter.second->th.join(); + delete iter.second; + } + rapplyMap_.clear(); + + for (auto iter : wapplyMap_) { + iter.second->tq.Push(wakeup); + iter.second->th.join(); + delete iter.second; + } + wapplyMap_.clear(); + + LOG(INFO) << "stop ConcurrentApplyModule ok."; +} + +void ConcurrentApplyModule::Flush() { + CountDownEvent event(wconcurrentsize_); + auto flushtask = [&event]() { + event.Signal(); + }; + + for (int i = 0; i < wconcurrentsize_; i++) { + wapplyMap_[i]->tq.Push(flushtask); + } + + event.Wait(); +} + +ThreadPoolType ConcurrentApplyModule::Schedule(CHUNK_OP_TYPE optype) { + switch (optype) { + case CHUNK_OP_READ: + case CHUNK_OP_RECOVER: + return ThreadPoolType::READ; + default: + return ThreadPoolType::WRITE; + } +} +} // namespace concurrent +} // namespace chunkserver +} // namespace curve diff --git a/src/chunkserver/concurrent_apply/concurrent_apply.h b/src/chunkserver/concurrent_apply/concurrent_apply.h new file mode 100644 index 0000000000..fba499bfa3 --- /dev/null +++ b/src/chunkserver/concurrent_apply/concurrent_apply.h @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2020 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * File Created: 20200813 + * Author: lixiaocui + */ + +#ifndef SRC_CHUNKSERVER_CONCURRENT_APPLY_CONCURRENT_APPLY_H_ +#define SRC_CHUNKSERVER_CONCURRENT_APPLY_CONCURRENT_APPLY_H_ + +#include +#include +#include +#include // NOLINT +#include // NOLINT +#include +#include +#include // NOLINT + +#include "src/common/concurrent/task_queue.h" +#include "src/common/concurrent/count_down_event.h" +#include "proto/chunk.pb.h" +#include "include/curve_compiler_specific.h" + +using curve::common::TaskQueue; +using curve::common::CountDownEvent; +using curve::chunkserver::CHUNK_OP_TYPE; + +namespace curve { +namespace chunkserver { +namespace concurrent { + +struct ConcurrentApplyOption { + int wconcurrentsize; + int wqueuedepth; + int rconcurrentsize; + int rqueuedepth; +}; + +enum class ThreadPoolType {READ, WRITE}; + +class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule { + public: + ConcurrentApplyModule(): start_(false), + rconcurrentsize_(0), + wconcurrentsize_(0), + rqueuedepth_(0), + wqueuedepth_(0), + cond_(0) {} + ~ConcurrentApplyModule() {} + + /** + * Init: initialize ConcurrentApplyModule + * @param[in] wconcurrentsize: num of write threads + * @param[in] wqueuedepth: depth of write queue in ervery thread + * @param[in] rconcurrentsizee: num of read threads + * @param[in] wqueuedephth: depth of read queue in every thread + */ + bool Init(const ConcurrentApplyOption &opt); + + /** + * Push: apply task will be push to ConcurrentApplyModule + * @param[in] key: used to hash task to specified queue + * @param[in] optype: operation type defined in proto + * @param[in] f: task + * @param[in] args: param to excute task + */ + template + bool Push(uint64_t key, CHUNK_OP_TYPE optype, F&& f, Args&&... args) { + auto task = std::bind(std::forward(f), std::forward(args)...); + switch (Schedule(optype)) { + case ThreadPoolType::READ: + rapplyMap_[Hash(key, rconcurrentsize_)]->tq.Push(task); + break; + case ThreadPoolType::WRITE: + wapplyMap_[Hash(key, wconcurrentsize_)]->tq.Push(task); + break; + } + + return true; + } + + /** + * Flush: finish all task in write threads + */ + void Flush(); + + void Stop(); + + private: + bool checkOptAndInit(const ConcurrentApplyOption &option); + + void Run(ThreadPoolType type, int index); + + ThreadPoolType Schedule(CHUNK_OP_TYPE optype); + + void InitThreadPool(ThreadPoolType type, int concorrent, int depth); + + int Hash(uint64_t key, int concurrent) { + return key % concurrent; + } + + private: + typedef uint8_t threadIndex; + typedef struct taskthread { + std::thread th; + TaskQueue tq; + taskthread(size_t capacity):tq(capacity) {} + ~taskthread() = default; + } taskthread_t; + + bool start_; + int rconcurrentsize_; + int rqueuedepth_; + int wconcurrentsize_; + int wqueuedepth_; + CountDownEvent cond_; + CURVE_CACHELINE_ALIGNMENT std::unordered_map wapplyMap_; // NOLINT + CURVE_CACHELINE_ALIGNMENT std::unordered_map rapplyMap_; // NOLINT +}; +} // namespace concurrent +} // namespace chunkserver +} // namespace curve + +#endif // SRC_CHUNKSERVER_CONCURRENT_APPLY_CONCURRENT_APPLY_H_ diff --git a/src/chunkserver/config_info.cpp b/src/chunkserver/config_info.cpp index b1caf1e7cf..e29d17d55a 100644 --- a/src/chunkserver/config_info.cpp +++ b/src/chunkserver/config_info.cpp @@ -24,7 +24,7 @@ #include "src/chunkserver/copyset_node_manager.h" #include "src/chunkserver/datastore/chunkfile_pool.h" -#include "src/chunkserver/concurrent_apply.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" namespace curve { namespace chunkserver { diff --git a/src/chunkserver/config_info.h b/src/chunkserver/config_info.h index b53890135a..5011a95109 100644 --- a/src/chunkserver/config_info.h +++ b/src/chunkserver/config_info.h @@ -29,14 +29,15 @@ #include "src/fs/local_filesystem.h" #include "src/chunkserver/trash.h" #include "src/chunkserver/inflight_throttle.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" #include "include/chunkserver/chunkserver_common.h" namespace curve { namespace chunkserver { using curve::fs::LocalFileSystem; +using curve::chunkserver::concurrent::ConcurrentApplyModule; -class ConcurrentApplyModule; class ChunkfilePool; class CopysetNodeManager; class CloneManager; diff --git a/src/chunkserver/copyset_node.cpp b/src/chunkserver/copyset_node.cpp index b6ab532891..4d3768b048 100755 --- a/src/chunkserver/copyset_node.cpp +++ b/src/chunkserver/copyset_node.cpp @@ -242,7 +242,8 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { opRequest, iter.index(), doneGuard.release()); - concurrentapply_->Push(opRequest->ChunkId(), task); + concurrentapply_->Push( + opRequest->ChunkId(), opRequest->OpType(), task); } else { // 获取log entry butil::IOBuf log = iter.data(); @@ -261,7 +262,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { dataStore_, std::move(request), data); - concurrentapply_->Push(chunkId, task); + concurrentapply_->Push(chunkId, request.optype(), task); } } } diff --git a/src/chunkserver/copyset_node.h b/src/chunkserver/copyset_node.h index 94dee23ed2..6b15db68c6 100755 --- a/src/chunkserver/copyset_node.h +++ b/src/chunkserver/copyset_node.h @@ -30,7 +30,7 @@ #include #include -#include "src/chunkserver/concurrent_apply.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" #include "src/chunkserver/datastore/chunkserver_datastore.h" #include "src/chunkserver/conf_epoch_file.h" #include "src/chunkserver/config_info.h" diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index fdf26a9085..ba5cafe74c 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -283,7 +283,8 @@ void ReadChunkRequest::Process() { thisPtr, node_->GetAppliedIndex(), doneGuard.release()); - concurrentApplyModule_->Push(request_->chunkid(), task); + concurrentApplyModule_->Push( + request_->chunkid(), request_->optype(), task); return; } diff --git a/src/chunkserver/op_request.h b/src/chunkserver/op_request.h index d1b81b9235..fb9dfddb5c 100755 --- a/src/chunkserver/op_request.h +++ b/src/chunkserver/op_request.h @@ -31,15 +31,15 @@ #include "proto/chunk.pb.h" #include "include/chunkserver/chunkserver_common.h" -#include "src/chunkserver/concurrent_apply.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" #include "src/chunkserver/datastore/define.h" +using ::google::protobuf::RpcController; +using ::curve::chunkserver::concurrent::ConcurrentApplyModule; + namespace curve { namespace chunkserver { -using ::google::protobuf::RpcController; -using curve::chunkserver::CSChunkInfo; - class CopysetNode; class CSDataStore; class CloneManager; diff --git a/src/fs/ext4_filesystem_impl.cpp b/src/fs/ext4_filesystem_impl.cpp index a91bf86920..a702f1d65b 100644 --- a/src/fs/ext4_filesystem_impl.cpp +++ b/src/fs/ext4_filesystem_impl.cpp @@ -249,7 +249,7 @@ int Ext4FileSystemImpl::List(const string& dirName, vector *names) { DIR *dir = posixWrapper_->opendir(dirName.c_str()); if (nullptr == dir) { - LOG(WARNING) << "opendir failed: " << strerror(errno); + LOG(WARNING) << "opendir:" << dirName << " failed:" << strerror(errno); return -errno; } struct dirent *dirIter; diff --git a/test/chunkserver/chunkserver_test_util.cpp b/test/chunkserver/chunkserver_test_util.cpp index 355597551f..b32cbc15bd 100644 --- a/test/chunkserver/chunkserver_test_util.cpp +++ b/test/chunkserver/chunkserver_test_util.cpp @@ -41,6 +41,9 @@ #include "src/chunkserver/cli.h" #include "test/chunkserver/fake_datastore.h" #include "src/chunkserver/uri_paser.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" + +using ::curve::chunkserver::concurrent::ConcurrentApplyOption; namespace curve { namespace chunkserver { @@ -156,7 +159,8 @@ int StartChunkserver(const char *ip, LOG(INFO) << "chunfilepool init success"; } - LOG_IF(FATAL, false == copysetNodeOptions.concurrentapply->Init(2, 1)) + ConcurrentApplyOption opt{2, 1, 2, 1}; + LOG_IF(FATAL, false == copysetNodeOptions.concurrentapply->Init(opt)) << "Failed to init concurrent apply module"; Configuration conf; diff --git a/test/chunkserver/concurrent_apply/BUILD b/test/chunkserver/concurrent_apply/BUILD new file mode 100644 index 0000000000..3607c32e81 --- /dev/null +++ b/test/chunkserver/concurrent_apply/BUILD @@ -0,0 +1,29 @@ +# +# Copyright (c) 2020 NetEase Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +cc_test( + name = "concurrent_apply_test", + srcs = [ + "concurrent_apply_test.cpp", + ], + includes = ([]), + copts = ["-std=c++11"], + deps = [ + "@com_google_googletest//:gtest", + "@com_google_googletest//:gtest_main", + "//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply", + ], +) diff --git a/test/chunkserver/concurrent_apply/concurrent_apply_test.cpp b/test/chunkserver/concurrent_apply/concurrent_apply_test.cpp new file mode 100644 index 0000000000..d12b6b6391 --- /dev/null +++ b/test/chunkserver/concurrent_apply/concurrent_apply_test.cpp @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2020 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * File Created: 20200817 + * Author: lixiaocui + */ + +#include + +#include +#include + +#include "proto/chunk.pb.h" +#include "src/common/timeutility.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" + +using curve::chunkserver::concurrent::ConcurrentApplyModule; +using curve::chunkserver::concurrent::ConcurrentApplyOption; +using curve::chunkserver::CHUNK_OP_TYPE; + +TEST(ConcurrentApplyModule, InitTest) { + ConcurrentApplyModule concurrentapply; + + { + // 1. init with invalid write-concurrentsize + ConcurrentApplyOption opt{-1, 1, 1, 1}; + ASSERT_FALSE(concurrentapply.Init(opt)); + } + + { + // 2. init with invalid write-concurrentdepth + ConcurrentApplyOption opt{1, -1, 1, 1}; + ASSERT_FALSE(concurrentapply.Init(opt)); + } + + { + // 3. init with invalid read-concurrentsize + ConcurrentApplyOption opt{1, 1, -1, 1}; + ASSERT_FALSE(concurrentapply.Init(opt)); + } + + { + // 4. init with invalid read-concurrentdepth + ConcurrentApplyOption opt{1, 1, 1, -1}; + ASSERT_FALSE(concurrentapply.Init(opt)); + } + + { + // 5. double init + ConcurrentApplyOption opt{1, 1, 1, 1}; + // 5. init with vaild params + ASSERT_TRUE(concurrentapply.Init(opt)); + } + + concurrentapply.Stop(); +} + + +TEST(ConcurrentApplyModule, RunTest) { + ConcurrentApplyModule concurrentapply; + + int testw = 0; + auto wtask = [&testw]() { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + testw++; + }; + + int testr = 0; + auto rtask = [&testr]() { + testr++; + }; + + // push write and read tasks + ConcurrentApplyOption opt{1, 1, 1, 1}; + ASSERT_TRUE(concurrentapply.Init(opt)); + + ASSERT_TRUE(concurrentapply.Push(1, CHUNK_OP_TYPE::CHUNK_OP_READ, rtask)); + ASSERT_TRUE(concurrentapply.Push(1, CHUNK_OP_TYPE::CHUNK_OP_WRITE, wtask)); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ASSERT_EQ(1, testr); + ASSERT_EQ(0, testw); + concurrentapply.Flush(); + ASSERT_EQ(1, testw); + + concurrentapply.Stop(); +} + +TEST(ConcurrentApplyModule, FlushTest) { + ConcurrentApplyModule concurrentapply; + ConcurrentApplyOption opt{2, 5000, 1, 1}; + ASSERT_TRUE(concurrentapply.Init(opt)); + + std::atomic testnum(0); + auto task = [&testnum]() { + testnum.fetch_add(1); + }; + + for (int i = 0; i < 5000; i++) { + concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_WRITE, task); + } + + ASSERT_LT(testnum, 5000); + concurrentapply.Flush(); + ASSERT_EQ(5000, testnum); + + concurrentapply.Stop(); +} + +TEST(ConcurrentApplyModule, ConcurrentTest) { + // interval flush when push + std::atomic stop(false); + std::atomic testnum(0); + ConcurrentApplyModule concurrentapply; + ConcurrentApplyOption opt{10, 1, 5, 2}; + ASSERT_TRUE(concurrentapply.Init(opt)); + + auto push = [&concurrentapply, &stop, &testnum]() { + auto task = [&testnum]() { + testnum.fetch_add(1); + }; + while (!stop.load()) { + for (int i = 0; i < 10; i++) { + concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_RECOVER, task); + concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_WRITE, task); + } + } + }; + + auto flush = [&concurrentapply, &stop, &testnum]() { + while (!stop.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + concurrentapply.Flush(); + } + }; + + std::thread t(push); + std::thread f(flush); + + while (testnum.load() <= 1000000) { + } + + stop.store(true); + + std::cout << "thread exit, join" << std::endl; + t.join(); + f.join(); + + concurrentapply.Flush(); + ASSERT_GT(testnum, 1000000); + concurrentapply.Stop(); +} + diff --git a/test/chunkserver/concurrent_apply_unittest.cpp b/test/chunkserver/concurrent_apply_unittest.cpp deleted file mode 100644 index 0c4bcb092e..0000000000 --- a/test/chunkserver/concurrent_apply_unittest.cpp +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Copyright (c) 2020 NetEase Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: curve - * File Created: Monday, 24th December 2018 9:57:24 am - * Author: tongguangxun - */ - -#include - -#include -#include - -#include "src/common/timeutility.h" -#include "src/chunkserver/concurrent_apply.h" - -using curve::chunkserver::ConcurrentApplyModule; - -TEST(ConcurrentApplyModule, ConcurrentApplyModuleInitTest) { - /** - * init twice test, exptect init success - */ - - ConcurrentApplyModule concurrentapply; - ASSERT_TRUE(concurrentapply.Init(-1, 1)); - ASSERT_TRUE(concurrentapply.Init(1, -1)); - ASSERT_TRUE(concurrentapply.Init(-1, -1)); - ASSERT_TRUE(concurrentapply.Init(1, 1)); - ASSERT_TRUE(concurrentapply.Init(1, 1)); - concurrentapply.Stop(); -} - -TEST(ConcurrentApplyModule, ConcurrentApplyModuleRunTest) { - /** - * worker run the task correctly - */ - - ConcurrentApplyModule concurrentapply; - int testnum = 0; - auto runtask = [&testnum]() { - testnum++; - }; - ASSERT_FALSE(concurrentapply.Push(1, runtask)); - ASSERT_TRUE(concurrentapply.Init(2, 1)); - concurrentapply.Push(0, runtask); - concurrentapply.Flush(); - ASSERT_EQ(1, testnum); - concurrentapply.Push(1, runtask); - concurrentapply.Flush(); - ASSERT_EQ(2, testnum); - concurrentapply.Stop(); -} - -TEST(ConcurrentApplyModule, ConcurrentApplyModuleFlushTest) { - /** - * test flush interface will flush all undo task before return - */ - - ConcurrentApplyModule concurrentapply; - ASSERT_TRUE(concurrentapply.Init(2, 10000)); - std::atomic testnum(0); - auto runtask = [&testnum]() { - testnum.fetch_add(1); - }; - - for (int i = 0; i < 5000; i++) { - concurrentapply.Push(i, runtask); - concurrentapply.Push(i + 1, runtask); - } - - concurrentapply.Flush(); - ASSERT_EQ(10000, testnum); - concurrentapply.Stop(); -} - -TEST(ConcurrentApplyModule, ConcurrentApplyModuleFlushConcurrentTest) { - /** - * test flush interface in concurrent condition - */ - std::atomic stop(false); - std::atomic testnum(0); - - ConcurrentApplyModule concurrentapply; - ASSERT_TRUE(concurrentapply.Init(10, 1)); - auto testfunc = [&concurrentapply, &stop, &testnum]() { - auto runtask = [&testnum]() { - testnum.fetch_add(1); - }; - while (!stop.load()) { - for (int i = 0; i < 10; i++) { - concurrentapply.Push(i, runtask); - } - } - }; - - auto flushtest = [&concurrentapply, &stop, &testnum]() { - while (!stop.load()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - concurrentapply.Flush(); - } - }; - - std::thread t(testfunc); - std::thread f(flushtest); - - while (testnum.load() <= 1000000) { - } - - stop.store(true); - - std::cout << "thread exit, join" << std::endl; - t.join(); - f.join(); - - concurrentapply.Flush(); - ASSERT_GT(testnum, 1000000); - concurrentapply.Stop(); -} - -// ci暫時不跑性能测试 -#if 0 -TEST(ConcurrentApplyModule, MultiCopysetPerformanceTest) { - std::atomic start(false); - std::atomic donecount(0); - - std::mutex mtx; - std::condition_variable startcv; - - ConcurrentApplyModule concurrentapply; - ASSERT_TRUE(concurrentapply.Init(10, 1)); - auto testfunc = [&concurrentapply, &startcv, &start, &mtx, &donecount]() { - { - std::unique_lock lk(mtx); - startcv.wait(lk, [&start](){return start.load() == true;}); - } - auto runtask = []() {}; - uint32_t count = 0; - while (count < 1000000) { - concurrentapply.Push(count%10, runtask); - count++; - } - donecount.fetch_add(1); - }; - std::vector threadvec; - - for (int i = 0; i < 10; i++) { - threadvec.emplace_back(std::thread(testfunc)); - } - - uint64_t startime = curve::common::TimeUtility::GetTimeofDayUs(); - - start.store(true); - startcv.notify_all(); - - while (donecount.load() < 10) {} - uint64_t stoptime = curve::common::TimeUtility::GetTimeofDayUs(); - - for (auto& iter : threadvec) { - if (iter.joinable()) { - iter.join(); - } - } - - float usetime = (stoptime - startime)/1000000.0; - std::cout << "multi copyet multi chunk concurrent module tps = " - << 10000000/usetime - << std::endl; - concurrentapply.Stop(); -} - -TEST(ConcurrentApplyModule, SingleCopysetPerformanceTest) { - std::atomic start(false); - std::atomic donecount(0); - - std::mutex mtx; - std::condition_variable startcv; - - ConcurrentApplyModule concurrentapply; - ASSERT_TRUE(concurrentapply.Init(10, 1)); - auto testfunc = [&concurrentapply, &startcv, &start, &mtx, &donecount]() { - { - std::unique_lock lk(mtx); - startcv.wait(lk, [&start](){return start.load() == true;}); - } - auto runtask = []() {}; - uint32_t count = 0; - while (count < 1000000) { - concurrentapply.Push(count%10, runtask); - count++; - } - donecount.fetch_add(1); - }; - std::vector threadvec; - - for (int i = 0; i < 1; i++) { - threadvec.emplace_back(std::thread(testfunc)); - } - - uint64_t startime = curve::common::TimeUtility::GetTimeofDayUs(); - - start.store(true); - startcv.notify_all(); - - while (donecount.load() < 1) {} - uint64_t stoptime = curve::common::TimeUtility::GetTimeofDayUs(); - - for (auto& iter : threadvec) { - if (iter.joinable()) { - iter.join(); - } - } - - float usetime = (stoptime - startime)/1000000.0; - std::cout << "single copyet multi chunk concurrent module tps = " - << 1000000/usetime - << std::endl; - concurrentapply.Stop(); -} - -TEST(ConcurrentApplyModule, SingleCopysetSingleChunkPerformanceTest) { - std::atomic start(false); - std::atomic donecount(0); - - std::mutex mtx; - std::condition_variable startcv; - - ConcurrentApplyModule concurrentapply; - ASSERT_TRUE(concurrentapply.Init(10, 1)); - auto testfunc = [&concurrentapply, &startcv, &start, &mtx, &donecount]() { - { - std::unique_lock lk(mtx); - startcv.wait(lk, [&start](){return start.load() == true;}); - } - auto runtask = []() {}; - uint32_t count = 0; - while (count < 1000000) { - concurrentapply.Push(1, runtask); - count++; - } - donecount.fetch_add(1); - }; - std::vector threadvec; - - for (int i = 0; i < 1; i++) { - threadvec.emplace_back(std::thread(testfunc)); - } - - uint64_t startime = curve::common::TimeUtility::GetTimeofDayUs(); - - start.store(true); - startcv.notify_all(); - - while (donecount.load() < 1) {} - uint64_t stoptime = curve::common::TimeUtility::GetTimeofDayUs(); - - for (auto& iter : threadvec) { - if (iter.joinable()) { - iter.join(); - } - } - - float usetime = (stoptime - startime)/1000000.0; - std::cout << "single copyet single chunk concurrent module tps = " - << 1000000/usetime - << std::endl; - concurrentapply.Stop(); -} - -TEST(ConcurrentApplyModule, MultiCopysetSingleQueuePerformanceTest) { - std::atomic start(false); - std::atomic donecount(0); - - std::mutex mtx; - std::condition_variable startcv; - - ConcurrentApplyModule concurrentapply; - ASSERT_TRUE(concurrentapply.Init(10, 1)); - auto testfunc = [&concurrentapply, &startcv, &start, &mtx, &donecount]() { - { - std::unique_lock lk(mtx); - startcv.wait(lk, [&start](){return start.load() == true;}); - } - auto runtask = []() {}; - uint32_t count = 0; - while (count < 1000000) { - concurrentapply.Push(1, runtask); - count++; - } - donecount.fetch_add(1); - }; - std::vector threadvec; - - for (int i = 0; i < 10; i++) { - threadvec.emplace_back(std::thread(testfunc)); - } - - uint64_t startime = curve::common::TimeUtility::GetTimeofDayUs(); - - start.store(true); - startcv.notify_all(); - - while (donecount.load() < 10) {} - uint64_t stoptime = curve::common::TimeUtility::GetTimeofDayUs(); - - for (auto& iter : threadvec) { - if (iter.joinable()) { - iter.join(); - } - } - - float usetime = (stoptime - startime)/1000000.0; - std::cout << "multi copyet single queue concurrent module tps = " - << 10000000/usetime - << std::endl; - concurrentapply.Stop(); -} -#endif diff --git a/test/chunkserver/copyset_node_test.cpp b/test/chunkserver/copyset_node_test.cpp index a020e34ac8..5460a8f07c 100644 --- a/test/chunkserver/copyset_node_test.cpp +++ b/test/chunkserver/copyset_node_test.cpp @@ -41,6 +41,7 @@ #include "proto/heartbeat.pb.h" #include "src/chunkserver/raftsnapshot/curve_snapshot_attachment.h" #include "test/chunkserver/mock_curve_filesystem_adaptor.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" namespace curve { namespace chunkserver { @@ -59,6 +60,7 @@ using ::testing::SaveArgPointee; using curve::fs::MockLocalFileSystem; using curve::fs::FileSystemType; using curve::fs::MockLocalFileSystem; +using curve::chunkserver::concurrent::ConcurrentApplyOption; const char copysetUri[] = "local://./copyset_node_test"; const int port = 9044; @@ -132,7 +134,8 @@ class CopysetNodeTest : public ::testing::Test { defaultOptions_.finishLoadMargin = 1000; defaultOptions_.concurrentapply = &concurrentModule_; - defaultOptions_.concurrentapply->Init(2, 1); + ConcurrentApplyOption opt{2, 1, 2, 1}; + defaultOptions_.concurrentapply->Init(opt); std::shared_ptr fs = LocalFsFactory::CreateFs(FileSystemType::EXT4, ""); ASSERT_TRUE(nullptr != fs); diff --git a/test/chunkserver/copyset_service_test.cpp b/test/chunkserver/copyset_service_test.cpp index a267184d99..f7fb690d25 100644 --- a/test/chunkserver/copyset_service_test.cpp +++ b/test/chunkserver/copyset_service_test.cpp @@ -56,11 +56,21 @@ static std::string Exec(const char *cmd) { class CopysetServiceTest : public testing::Test { public: void SetUp() { - Exec("rm -rf data"); + testDir = "CopysetServiceTestData"; + rmCmd = "rm -rf CopysetServiceTestData"; + copysetDir = "local://./CopysetServiceTestData"; + copysetDirPattern = "local://./CopysetServiceTestData/%d"; + Exec(rmCmd.c_str()); } void TearDown() { - Exec("rm -rf data"); + Exec(rmCmd.c_str()); } + + protected: + std::string testDir; + std::string rmCmd; + std::string copysetDir; + std::string copysetDirPattern; }; butil::AtExitManager atExitManager; @@ -71,7 +81,6 @@ TEST_F(CopysetServiceTest, basic) { CopysetID copysetId = 100002; std::string ip = "127.0.0.1"; uint32_t port = 9040; - std::string copysetDir = "local://./data"; brpc::Server server; butil::EndPoint addr(butil::IP_ANY, port); @@ -82,7 +91,7 @@ TEST_F(CopysetServiceTest, basic) { std::shared_ptr fs(LocalFsFactory::CreateFs(FileSystemType::EXT4, "")); //NOLINT ASSERT_TRUE(nullptr != fs); - butil::string_printf(©setDir, "local://./data/%d", port); + butil::string_printf(©setDir, copysetDirPattern.c_str(), port); CopysetNodeOptions copysetNodeOptions; copysetNodeOptions.ip = ip; copysetNodeOptions.port = port; @@ -177,7 +186,6 @@ TEST_F(CopysetServiceTest, basic2) { CopysetID copysetId = 100003; std::string ip = "127.0.0.1"; uint32_t port = 9040; - std::string copysetDir = "local://./data"; brpc::Server server; butil::EndPoint addr(butil::IP_ANY, port); @@ -188,7 +196,7 @@ TEST_F(CopysetServiceTest, basic2) { std::shared_ptr fs(LocalFsFactory::CreateFs(FileSystemType::EXT4, "")); //NOLINT ASSERT_TRUE(nullptr != fs); - butil::string_printf(©setDir, "local://./data/%d", port); + butil::string_printf(©setDir, copysetDirPattern.c_str(), port); CopysetNodeOptions copysetNodeOptions; copysetNodeOptions.ip = ip; copysetNodeOptions.port = port; diff --git a/test/chunkserver/server.cpp b/test/chunkserver/server.cpp index 980091cbcf..23907e6401 100644 --- a/test/chunkserver/server.cpp +++ b/test/chunkserver/server.cpp @@ -30,7 +30,7 @@ #include "src/chunkserver/chunk_service.h" #include "src/fs/fs_common.h" #include "src/fs/local_filesystem.h" -#include "src/chunkserver/concurrent_apply.h" +#include "src/chunkserver/concurrent_apply/concurrent_apply.h" #include "src/chunkserver/datastore/chunkfile_pool.h" #include "src/chunkserver/uri_paser.h" #include "src/chunkserver/raftsnapshot/curve_snapshot_storage.h" @@ -40,7 +40,8 @@ using curve::chunkserver::Configuration; using curve::chunkserver::CopysetNodeManager; using curve::chunkserver::ChunkfilePool; using curve::chunkserver::ChunkfilePoolOptions; -using curve::chunkserver::ConcurrentApplyModule; +using curve::chunkserver::concurrent::ConcurrentApplyModule; +using curve::chunkserver::concurrent::ConcurrentApplyOption; using curve::chunkserver::UriParser; using curve::chunkserver::LogicPoolID; using curve::chunkserver::CopysetID; @@ -192,7 +193,8 @@ int main(int argc, char *argv[]) { LOG(INFO) << "chunfilepool init success"; } - LOG_IF(FATAL, false == copysetNodeOptions.concurrentapply->Init(2, 1)) + ConcurrentApplyOption opt{2, 1, 2, 1}; + LOG_IF(FATAL, false == copysetNodeOptions.concurrentapply->Init(opt)) << "Failed to init concurrent apply module"; Configuration conf;