Skip to content

Commit

Permalink
chunkserver: add crc
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai authored and ilixiaocui committed Jun 25, 2021
1 parent 3d2b757 commit 2ebe8da
Show file tree
Hide file tree
Showing 31 changed files with 1,432 additions and 185 deletions.
10 changes: 10 additions & 0 deletions conf/chunkserver.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ copyset.check_retrytimes=3
copyset.finishload_margin=2000
# 循环判定copyset是否加载完成的内部睡眠时间
copyset.check_loadmargin_interval_ms=1000
# scan copyset interval
copyset.scan_interval_sec=5
# size of each scan, in bytes (4MB)
copyset.scan_size_byte=4194304
# rpc timeout (ms) for a follower sending its scanmap to the leader
copyset.scan_rpc_timeout_ms=1000
# number of retries when a follower fails to send its scanmap to the leader
copyset.scan_rpc_retry_times=3
# interval (us) between retries when a follower sends its scanmap to the leader
copyset.scan_rpc_retry_interval_us=100000

#
# Clone settings
Expand Down
5 changes: 5 additions & 0 deletions curve-ansible/roles/generate_config/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ chunkserver_copyset_load_concurrency: 10
chunkserver_copyset_check_retrytimes: 3
chunkserver_copyset_finishload_margin: 2000
chunkserver_copyset_check_loadmargin_interval_ms: 1000
chunkserver_copyset_scan_interval_sec: 5
chunkserver_copyset_scan_size_byte: 4194304
chunkserver_copyset_scan_rpc_timeout_ms: 1000
chunkserver_copyset_scan_rpc_retry_times: 3
chunkserver_copyset_scan_rpc_retry_interval_us: 100000
chunkserver_clone_slice_size: 1048576
chunkserver_clone_enable_paste: false
chunkserver_clone_thread_num: 10
Expand Down
10 changes: 10 additions & 0 deletions curve-ansible/roles/generate_config/templates/chunkserver.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ copyset.check_retrytimes={{ chunkserver_copyset_check_retrytimes }}
copyset.finishload_margin={{ chunkserver_copyset_finishload_margin }}
# 循环判定copyset是否加载完成的内部睡眠时间
copyset.check_loadmargin_interval_ms={{ chunkserver_copyset_check_loadmargin_interval_ms }}
# scan copyset interval
copyset.scan_interval_sec={{ chunkserver_copyset_scan_interval_sec }}
# size of each scan, in bytes (4MB)
copyset.scan_size_byte={{ chunkserver_copyset_scan_size_byte }}
# rpc timeout (ms) for a follower sending its scanmap to the leader
copyset.scan_rpc_timeout_ms={{ chunkserver_copyset_scan_rpc_timeout_ms }}
# number of retries when a follower fails to send its scanmap to the leader
copyset.scan_rpc_retry_times={{ chunkserver_copyset_scan_rpc_retry_times }}
# interval (us) between retries when a follower sends its scanmap to the leader
copyset.scan_rpc_retry_interval_us={{ chunkserver_copyset_scan_rpc_retry_interval_us }}

#
# Clone settings
Expand Down
5 changes: 5 additions & 0 deletions deploy/local/chunkserver/conf/chunkserver.conf.0
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ copyset.load_concurrency=5
copyset.check_retrytimes=3
copyset.finishload_margin=2000
copyset.check_loadmargin_interval_ms=1000
copyset.scan_interval_sec=5
copyset.scan_size_byte=4194304
copyset.scan_rpc_timeout_ms=1000
copyset.scan_rpc_retry_times=3
copyset.scan_rpc_retry_interval_us=100000

#
# Clone settings
Expand Down
5 changes: 5 additions & 0 deletions deploy/local/chunkserver/conf/chunkserver.conf.1
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ copyset.load_concurrency=5
copyset.check_retrytimes=3
copyset.finishload_margin=2000
copyset.check_loadmargin_interval_ms=1000
copyset.scan_interval_sec=5
copyset.scan_size_byte=4194304
copyset.scan_rpc_timeout_ms=1000
copyset.scan_rpc_retry_times=3
copyset.scan_rpc_retry_interval_us=100000

#
# Clone settings
Expand Down
5 changes: 5 additions & 0 deletions deploy/local/chunkserver/conf/chunkserver.conf.2
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ copyset.load_concurrency=5
copyset.check_retrytimes=3
copyset.finishload_margin=2000
copyset.check_loadmargin_interval_ms=1000
copyset.scan_interval_sec=5
copyset.scan_size_byte=4194304
copyset.scan_rpc_timeout_ms=1000
copyset.scan_rpc_retry_times=3
copyset.scan_rpc_retry_interval_us=100000

#
# Clone settings
Expand Down
7 changes: 5 additions & 2 deletions proto/chunk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ enum CHUNK_OP_TYPE {
CHUNK_OP_CREATE_CLONE = 5; // 创建clone chunk
CHUNK_OP_RECOVER = 6; // 恢复clone chunk
CHUNK_OP_PASTE = 7; // paste chunk 内部请求
CHUNK_OP_SCAN = 8; // scan oprequest
CHUNK_OP_UNKNOWN = 9; // 未知 Op
CHUNK_OP_UNKNOWN = 8; // unknown Op
CHUNK_OP_SCAN = 9; // scan oprequest
};

// read/write 的实际数据在 rpc 的 attachment 中
Expand All @@ -62,6 +62,9 @@ message ChunkRequest {
optional string location = 11; // for CreateCloneChunk
optional string cloneFileSource = 12; // for write/read
optional uint64 cloneFileOffset = 13; // for write/read
optional uint64 sendScanMapTimeoutMs = 14; // for scan chunk
optional uint32 sendScanMapRetryTimes= 15; // for scan chunk
optional uint64 sendScanMapRetryIntervalUs = 16; // for scan chunk
};

enum CHUNK_OP_STATUS {
Expand Down
2 changes: 0 additions & 2 deletions proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ message CopySetInfo {
optional ConfigChangeInfo configChangeInfo = 6;
// copyset的性能信息
optional CopysetStatistics stats = 7;

// whether the current copyset is currently being scanned
optional bool scaning = 8;

// timestamp for last success scan (seconds)
optional uint64 lastScanSec = 9;
};
Expand Down
2 changes: 1 addition & 1 deletion proto/scan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package curve.chunkserver;
option cc_generic_services = true;

message ScanMap {
required uint32 logicPoolId = 1;
required uint32 logicalPoolId = 1;
required uint32 copysetId = 2;
required uint64 chunkId = 3;
required uint64 index = 4;
Expand Down
52 changes: 52 additions & 0 deletions src/chunkserver/chunk_closure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,57 @@ void ChunkClosure::Run() {
request_->RedirectChunkRequest();
}

void ScanChunkClosure::Run() {
// after run destory
std::unique_ptr<ScanChunkClosure> selfGuard(this);
std::unique_ptr<ChunkRequest> requestGuard(request_);
std::unique_ptr<ChunkResponse> responseGuard(response_);

switch (response_->status()) {
case CHUNK_OP_STATUS_CHUNK_NOTEXIST:
LOG(WARNING) << "scan chunk failed, read chunk not exist. "
<< request_->ShortDebugString();
break;
case CHUNK_OP_STATUS_FAILURE_UNKNOWN:
LOG(ERROR) << "scan chunk failed, read chunk unknown failure. "
<< request_->ShortDebugString();
break;
default:
break;
}
}

void SendScanMapClosure::Guard() {
std::unique_ptr<SendScanMapClosure> selfGuard(this);
std::unique_ptr<FollowScanMapRequest> requestGuard(request_);
std::unique_ptr<FollowScanMapResponse> responseGuard(response_);
std::unique_ptr<brpc::Controller> cntlGuard(cntl_);
std::unique_ptr<brpc::Channel> channelGuard(channel_);
}

/**
 * Completion callback of the FollowScanMap rpc.
 *
 * - rpc succeeded: release all resources via Guard().
 * - rpc failed and retries remain: sleep for the retry interval, reset the
 *   controller and re-issue the rpc with this same closure as callback.
 * - rpc failed and no retries remain: log the error and release all
 *   resources via Guard().
 */
void SendScanMapClosure::Run() {
    // Success: nothing left to do but clean up.
    if (!cntl_->Failed()) {
        Guard();
        return;
    }

    // Failure with the retry budget exhausted: report and clean up.
    if (retry_ == 0) {
        LOG(ERROR) << " Send scanmap to leader rpc failed after retry,"
                   << " cntl errorCode: " << cntl_->ErrorCode()
                   << " cntl error: " << cntl_->ErrorText();
        Guard();
        return;
    }

    // Failure with retries remaining: log the remaining count before
    // consuming one attempt.
    LOG(WARNING) << "Send scanmap to leader rpc failed."
                 << " cntl errorCode: " << cntl_->ErrorCode()
                 << " cntl error: " << cntl_->ErrorText()
                 << ", then will retry " << retry_ << " times.";
    --retry_;
    bthread_usleep(retryIntervalUs_);

    // The controller is reused, so it is reset and the timeout re-applied
    // before the next attempt.
    cntl_->Reset();
    cntl_->set_timeout_ms(rpcTimeoutMs_);

    // Re-send with this closure as the callback; do not touch any member
    // after this call, since the callback may already have released them.
    ScanService_Stub stub(channel_);
    stub.FollowScanMap(cntl_, request_, response_, this);
}

} // namespace chunkserver
} // namespace curve
46 changes: 46 additions & 0 deletions src/chunkserver/chunk_closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <memory>

#include "src/chunkserver/op_request.h"
#include "proto/chunk.pb.h"

namespace curve {
namespace chunkserver {
Expand All @@ -52,6 +53,51 @@ class ChunkClosure : public braft::Closure {
std::shared_ptr<ChunkOpRequest> request_;
};

class ScanChunkClosure : public google::protobuf::Closure {
public:
ScanChunkClosure(ChunkRequest *request, ChunkResponse *response) :
request_(request), response_(response) {}

~ScanChunkClosure() = default;

void Run() override;

public:
ChunkRequest *request_;
ChunkResponse *response_;
};

class SendScanMapClosure : public google::protobuf::Closure {
public:
SendScanMapClosure(FollowScanMapRequest * request,
FollowScanMapResponse *response,
uint64_t timeout,
uint32_t retry,
uint64_t retryIntervalUs,
brpc::Controller* cntl,
brpc::Channel *channel) :
request_(request), response_(response),
rpcTimeoutMs_(timeout), retry_(retry),
retryIntervalUs_(retryIntervalUs),
cntl_(cntl), channel_(channel) {}

~SendScanMapClosure() = default;

void Run() override;

private:
void Guard();

public:
FollowScanMapRequest *request_;
FollowScanMapResponse *response_;
uint64_t rpcTimeoutMs_;
uint32_t retry_;
uint64_t retryIntervalUs_;
brpc::Controller *cntl_;
brpc::Channel *channel_;
};

} // namespace chunkserver
} // namespace curve

Expand Down
32 changes: 32 additions & 0 deletions src/chunkserver/chunkserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,21 @@ int ChunkServer::Run(int argc, char** argv) {
LOG_IF(FATAL, copysetNodeManager_->Init(copysetNodeOptions) != 0)
<< "Failed to initialize CopysetNodeManager.";

// init scan module
ScanManagerOptions scanOpts;
InitScanOptions(&conf, &scanOpts);
scanOpts.copysetNodeManager = copysetNodeManager_;
LOG_IF(FATAL, scanManager_.Init(scanOpts) != 0)
<< "Failed to init scan manager.";

// 心跳模块初始化
HeartbeatOptions heartbeatOptions;
InitHeartbeatOptions(&conf, &heartbeatOptions);
heartbeatOptions.copysetNodeManager = copysetNodeManager_;
heartbeatOptions.fs = fs;
heartbeatOptions.chunkserverId = metadata.id();
heartbeatOptions.chunkserverToken = metadata.token();
heartbeatOptions.scanManager = &scanManager_;
LOG_IF(FATAL, heartbeat_.Init(heartbeatOptions) != 0)
<< "Failed to init Heartbeat manager.";

Expand Down Expand Up @@ -340,6 +348,12 @@ int ChunkServer::Run(int argc, char** argv) {
brpc::SERVER_DOESNT_OWN_SERVICE);
CHECK(0 == ret) << "Fail to add ChunkServerService";

// scan copyset service
ScanServiceImpl scanCopysetService(&scanManager_);
ret = server.AddService(&scanCopysetService,
brpc::SERVER_DOESNT_OWN_SERVICE);
CHECK(0 == ret) << "Fail to add ScanCopysetService";

// 启动rpc service
LOG(INFO) << "Internal server is going to serve on: "
<< copysetNodeOptions.ip << ":" << copysetNodeOptions.port;
Expand Down Expand Up @@ -390,6 +404,8 @@ int ChunkServer::Run(int argc, char** argv) {
<< "Failed to start heartbeat manager.";
LOG_IF(FATAL, copysetNodeManager_->Run() != 0)
<< "Failed to start CopysetNodeManager.";
LOG_IF(FATAL, scanManager_.Run() != 0)
<< "Failed to start scan manager.";
LOG_IF(FATAL, !chunkfilePool->StartCleaning())
<< "Failed to start file pool clean worker.";

Expand All @@ -405,6 +421,8 @@ int ChunkServer::Run(int argc, char** argv) {
server.Join();

LOG(INFO) << "ChunkServer is going to quit.";
LOG_IF(ERROR, scanManager_.Fini() != 0)
<< "Failed to shutdown scan manager.";
LOG_IF(ERROR, heartbeat_.Fini() != 0)
<< "Failed to shutdown heartbeat manager.";
LOG_IF(ERROR, copysetNodeManager_->Fini() != 0)
Expand Down Expand Up @@ -589,6 +607,20 @@ void ChunkServer::InitCloneOptions(
&cloneOptions->queueCapacity));
}

/**
 * Fills the scan manager options from the chunkserver configuration.
 *
 * Every key below is mandatory: if a lookup reports failure, a FATAL log
 * is emitted.
 *
 * @param conf         configuration to read the copyset.scan_* keys from
 * @param scanOptions  options structure that receives the values
 */
void ChunkServer::InitScanOptions(
    common::Configuration *conf, ScanManagerOptions *scanOptions) {
    ScanManagerOptions &opts = *scanOptions;

    // interval between scans, in seconds
    const bool hasInterval =
        conf->GetUInt32Value("copyset.scan_interval_sec", &opts.intervalSec);
    LOG_IF(FATAL, !hasInterval);

    // number of bytes handled by each scan
    const bool hasScanSize =
        conf->GetUInt64Value("copyset.scan_size_byte", &opts.scanSize);
    LOG_IF(FATAL, !hasScanSize);

    // rpc timeout, in milliseconds
    const bool hasTimeout =
        conf->GetUInt64Value("copyset.scan_rpc_timeout_ms", &opts.timeoutMs);
    LOG_IF(FATAL, !hasTimeout);

    // rpc retry count
    const bool hasRetry =
        conf->GetUInt32Value("copyset.scan_rpc_retry_times", &opts.retry);
    LOG_IF(FATAL, !hasRetry);

    // rpc retry interval, in microseconds
    const bool hasRetryInterval =
        conf->GetUInt64Value("copyset.scan_rpc_retry_interval_us",
                             &opts.retryIntervalUs);
    LOG_IF(FATAL, !hasRetryInterval);
}

void ChunkServer::InitHeartbeatOptions(
common::Configuration *conf, HeartbeatOptions *heartbeatOptions) {
LOG_IF(FATAL, !conf->GetStringValue("chunkserver.stor_uri",
Expand Down
8 changes: 8 additions & 0 deletions src/chunkserver/chunkserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
#include "src/common/configuration.h"
#include "src/chunkserver/copyset_node_manager.h"
#include "src/chunkserver/heartbeat.h"
#include "src/chunkserver/scan_manager.h"
#include "src/chunkserver/clone_manager.h"
#include "src/chunkserver/register.h"
#include "src/chunkserver/trash.h"
#include "src/chunkserver/chunkserver_metrics.h"
#include "src/chunkserver/concurrent_apply/concurrent_apply.h"
#include "src/chunkserver/scan_service.h"

using ::curve::chunkserver::concurrent::ConcurrentApplyOption;

Expand Down Expand Up @@ -74,6 +76,9 @@ class ChunkServer {
void InitCloneOptions(common::Configuration *conf,
CloneOptions *cloneOptions);

void InitScanOptions(common::Configuration *conf,
ScanManagerOptions *scanOptions);

void InitHeartbeatOptions(common::Configuration *conf,
HeartbeatOptions *heartbeatOptions);

Expand Down Expand Up @@ -103,6 +108,9 @@ class ChunkServer {
// cloneManager_ 管理克隆任务
CloneManager cloneManager_;

// scan copyset manager
ScanManager scanManager_;

// heartbeat_ 负责向mds定期发送心跳,并下发心跳中任务
Heartbeat heartbeat_;

Expand Down
18 changes: 18 additions & 0 deletions src/chunkserver/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ CopysetNode::CopysetNode(const LogicPoolID &logicPoolId,
chunkDataRpath_(),
appliedIndex_(0),
leaderTerm_(-1),
scaning_(false),
lastScanSec_(0),
configChange_(std::make_shared<ConfigurationChange>()) {
}

Expand Down Expand Up @@ -500,6 +502,22 @@ CopysetID CopysetNode::GetCopysetId() const {
return copysetId_;
}

void CopysetNode::SetScan(bool scan) {
scaning_ = scan;
}

bool CopysetNode::GetScan() const {
return scaning_;
}

void CopysetNode::SetLastScan(uint64_t time) {
lastScanSec_ = time;
}

uint64_t CopysetNode::GetLastScan() const {
return lastScanSec_;
}

std::string CopysetNode::GetCopysetDir() const {
return copysetDirPath_;
}
Expand Down
Loading

0 comments on commit 2ebe8da

Please sign in to comment.