Skip to content

Commit

Permalink
chunkserver: fix crc issues
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai authored and ilixiaocui committed Jul 9, 2021
1 parent ead087b commit dc58284
Show file tree
Hide file tree
Showing 20 changed files with 350 additions and 183 deletions.
2 changes: 2 additions & 0 deletions curvefs_python/BUILD_bak
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ cc_library(
"//proto:nameserver2_cc_proto",
"//proto:common_cc_proto",
"//proto:topology_cc_proto",
"//proto:scan_cc_proto",
"//proto:chunkserver-cc-protos",
"//src/client:curve_client"
],
Expand All @@ -79,6 +80,7 @@ cc_library(
"-lnameserver2_proto",
"-ltopology_proto",
"-lcommon_proto",
"-lscan_proto",
"-lchunkserver-protos",
"-lprotobuf",
"-lprotobuf_lite",
Expand Down
4 changes: 2 additions & 2 deletions proto/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ proto_library(
"chunkserver.proto",
"curve_storage.proto",
]),
deps = [":common_proto"],
deps = [":common_proto",
":scan_proto"],
visibility = ["//visibility:public"],
)

Expand Down Expand Up @@ -104,4 +105,3 @@ proto_library(
name = "scan_proto",
srcs = ["scan.proto"],
)

1 change: 1 addition & 0 deletions proto/chunk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ message ChunkRequest {
optional uint64 sendScanMapTimeoutMs = 14; // for scan chunk
optional uint32 sendScanMapRetryTimes= 15; // for scan chunk
optional uint64 sendScanMapRetryIntervalUs = 16; // for scan chunk
optional bool readMetaPage = 17; // for scan chunk
};

enum CHUNK_OP_STATUS {
Expand Down
3 changes: 3 additions & 0 deletions proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

syntax = "proto2";
import "proto/common.proto";
import "proto/scan.proto";
package curve.mds.heartbeat;

option cc_generic_services = true;
Expand All @@ -43,6 +44,8 @@ message CopySetInfo {
optional bool scaning = 8;
// timestamp for last success scan (seconds)
optional uint64 lastScanSec = 9;
// failed crc check scanmap
repeated chunkserver.ScanMap scanMap = 10;
};

message ConfigChangeInfo {
Expand Down
2 changes: 2 additions & 0 deletions src/chunkserver/chunkserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ void ChunkServer::InitScanOptions(
&scanOptions->intervalSec));
LOG_IF(FATAL, !conf->GetUInt64Value("copyset.scan_size_byte",
&scanOptions->scanSize));
LOG_IF(FATAL, !conf->GetUInt32Value("global.meta_page_size",
&scanOptions->chunkMetaPageSize));
LOG_IF(FATAL, !conf->GetUInt64Value("copyset.scan_rpc_timeout_ms",
&scanOptions->timeoutMs));
LOG_IF(FATAL, !conf->GetUInt32Value("copyset.scan_rpc_retry_times",
Expand Down
4 changes: 4 additions & 0 deletions src/chunkserver/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ uint64_t CopysetNode::GetLastScan() const {
return lastScanSec_;
}

std::vector<ScanMap>& CopysetNode::GetFailedScanMap() {
return failedScanMaps_;
}

std::string CopysetNode::GetCopysetDir() const {
return copysetDirPath_;
}
Expand Down
7 changes: 6 additions & 1 deletion src/chunkserver/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "proto/heartbeat.pb.h"
#include "proto/chunk.pb.h"
#include "proto/common.pb.h"
#include "proto/scan.pb.h"

namespace curve {
namespace chunkserver {
Expand Down Expand Up @@ -152,6 +153,8 @@ class CopysetNode : public braft::StateMachine,

virtual uint64_t GetLastScan() const;

virtual std::vector<ScanMap>& GetFailedScanMap();

/**
* 返回复制组数据目录
* @return
Expand Down Expand Up @@ -281,7 +284,7 @@ class CopysetNode : public braft::StateMachine,
* @param peers:返回的成员列表(输出参数)
* @return
*/
void ListPeers(std::vector<Peer>* peers);
virtual void ListPeers(std::vector<Peer>* peers);

/**
* @brief initialize raft node options corresponding to the copyset node
Expand Down Expand Up @@ -458,6 +461,8 @@ class CopysetNode : public braft::StateMachine,
bool scaning_;
// last scan time
uint64_t lastScanSec_;
// failed check scanmap
std::vector<ScanMap> failedScanMaps_;
};

} // namespace chunkserver
Expand Down
12 changes: 12 additions & 0 deletions src/chunkserver/datastore/chunkserver_chunkfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,18 @@ CSErrorCode CSChunkFile::Read(char * buf, off_t offset, size_t length) {
return CSErrorCode::Success;
}

CSErrorCode CSChunkFile::ReadMetaPage(char * buf) {
ReadLockGuard readGuard(rwLock_);
int rc = readMetaPage(buf);
if (rc < 0) {
LOG(ERROR) << "Read chunk meta page failed."
<< "ChunkID: " << chunkId_
<< ",chunk sn: " << metaPage_.sn;
return CSErrorCode::InternalError;
}
return CSErrorCode::Success;
}

CSErrorCode CSChunkFile::ReadSpecifiedChunk(SequenceNum sn,
char * buf,
off_t offset,
Expand Down
9 changes: 9 additions & 0 deletions src/chunkserver/datastore/chunkserver_chunkfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ class CSChunkFile {
* @return: return error code
*/
CSErrorCode Read(char * buf, off_t offset, size_t length);

/**
* Read chunk meta data
* There may be concurrency, add read lock
* @param buf: the data read
* @return: return error code
*/
CSErrorCode ReadMetaPage(char * buf);

/**
* Read the chunk of the specified Sequence
* There may be concurrency, add read lock
Expand Down
16 changes: 16 additions & 0 deletions src/chunkserver/datastore/chunkserver_datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,22 @@ CSErrorCode CSDataStore::ReadChunk(ChunkID id,
return CSErrorCode::Success;
}

CSErrorCode CSDataStore::ReadChunkMetaPage(ChunkID id, SequenceNum sn,
char * buf) {
auto chunkFile = metaCache_.Get(id);
if (chunkFile == nullptr) {
return CSErrorCode::ChunkNotExistError;
}

CSErrorCode errorCode = chunkFile->ReadMetaPage(buf);
if (errorCode != CSErrorCode::Success) {
LOG(WARNING) << "Read chunk meta page failed."
<< "ChunkID = " << id;
return errorCode;
}
return CSErrorCode::Success;
}

CSErrorCode CSDataStore::ReadSnapshotChunk(ChunkID id,
SequenceNum sn,
char * buf,
Expand Down
13 changes: 13 additions & 0 deletions src/chunkserver/datastore/chunkserver_datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,19 @@ class CSDataStore {
char * buf,
off_t offset,
size_t length);

/**
* Read the metadata of the current chunk
* @param id: the chunk id to be read
* @param sn: used to record trace, not used in actual logic processing,
* indicating the sequence number of the current user file
* @param buf: the content of the data read
* @return: return error code
*/
virtual CSErrorCode ReadChunkMetaPage(ChunkID id,
SequenceNum sn,
char * buf);

/**
* Read the data of the specified sequence, it may read the current
* chunk file, or it may read the snapshot file
Expand Down
6 changes: 6 additions & 0 deletions src/chunkserver/heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ int Heartbeat::BuildCopysetInfo(curve::mds::heartbeat::CopySetInfo* info,
if (copyset->GetLastScan() > 0) {
info->set_lastscansec(copyset->GetLastScan());
}
auto failedScanMaps = copyset->GetFailedScanMap();
if (!failedScanMaps.empty()) {
for (auto &map : failedScanMaps) {
info->add_scanmap()->CopyFrom(map);
}
}

std::vector<Peer> peers;
copyset->ListPeers(&peers);
Expand Down
1 change: 1 addition & 0 deletions src/chunkserver/heartbeat.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "src/common/concurrent/concurrent.h"
#include "src/chunkserver/scan_manager.h"
#include "proto/heartbeat.pb.h"
#include "proto/scan.pb.h"

using ::curve::common::Thread;

Expand Down
29 changes: 23 additions & 6 deletions src/chunkserver/op_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -876,12 +876,20 @@ void ScanChunkRequest::OnApply(uint64_t index,
std::unique_ptr<char[]> readBuffer(new(std::nothrow)char[size]);
CHECK(nullptr != readBuffer)
<< "new readBuffer failed " << strerror(errno);
// scan chunk metapage or user data
auto ret = 0;
if (request_->has_readmetapage() && request_->readmetapage()) {
ret = datastore_->ReadChunkMetaPage(request_->chunkid(),
request_->sn(),
readBuffer.get());
} else {
ret = datastore_->ReadChunk(request_->chunkid(),
request_->sn(),
readBuffer.get(),
request_->offset(),
size);
}

auto ret = datastore_->ReadChunk(request_->chunkid(),
request_->sn(),
readBuffer.get(),
request_->offset(),
size);
if (CSErrorCode::Success == ret) {
crc = ::curve::common::CRC32(readBuffer.get(), size);
// build scanmap
Expand Down Expand Up @@ -924,11 +932,20 @@ void ScanChunkRequest::OnApplyFromLog(std::shared_ptr<CSDataStore> datastore, /
CHECK(nullptr != readBuffer)
<< "new readBuffer failed " << strerror(errno);

auto ret = datastore->ReadChunk(request.chunkid(),
// scan chunk metapage or user data
auto ret = 0;
if (request.has_readmetapage() && request.readmetapage()) {
ret = datastore->ReadChunkMetaPage(request.chunkid(),
request.sn(),
readBuffer.get());
} else {
ret = datastore->ReadChunk(request.chunkid(),
request.sn(),
readBuffer.get(),
request.offset(),
size);
}

if (CSErrorCode::Success == ret) {
crc = ::curve::common::CRC32(readBuffer.get(), size);
BuildAndSendScanMap(request, index_, crc);
Expand Down
Loading

0 comments on commit dc58284

Please sign in to comment.