From d1c32af1d2c20ad61cbb17819479d15bcb43032d Mon Sep 17 00:00:00 2001
From: Xinlong Chen
Date: Sat, 22 Jul 2023 05:26:54 +0000
Subject: [PATCH] [refactor] CurveBS: unify bs and fs apply queue

Signed-off-by: Xinlong Chen
---
 .../concurrent_apply/concurrent_apply.cpp   | 29 +++++++------------
 .../concurrent_apply/concurrent_apply.h     | 18 +++++-------
 src/chunkserver/copyset_node.cpp            |  4 +--
 src/chunkserver/op_request.cpp              | 15 ++++++++--
 src/chunkserver/op_request.h                |  3 ++
 .../concurrent_apply_test.cpp               | 12 ++++----
 6 files changed, 42 insertions(+), 39 deletions(-)

diff --git a/src/chunkserver/concurrent_apply/concurrent_apply.cpp b/src/chunkserver/concurrent_apply/concurrent_apply.cpp
index 1dfc54e926..dd7b219502 100644
--- a/src/chunkserver/concurrent_apply/concurrent_apply.cpp
+++ b/src/chunkserver/concurrent_apply/concurrent_apply.cpp
@@ -43,8 +43,8 @@ bool ConcurrentApplyModule::Init(const ConcurrentApplyOption &opt) {
     start_ = true;
     cond_.Reset(opt.rconcurrentsize + opt.wconcurrentsize);
 
-    InitThreadPool(ThreadPoolType::READ, rconcurrentsize_, rqueuedepth_);
-    InitThreadPool(ThreadPoolType::WRITE, wconcurrentsize_, wqueuedepth_);
+    InitThreadPool(ApplyTaskType::READ, rconcurrentsize_, rqueuedepth_);
+    InitThreadPool(ApplyTaskType::WRITE, wconcurrentsize_, wqueuedepth_);
 
     if (!cond_.WaitFor(5000)) {
         LOG(ERROR) << "init concurrent module's threads fail";
@@ -77,17 +77,17 @@ bool ConcurrentApplyModule::checkOptAndInit(
 }
 
 void ConcurrentApplyModule::InitThreadPool(
-    ThreadPoolType type, int concurrent, int depth) {
+    ApplyTaskType type, int concurrent, int depth) {
     for (int i = 0; i < concurrent; i++) {
         auto asyncth = new (std::nothrow) TaskThread(depth);
         CHECK(asyncth != nullptr) << "allocate failed!";
 
         switch (type) {
-            case ThreadPoolType::READ:
+            case ApplyTaskType::READ:
                 rapplyMap_.insert(std::make_pair(i, asyncth));
                 break;
-            case ThreadPoolType::WRITE:
+            case ApplyTaskType::WRITE:
                 wapplyMap_.insert(std::make_pair(i, asyncth));
                 break;
         }
     }
@@ -95,12 +95,12 @@ void ConcurrentApplyModule::InitThreadPool(
 
     for (int i = 0; i < concurrent; i++) {
         switch (type) {
-            case ThreadPoolType::READ:
+            case ApplyTaskType::READ:
                 rapplyMap_[i]->th =
                     std::thread(&ConcurrentApplyModule::Run, this, type, i);
                 break;
-            case ThreadPoolType::WRITE:
+            case ApplyTaskType::WRITE:
                 wapplyMap_[i]->th =
                     std::thread(&ConcurrentApplyModule::Run, this, type, i);
                 break;
@@ -108,15 +108,15 @@ void ConcurrentApplyModule::InitThreadPool(
     }
 }
 
-void ConcurrentApplyModule::Run(ThreadPoolType type, int index) {
+void ConcurrentApplyModule::Run(ApplyTaskType type, int index) {
     cond_.Signal();
     while (start_) {
         switch (type) {
-            case ThreadPoolType::READ:
+            case ApplyTaskType::READ:
                 rapplyMap_[index]->tq.Pop()();
                 break;
-            case ThreadPoolType::WRITE:
+            case ApplyTaskType::WRITE:
                 wapplyMap_[index]->tq.Pop()();
                 break;
         }
     }
@@ -157,15 +157,6 @@ void ConcurrentApplyModule::Flush() {
     event.Wait();
 }
 
-ThreadPoolType ConcurrentApplyModule::Schedule(CHUNK_OP_TYPE optype) {
-    switch (optype) {
-        case CHUNK_OP_READ:
-        case CHUNK_OP_RECOVER:
-            return ThreadPoolType::READ;
-        default:
-            return ThreadPoolType::WRITE;
-    }
-}
 }  // namespace concurrent
 }  // namespace chunkserver
 }  // namespace curve
diff --git a/src/chunkserver/concurrent_apply/concurrent_apply.h b/src/chunkserver/concurrent_apply/concurrent_apply.h
index af167c3e9a..f0d6d875eb 100644
--- a/src/chunkserver/concurrent_apply/concurrent_apply.h
+++ b/src/chunkserver/concurrent_apply/concurrent_apply.h
@@ -55,7 +55,7 @@ struct ConcurrentApplyOption {
     int rqueuedepth;
 };
 
-enum class ThreadPoolType {READ, WRITE};
+enum class ApplyTaskType {READ, WRITE};
 
 class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
  public:
@@ -78,18 +78,18 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
     /**
      * Push: apply task will be push to ConcurrentApplyModule
      * @param[in] key: used to hash task to specified queue
-     * @param[in] optype: operation type defined in proto
+     * @param[in] optype: read or write request type
     * @param[in] f: task
     * @param[in] args: param to excute task
     */
    template <class F, class... Args>
-    bool Push(uint64_t key, CHUNK_OP_TYPE optype, F&& f, Args&&... args) {
-        switch (Schedule(optype)) {
-            case ThreadPoolType::READ:
+    bool Push(uint64_t key, ApplyTaskType optype, F&& f, Args&&... args) {
+        switch (optype) {
+            case ApplyTaskType::READ:
                 rapplyMap_[Hash(key, rconcurrentsize_)]->tq.Push(
                     std::forward<F>(f), std::forward<Args>(args)...);
                 break;
-            case ThreadPoolType::WRITE:
+            case ApplyTaskType::WRITE:
                 wapplyMap_[Hash(key, wconcurrentsize_)]->tq.Push(
                     std::forward<F>(f), std::forward<Args>(args)...);
                 break;
@@ -108,11 +108,9 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
  private:
     bool checkOptAndInit(const ConcurrentApplyOption &option);
 
-    void Run(ThreadPoolType type, int index);
+    void Run(ApplyTaskType type, int index);
 
-    static ThreadPoolType Schedule(CHUNK_OP_TYPE optype);
-
-    void InitThreadPool(ThreadPoolType type, int concorrent, int depth);
+    void InitThreadPool(ApplyTaskType type, int concorrent, int depth);
 
     static int Hash(uint64_t key, int concurrent) {
         return key % concurrent;
diff --git a/src/chunkserver/copyset_node.cpp b/src/chunkserver/copyset_node.cpp
index 901bcf00fb..a00f7aaf9a 100755
--- a/src/chunkserver/copyset_node.cpp
+++ b/src/chunkserver/copyset_node.cpp
@@ -303,7 +303,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
             CHECK(nullptr != chunkClosure)
                 << "ChunkClosure dynamic cast failed";
             std::shared_ptr<ChunkOpRequest>& opRequest = chunkClosure->request_;
-            concurrentapply_->Push(opRequest->ChunkId(), opRequest->OpType(),
+            concurrentapply_->Push(opRequest->ChunkId(), ChunkOpRequest::Schedule(opRequest->OpType()),  // NOLINT
                                    &ChunkOpRequest::OnApply, opRequest,
                                    iter.index(), doneGuard.release());
         } else {
@@ -320,7 +320,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
             auto opReq = ChunkOpRequest::Decode(log, &request, &data,
                                                 iter.index(), GetLeaderId());
             auto chunkId = request.chunkid();
-            concurrentapply_->Push(chunkId, request.optype(),
+            concurrentapply_->Push(chunkId, ChunkOpRequest::Schedule(request.optype()),  // NOLINT
                                    &ChunkOpRequest::OnApplyFromLog, opReq,
                                    dataStore_, std::move(request), data);
         }
diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp
index 2fba994e0d..817e65c79f 100755
--- a/src/chunkserver/op_request.cpp
+++ b/src/chunkserver/op_request.cpp
@@ -176,6 +176,16 @@ std::shared_ptr<ChunkOpRequest> ChunkOpRequest::Decode(butil::IOBuf log,
     }
 }
 
+ApplyTaskType ChunkOpRequest::Schedule(CHUNK_OP_TYPE opType) {
+    switch (opType) {
+        case CHUNK_OP_READ:
+        case CHUNK_OP_RECOVER:
+            return ApplyTaskType::READ;
+        default:
+            return ApplyTaskType::WRITE;
+    }
+}
+
 namespace {
 uint64_t MaxAppliedIndex(
     const std::shared_ptr<CopysetNode>& node,
@@ -269,8 +279,9 @@ void ReadChunkRequest::Process() {
                              thisPtr,
                              node_->GetAppliedIndex(),
                              doneGuard.release());
-        concurrentApplyModule_->Push(
-            request_->chunkid(), request_->optype(), task);
+        concurrentApplyModule_->Push(request_->chunkid(),
+                                     ChunkOpRequest::Schedule(request_->optype()),  // NOLINT
+                                     task);
         return;
     }
 
diff --git a/src/chunkserver/op_request.h b/src/chunkserver/op_request.h
index ba208dd5d8..c29484f79b 100755
--- a/src/chunkserver/op_request.h
+++ b/src/chunkserver/op_request.h
@@ -37,6 +37,7 @@ using ::google::protobuf::RpcController;
 
 using ::curve::chunkserver::concurrent::ConcurrentApplyModule;
+using ::curve::chunkserver::concurrent::ApplyTaskType;
 
 namespace curve {
 namespace chunkserver {
@@ -156,6 +157,8 @@ class ChunkOpRequest : public std::enable_shared_from_this<ChunkOpRequest> {
                                                 uint64_t index,
                                                 PeerId leaderId);
 
+    static ApplyTaskType Schedule(CHUNK_OP_TYPE opType);
+
  protected:
     /**
     * Package the request as a braft::task and propose it to the corresponding replication group
diff --git a/test/chunkserver/concurrent_apply/concurrent_apply_test.cpp b/test/chunkserver/concurrent_apply/concurrent_apply_test.cpp
index d12b6b6391..f2f5de8228 100644
--- a/test/chunkserver/concurrent_apply/concurrent_apply_test.cpp
+++ b/test/chunkserver/concurrent_apply/concurrent_apply_test.cpp
@@ -31,7 +31,7 @@
 
 using curve::chunkserver::concurrent::ConcurrentApplyModule;
 using curve::chunkserver::concurrent::ConcurrentApplyOption;
-using curve::chunkserver::CHUNK_OP_TYPE;
+using curve::chunkserver::concurrent::ApplyTaskType;
 
 TEST(ConcurrentApplyModule, InitTest) {
     ConcurrentApplyModule concurrentapply;
@@ -89,8 +89,8 @@
     ConcurrentApplyOption opt{1, 1, 1, 1};
     ASSERT_TRUE(concurrentapply.Init(opt));
 
-    ASSERT_TRUE(concurrentapply.Push(1, CHUNK_OP_TYPE::CHUNK_OP_READ, rtask));
-    ASSERT_TRUE(concurrentapply.Push(1, CHUNK_OP_TYPE::CHUNK_OP_WRITE, wtask));
+    ASSERT_TRUE(concurrentapply.Push(1, ApplyTaskType::READ, rtask));
+    ASSERT_TRUE(concurrentapply.Push(1, ApplyTaskType::WRITE, wtask));
 
     std::this_thread::sleep_for(std::chrono::milliseconds(200));
     ASSERT_EQ(1, testr);
@@ -112,7 +112,7 @@
     };
 
     for (int i = 0; i < 5000; i++) {
-        concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_WRITE, task);
+        concurrentapply.Push(i, ApplyTaskType::WRITE, task);
     }
 
     ASSERT_LT(testnum, 5000);
@@ -136,8 +136,8 @@
     };
     while (!stop.load()) {
         for (int i = 0; i < 10; i++) {
-            concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_RECOVER, task);
-            concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_WRITE, task);
+            concurrentapply.Push(i, ApplyTaskType::READ, task);
+            concurrentapply.Push(i, ApplyTaskType::WRITE, task);
         }
     }
 };
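
Note for reviewers: after this patch, ConcurrentApplyModule no longer depends on
the proto's CHUNK_OP_TYPE; callers map the op type to a queue once, via
ChunkOpRequest::Schedule, and hand the resulting ApplyTaskType to Push. A
minimal sketch of the new call pattern follows (a standalone toy, not part of
the patch: DemoApply and its lambda body are hypothetical stand-ins for the
real &ChunkOpRequest::OnApply call sites shown in the diff):

    #include "src/chunkserver/concurrent_apply/concurrent_apply.h"
    #include "src/chunkserver/op_request.h"

    using curve::chunkserver::CHUNK_OP_TYPE;
    using curve::chunkserver::ChunkOpRequest;
    using curve::chunkserver::concurrent::ApplyTaskType;
    using curve::chunkserver::concurrent::ConcurrentApplyModule;

    // Hypothetical helper: queue one apply task for a chunk.
    bool DemoApply(ConcurrentApplyModule* applyModule,
                   uint64_t chunkId, CHUNK_OP_TYPE opType) {
        // Map the proto op type to READ/WRITE once, at the call site;
        // CHUNK_OP_READ and CHUNK_OP_RECOVER ride the read queue,
        // everything else goes to the write queue.
        ApplyTaskType type = ChunkOpRequest::Schedule(opType);
        // Push hashes on chunkId, so all tasks for one chunk land in the
        // same queue and apply order per chunk is preserved.
        return applyModule->Push(chunkId, type, [chunkId] {
            // Real callers pass &ChunkOpRequest::OnApply and its args here.
            (void)chunkId;
        });
    }

This keeps the READ/WRITE scheduling decision next to the code that knows the
request type (op_request), so the same ConcurrentApplyModule interface can
serve both bs and fs, which is the point of the unification.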