[refactor] CurveBS: unify bs and fs apply queue
Signed-off-by: Xinlong Chen <[email protected]>
Xinlong-Chen committed Jul 27, 2023
1 parent c30eff0 commit d1c32af
Showing 6 changed files with 42 additions and 39 deletions.
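
The net effect: the read-vs-write scheduling decision moves out of ConcurrentApplyModule and up into ChunkOpRequest, so the apply module's Push now takes a generic ApplyTaskType rather than the proto-level CHUNK_OP_TYPE, letting one queue implementation serve both the bs and fs apply paths (per the commit title). Simplified from the call sites in the diff below:

    // Before: the apply module mapped the proto op type internally.
    concurrentapply_->Push(opRequest->ChunkId(), opRequest->OpType(), ...);

    // After: callers translate CHUNK_OP_TYPE to ApplyTaskType up front.
    concurrentapply_->Push(opRequest->ChunkId(),
                           ChunkOpRequest::Schedule(opRequest->OpType()), ...);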
29 changes: 10 additions & 19 deletions src/chunkserver/concurrent_apply/concurrent_apply.cpp

@@ -43,8 +43,8 @@ bool ConcurrentApplyModule::Init(const ConcurrentApplyOption &opt) {
 
     start_ = true;
     cond_.Reset(opt.rconcurrentsize + opt.wconcurrentsize);
-    InitThreadPool(ThreadPoolType::READ, rconcurrentsize_, rqueuedepth_);
-    InitThreadPool(ThreadPoolType::WRITE, wconcurrentsize_, wqueuedepth_);
+    InitThreadPool(ApplyTaskType::READ, rconcurrentsize_, rqueuedepth_);
+    InitThreadPool(ApplyTaskType::WRITE, wconcurrentsize_, wqueuedepth_);
 
     if (!cond_.WaitFor(5000)) {
         LOG(ERROR) << "init concurrent module's threads fail";
@@ -77,46 +77,46 @@ bool ConcurrentApplyModule::checkOptAndInit(
 
 
 void ConcurrentApplyModule::InitThreadPool(
-    ThreadPoolType type, int concurrent, int depth) {
+    ApplyTaskType type, int concurrent, int depth) {
     for (int i = 0; i < concurrent; i++) {
         auto asyncth = new (std::nothrow) TaskThread(depth);
         CHECK(asyncth != nullptr) << "allocate failed!";
 
         switch (type) {
-            case ThreadPoolType::READ:
+            case ApplyTaskType::READ:
                 rapplyMap_.insert(std::make_pair(i, asyncth));
                 break;
 
-            case ThreadPoolType::WRITE:
+            case ApplyTaskType::WRITE:
                 wapplyMap_.insert(std::make_pair(i, asyncth));
                 break;
         }
    }
 
     for (int i = 0; i < concurrent; i++) {
         switch (type) {
-            case ThreadPoolType::READ:
+            case ApplyTaskType::READ:
                 rapplyMap_[i]->th =
                     std::thread(&ConcurrentApplyModule::Run, this, type, i);
                 break;
 
-            case ThreadPoolType::WRITE:
+            case ApplyTaskType::WRITE:
                 wapplyMap_[i]->th =
                     std::thread(&ConcurrentApplyModule::Run, this, type, i);
                 break;
         }
     }
 }
 
-void ConcurrentApplyModule::Run(ThreadPoolType type, int index) {
+void ConcurrentApplyModule::Run(ApplyTaskType type, int index) {
     cond_.Signal();
     while (start_) {
         switch (type) {
-            case ThreadPoolType::READ:
+            case ApplyTaskType::READ:
                 rapplyMap_[index]->tq.Pop()();
                 break;
 
-            case ThreadPoolType::WRITE:
+            case ApplyTaskType::WRITE:
                 wapplyMap_[index]->tq.Pop()();
                 break;
         }
@@ -157,15 +157,6 @@ void ConcurrentApplyModule::Flush() {
     event.Wait();
 }
 
-ThreadPoolType ConcurrentApplyModule::Schedule(CHUNK_OP_TYPE optype) {
-    switch (optype) {
-        case CHUNK_OP_READ:
-        case CHUNK_OP_RECOVER:
-            return ThreadPoolType::READ;
-        default:
-            return ThreadPoolType::WRITE;
-    }
-}
 }  // namespace concurrent
 }  // namespace chunkserver
 }  // namespace curve
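
For context on the Run loop above: each pool thread blocks on its own per-index queue, and tq.Pop()() both dequeues and invokes a task closure. A minimal self-contained sketch of that worker-queue pattern (SimpleTaskQueue is an illustrative stand-in, not curve's actual TaskThread/queue API):

    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <queue>

    // Illustrative unbounded task queue; the real TaskThread is built
    // with a bounded queue of the configured depth.
    class SimpleTaskQueue {
     public:
        void Push(std::function<void()> task) {
            std::lock_guard<std::mutex> lk(mtx_);
            tasks_.push(std::move(task));
            cv_.notify_one();
        }

        // Blocks until a task is available, mirroring how Run's
        // tq.Pop()() parks the worker when its queue is empty.
        std::function<void()> Pop() {
            std::unique_lock<std::mutex> lk(mtx_);
            cv_.wait(lk, [this] { return !tasks_.empty(); });
            auto task = std::move(tasks_.front());
            tasks_.pop();
            return task;
        }

     private:
        std::mutex mtx_;
        std::condition_variable cv_;
        std::queue<std::function<void()>> tasks_;
    };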
18 changes: 8 additions & 10 deletions src/chunkserver/concurrent_apply/concurrent_apply.h

@@ -55,7 +55,7 @@ struct ConcurrentApplyOption {
     int rqueuedepth;
 };
 
-enum class ThreadPoolType {READ, WRITE};
+enum class ApplyTaskType {READ, WRITE};
 
 class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
  public:
@@ -78,18 +78,18 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
     /**
      * Push: apply task will be push to ConcurrentApplyModule
     * @param[in] key: used to hash task to specified queue
-     * @param[in] optype: operation type defined in proto
+     * @param[in] optype: read or write request type
      * @param[in] f: task
      * @param[in] args: param to excute task
      */
     template <class F, class... Args>
-    bool Push(uint64_t key, CHUNK_OP_TYPE optype, F&& f, Args&&... args) {
-        switch (Schedule(optype)) {
-            case ThreadPoolType::READ:
+    bool Push(uint64_t key, ApplyTaskType optype, F&& f, Args&&... args) {
+        switch (optype) {
+            case ApplyTaskType::READ:
                 rapplyMap_[Hash(key, rconcurrentsize_)]->tq.Push(
                     std::forward<F>(f), std::forward<Args>(args)...);
                 break;
-            case ThreadPoolType::WRITE:
+            case ApplyTaskType::WRITE:
                 wapplyMap_[Hash(key, wconcurrentsize_)]->tq.Push(
                     std::forward<F>(f), std::forward<Args>(args)...);
                 break;
@@ -108,11 +108,9 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
  private:
     bool checkOptAndInit(const ConcurrentApplyOption &option);
 
-    void Run(ThreadPoolType type, int index);
+    void Run(ApplyTaskType type, int index);
 
-    static ThreadPoolType Schedule(CHUNK_OP_TYPE optype);
-
-    void InitThreadPool(ThreadPoolType type, int concorrent, int depth);
+    void InitThreadPool(ApplyTaskType type, int concorrent, int depth);
 
     static int Hash(uint64_t key, int concurrent) {
         return key % concurrent;
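
Because Hash(key, concurrent) pins each key to a fixed queue, tasks sharing a key execute on one thread in submission order. A usage sketch under the new signature (the include path, option values, chunk id, and the Stop() call at the end are assumptions for illustration):

    #include <cstdint>

    #include "src/chunkserver/concurrent_apply/concurrent_apply.h"

    using curve::chunkserver::concurrent::ApplyTaskType;
    using curve::chunkserver::concurrent::ConcurrentApplyModule;
    using curve::chunkserver::concurrent::ConcurrentApplyOption;

    int main() {
        ConcurrentApplyModule applyModule;
        ConcurrentApplyOption opt{2, 2, 2, 2};  // pool sizes and queue depths
        if (!applyModule.Init(opt)) {
            return 1;
        }
        uint64_t chunkId = 42;  // illustrative key
        // Same key -> same write queue, so these apply strictly in order.
        applyModule.Push(chunkId, ApplyTaskType::WRITE, [] { /* apply #1 */ });
        applyModule.Push(chunkId, ApplyTaskType::WRITE, [] { /* apply #2 */ });
        applyModule.Flush();  // barrier: wait for queued tasks to finish
        applyModule.Stop();   // assuming the module exposes Stop() for shutdown
        return 0;
    }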
4 changes: 2 additions & 2 deletions src/chunkserver/copyset_node.cpp

@@ -303,7 +303,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
             CHECK(nullptr != chunkClosure)
                 << "ChunkClosure dynamic cast failed";
             std::shared_ptr<ChunkOpRequest>& opRequest = chunkClosure->request_;
-            concurrentapply_->Push(opRequest->ChunkId(), opRequest->OpType(),
+            concurrentapply_->Push(opRequest->ChunkId(), ChunkOpRequest::Schedule(opRequest->OpType()),  // NOLINT
                                    &ChunkOpRequest::OnApply, opRequest,
                                    iter.index(), doneGuard.release());
         } else {
@@ -320,7 +320,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
             auto opReq = ChunkOpRequest::Decode(log, &request, &data,
                                                 iter.index(), GetLeaderId());
             auto chunkId = request.chunkid();
-            concurrentapply_->Push(chunkId, request.optype(),
+            concurrentapply_->Push(chunkId, ChunkOpRequest::Schedule(request.optype()),  // NOLINT
                                    &ChunkOpRequest::OnApplyFromLog, opReq,
                                    dataStore_, std::move(request), data);
         }
15 changes: 13 additions & 2 deletions src/chunkserver/op_request.cpp

@@ -176,6 +176,16 @@ std::shared_ptr<ChunkOpRequest> ChunkOpRequest::Decode(butil::IOBuf log,
     }
 }
 
+ApplyTaskType ChunkOpRequest::Schedule(CHUNK_OP_TYPE opType) {
+    switch (opType) {
+        case CHUNK_OP_READ:
+        case CHUNK_OP_RECOVER:
+            return ApplyTaskType::READ;
+        default:
+            return ApplyTaskType::WRITE;
+    }
+}
+
 namespace {
 uint64_t MaxAppliedIndex(
     const std::shared_ptr<curve::chunkserver::CopysetNode>& node,
@@ -269,8 +279,9 @@ void ReadChunkRequest::Process() {
             thisPtr,
             node_->GetAppliedIndex(),
             doneGuard.release());
-        concurrentApplyModule_->Push(
-            request_->chunkid(), request_->optype(), task);
+        concurrentApplyModule_->Push(request_->chunkid(),
+                                     ChunkOpRequest::Schedule(request_->optype()),  // NOLINT
+                                     task);
         return;
     }
 
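
The mapping Schedule encodes is deliberately narrow: only CHUNK_OP_READ and CHUNK_OP_RECOVER dispatch to the read pool (a recover is served like a read on the apply path), and every other CHUNK_OP_TYPE value falls through to the write pool. For example:

    ChunkOpRequest::Schedule(CHUNK_OP_READ);     // ApplyTaskType::READ
    ChunkOpRequest::Schedule(CHUNK_OP_RECOVER);  // ApplyTaskType::READ
    ChunkOpRequest::Schedule(CHUNK_OP_WRITE);    // ApplyTaskType::WRITE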
3 changes: 3 additions & 0 deletions src/chunkserver/op_request.h

@@ -37,6 +37,7 @@
 
 using ::google::protobuf::RpcController;
 using ::curve::chunkserver::concurrent::ConcurrentApplyModule;
+using ::curve::chunkserver::concurrent::ApplyTaskType;
 
 namespace curve {
 namespace chunkserver {
@@ -156,6 +157,8 @@ class ChunkOpRequest : public std::enable_shared_from_this<ChunkOpRequest> {
                                                uint64_t index,
                                                PeerId leaderId);
 
+    static ApplyTaskType Schedule(CHUNK_OP_TYPE opType);
+
  protected:
     /**
      * Pack the request into a braft::task and propose it to the corresponding copyset
12 changes: 6 additions & 6 deletions test/chunkserver/concurrent_apply/concurrent_apply_test.cpp

@@ -31,7 +31,7 @@
 
 using curve::chunkserver::concurrent::ConcurrentApplyModule;
 using curve::chunkserver::concurrent::ConcurrentApplyOption;
-using curve::chunkserver::CHUNK_OP_TYPE;
+using curve::chunkserver::concurrent::ApplyTaskType;
 
 TEST(ConcurrentApplyModule, InitTest) {
     ConcurrentApplyModule concurrentapply;
@@ -89,8 +89,8 @@ TEST(ConcurrentApplyModule, RunTest) {
     ConcurrentApplyOption opt{1, 1, 1, 1};
     ASSERT_TRUE(concurrentapply.Init(opt));
 
-    ASSERT_TRUE(concurrentapply.Push(1, CHUNK_OP_TYPE::CHUNK_OP_READ, rtask));
-    ASSERT_TRUE(concurrentapply.Push(1, CHUNK_OP_TYPE::CHUNK_OP_WRITE, wtask));
+    ASSERT_TRUE(concurrentapply.Push(1, ApplyTaskType::READ, rtask));
+    ASSERT_TRUE(concurrentapply.Push(1, ApplyTaskType::WRITE, wtask));
 
     std::this_thread::sleep_for(std::chrono::milliseconds(200));
     ASSERT_EQ(1, testr);
@@ -112,7 +112,7 @@ TEST(ConcurrentApplyModule, FlushTest) {
     };
 
     for (int i = 0; i < 5000; i++) {
-        concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_WRITE, task);
+        concurrentapply.Push(i, ApplyTaskType::WRITE, task);
     }
 
     ASSERT_LT(testnum, 5000);
@@ -136,8 +136,8 @@ TEST(ConcurrentApplyModule, ConcurrentTest) {
     };
     while (!stop.load()) {
         for (int i = 0; i < 10; i++) {
-            concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_RECOVER, task);
-            concurrentapply.Push(i, CHUNK_OP_TYPE::CHUNK_OP_WRITE, task);
+            concurrentapply.Push(i, ApplyTaskType::READ, task);
+            concurrentapply.Push(i, ApplyTaskType::WRITE, task);
         }
     }
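
FlushTest above depends on Flush being a barrier: ASSERT_LT catches the queues mid-flight, and only after Flush have all 5000 tasks run. Judging from the event.Wait() in the concurrent_apply.cpp hunk, Flush likely pushes a wake-up task into every queue and waits on a countdown event; a self-contained sketch of that idea (CountDownEvent here is illustrative, not necessarily curve's own utility):

    #include <condition_variable>
    #include <mutex>

    // Counts down once per worker queue; Wait() returns only after
    // every queue has executed its barrier task and called Signal().
    class CountDownEvent {
     public:
        explicit CountDownEvent(int count) : count_(count) {}

        void Signal() {
            std::lock_guard<std::mutex> lk(mtx_);
            if (--count_ <= 0) cv_.notify_all();
        }

        void Wait() {
            std::unique_lock<std::mutex> lk(mtx_);
            cv_.wait(lk, [this] { return count_ <= 0; });
        }

     private:
        int count_;
        std::mutex mtx_;
        std::condition_variable cv_;
    };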
