From dc58284bb40305b4f3912cf7eec312a718d31553 Mon Sep 17 00:00:00 2001 From: sean Date: Thu, 1 Jul 2021 15:28:03 +0800 Subject: [PATCH] chunkserver: fix crc issues --- curvefs_python/BUILD_bak | 2 + proto/BUILD | 4 +- proto/chunk.proto | 1 + proto/heartbeat.proto | 3 + src/chunkserver/chunkserver.cpp | 2 + src/chunkserver/copyset_node.cpp | 4 + src/chunkserver/copyset_node.h | 7 +- .../datastore/chunkserver_chunkfile.cpp | 12 ++ .../datastore/chunkserver_chunkfile.h | 9 + .../datastore/chunkserver_datastore.cpp | 16 ++ .../datastore/chunkserver_datastore.h | 13 ++ src/chunkserver/heartbeat.cpp | 6 + src/chunkserver/heartbeat.h | 1 + src/chunkserver/op_request.cpp | 29 ++- src/chunkserver/scan_manager.cpp | 184 +++++++++--------- src/chunkserver/scan_manager.h | 20 +- src/mds/topology/topology.cpp | 2 - .../datastore/datastore_mock_unittest.cpp | 55 ++++++ test/chunkserver/mock_copyset_node.h | 3 + test/chunkserver/scan_manager_test.cpp | 160 +++++++++------ 20 files changed, 350 insertions(+), 183 deletions(-) diff --git a/curvefs_python/BUILD_bak b/curvefs_python/BUILD_bak index 64f2aef8c3..3e2e997cb4 100644 --- a/curvefs_python/BUILD_bak +++ b/curvefs_python/BUILD_bak @@ -54,6 +54,7 @@ cc_library( "//proto:nameserver2_cc_proto", "//proto:common_cc_proto", "//proto:topology_cc_proto", + "//proto:scan_cc_proto", "//proto:chunkserver-cc-protos", "//src/client:curve_client" ], @@ -79,6 +80,7 @@ cc_library( "-lnameserver2_proto", "-ltopology_proto", "-lcommon_proto", + "-lscan_proto", "-lchunkserver-protos", "-lprotobuf", "-lprotobuf_lite", diff --git a/proto/BUILD b/proto/BUILD index c46980eb7f..a18a262a09 100644 --- a/proto/BUILD +++ b/proto/BUILD @@ -68,7 +68,8 @@ proto_library( "chunkserver.proto", "curve_storage.proto", ]), - deps = [":common_proto"], + deps = [":common_proto", + ":scan_proto"], visibility = ["//visibility:public"], ) @@ -104,4 +105,3 @@ proto_library( name = "scan_proto", srcs = ["scan.proto"], ) - diff --git a/proto/chunk.proto b/proto/chunk.proto index ad81a01946..5e463b7db5 100755 --- a/proto/chunk.proto +++ b/proto/chunk.proto @@ -65,6 +65,7 @@ message ChunkRequest { optional uint64 sendScanMapTimeoutMs = 14; // for scan chunk optional uint32 sendScanMapRetryTimes= 15; // for scan chunk optional uint64 sendScanMapRetryIntervalUs = 16; // for scan chunk + optional bool readMetaPage = 17; // for scan chunk }; enum CHUNK_OP_STATUS { diff --git a/proto/heartbeat.proto b/proto/heartbeat.proto index b8efa4ce0d..19c17b8cd6 100644 --- a/proto/heartbeat.proto +++ b/proto/heartbeat.proto @@ -22,6 +22,7 @@ syntax = "proto2"; import "proto/common.proto"; +import "proto/scan.proto"; package curve.mds.heartbeat; option cc_generic_services = true; @@ -43,6 +44,8 @@ message CopySetInfo { optional bool scaning = 8; // timestamp for last success scan (seconds) optional uint64 lastScanSec = 9; + // failed crc check scanmap + repeated chunkserver.ScanMap scanMap = 10; }; message ConfigChangeInfo { diff --git a/src/chunkserver/chunkserver.cpp b/src/chunkserver/chunkserver.cpp index 8ff5d8059f..1249a79e5b 100644 --- a/src/chunkserver/chunkserver.cpp +++ b/src/chunkserver/chunkserver.cpp @@ -613,6 +613,8 @@ void ChunkServer::InitScanOptions( &scanOptions->intervalSec)); LOG_IF(FATAL, !conf->GetUInt64Value("copyset.scan_size_byte", &scanOptions->scanSize)); + LOG_IF(FATAL, !conf->GetUInt32Value("global.meta_page_size", + &scanOptions->chunkMetaPageSize)); LOG_IF(FATAL, !conf->GetUInt64Value("copyset.scan_rpc_timeout_ms", &scanOptions->timeoutMs)); LOG_IF(FATAL, !conf->GetUInt32Value("copyset.scan_rpc_retry_times", diff --git a/src/chunkserver/copyset_node.cpp b/src/chunkserver/copyset_node.cpp index 90530ec07c..48b51ac4ac 100755 --- a/src/chunkserver/copyset_node.cpp +++ b/src/chunkserver/copyset_node.cpp @@ -518,6 +518,10 @@ uint64_t CopysetNode::GetLastScan() const { return lastScanSec_; } +std::vector& CopysetNode::GetFailedScanMap() { + return failedScanMaps_; +} + std::string CopysetNode::GetCopysetDir() const { return copysetDirPath_; } diff --git a/src/chunkserver/copyset_node.h b/src/chunkserver/copyset_node.h index f008f2fd37..768176632d 100755 --- a/src/chunkserver/copyset_node.h +++ b/src/chunkserver/copyset_node.h @@ -43,6 +43,7 @@ #include "proto/heartbeat.pb.h" #include "proto/chunk.pb.h" #include "proto/common.pb.h" +#include "proto/scan.pb.h" namespace curve { namespace chunkserver { @@ -152,6 +153,8 @@ class CopysetNode : public braft::StateMachine, virtual uint64_t GetLastScan() const; + virtual std::vector& GetFailedScanMap(); + /** * 返回复制组数据目录 * @return @@ -281,7 +284,7 @@ class CopysetNode : public braft::StateMachine, * @param peers:返回的成员列表(输出参数) * @return */ - void ListPeers(std::vector* peers); + virtual void ListPeers(std::vector* peers); /** * @brief initialize raft node options corresponding to the copyset node @@ -458,6 +461,8 @@ class CopysetNode : public braft::StateMachine, bool scaning_; // last scan time uint64_t lastScanSec_; + // failed check scanmap + std::vector failedScanMaps_; }; } // namespace chunkserver diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.cpp b/src/chunkserver/datastore/chunkserver_chunkfile.cpp index 89568ffd9e..5287dbda08 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.cpp +++ b/src/chunkserver/datastore/chunkserver_chunkfile.cpp @@ -484,6 +484,18 @@ CSErrorCode CSChunkFile::Read(char * buf, off_t offset, size_t length) { return CSErrorCode::Success; } +CSErrorCode CSChunkFile::ReadMetaPage(char * buf) { + ReadLockGuard readGuard(rwLock_); + int rc = readMetaPage(buf); + if (rc < 0) { + LOG(ERROR) << "Read chunk meta page failed." + << "ChunkID: " << chunkId_ + << ",chunk sn: " << metaPage_.sn; + return CSErrorCode::InternalError; + } + return CSErrorCode::Success; +} + CSErrorCode CSChunkFile::ReadSpecifiedChunk(SequenceNum sn, char * buf, off_t offset, diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.h b/src/chunkserver/datastore/chunkserver_chunkfile.h index e7469eea55..c9e4378ea5 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.h +++ b/src/chunkserver/datastore/chunkserver_chunkfile.h @@ -184,6 +184,15 @@ class CSChunkFile { * @return: return error code */ CSErrorCode Read(char * buf, off_t offset, size_t length); + + /** + * Read chunk meta data + * There may be concurrency, add read lock + * @param buf: the data read + * @return: return error code + */ + CSErrorCode ReadMetaPage(char * buf); + /** * Read the chunk of the specified Sequence * There may be concurrency, add read lock diff --git a/src/chunkserver/datastore/chunkserver_datastore.cpp b/src/chunkserver/datastore/chunkserver_datastore.cpp index b8dc22433f..f3df48c51d 100644 --- a/src/chunkserver/datastore/chunkserver_datastore.cpp +++ b/src/chunkserver/datastore/chunkserver_datastore.cpp @@ -160,6 +160,22 @@ CSErrorCode CSDataStore::ReadChunk(ChunkID id, return CSErrorCode::Success; } +CSErrorCode CSDataStore::ReadChunkMetaPage(ChunkID id, SequenceNum sn, + char * buf) { + auto chunkFile = metaCache_.Get(id); + if (chunkFile == nullptr) { + return CSErrorCode::ChunkNotExistError; + } + + CSErrorCode errorCode = chunkFile->ReadMetaPage(buf); + if (errorCode != CSErrorCode::Success) { + LOG(WARNING) << "Read chunk meta page failed." + << "ChunkID = " << id; + return errorCode; + } + return CSErrorCode::Success; +} + CSErrorCode CSDataStore::ReadSnapshotChunk(ChunkID id, SequenceNum sn, char * buf, diff --git a/src/chunkserver/datastore/chunkserver_datastore.h b/src/chunkserver/datastore/chunkserver_datastore.h index f523df455e..885c444913 100644 --- a/src/chunkserver/datastore/chunkserver_datastore.h +++ b/src/chunkserver/datastore/chunkserver_datastore.h @@ -187,6 +187,19 @@ class CSDataStore { char * buf, off_t offset, size_t length); + + /** + * Read the metadata of the current chunk + * @param id: the chunk id to be read + * @param sn: used to record trace, not used in actual logic processing, + * indicating the sequence number of the current user file + * @param buf: the content of the data read + * @return: return error code + */ + virtual CSErrorCode ReadChunkMetaPage(ChunkID id, + SequenceNum sn, + char * buf); + /** * Read the data of the specified sequence, it may read the current * chunk file, or it may read the snapshot file diff --git a/src/chunkserver/heartbeat.cpp b/src/chunkserver/heartbeat.cpp index 4f270e574e..1e2a7196f8 100644 --- a/src/chunkserver/heartbeat.cpp +++ b/src/chunkserver/heartbeat.cpp @@ -154,6 +154,12 @@ int Heartbeat::BuildCopysetInfo(curve::mds::heartbeat::CopySetInfo* info, if (copyset->GetLastScan() > 0) { info->set_lastscansec(copyset->GetLastScan()); } + auto failedScanMaps = copyset->GetFailedScanMap(); + if (!failedScanMaps.empty()) { + for (auto &map : failedScanMaps) { + info->add_scanmap()->CopyFrom(map); + } + } std::vector peers; copyset->ListPeers(&peers); diff --git a/src/chunkserver/heartbeat.h b/src/chunkserver/heartbeat.h index 9bc587b4cb..8bfe388086 100644 --- a/src/chunkserver/heartbeat.h +++ b/src/chunkserver/heartbeat.h @@ -40,6 +40,7 @@ #include "src/common/concurrent/concurrent.h" #include "src/chunkserver/scan_manager.h" #include "proto/heartbeat.pb.h" +#include "proto/scan.pb.h" using ::curve::common::Thread; diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index c581a323a4..5c20cacbbd 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -876,12 +876,20 @@ void ScanChunkRequest::OnApply(uint64_t index, std::unique_ptr readBuffer(new(std::nothrow)char[size]); CHECK(nullptr != readBuffer) << "new readBuffer failed " << strerror(errno); + // scan chunk metapage or user data + auto ret = 0; + if (request_->has_readmetapage() && request_->readmetapage()) { + ret = datastore_->ReadChunkMetaPage(request_->chunkid(), + request_->sn(), + readBuffer.get()); + } else { + ret = datastore_->ReadChunk(request_->chunkid(), + request_->sn(), + readBuffer.get(), + request_->offset(), + size); + } - auto ret = datastore_->ReadChunk(request_->chunkid(), - request_->sn(), - readBuffer.get(), - request_->offset(), - size); if (CSErrorCode::Success == ret) { crc = ::curve::common::CRC32(readBuffer.get(), size); // build scanmap @@ -924,11 +932,20 @@ void ScanChunkRequest::OnApplyFromLog(std::shared_ptr datastore, / CHECK(nullptr != readBuffer) << "new readBuffer failed " << strerror(errno); - auto ret = datastore->ReadChunk(request.chunkid(), + // scan chunk metapage or user data + auto ret = 0; + if (request.has_readmetapage() && request.readmetapage()) { + ret = datastore->ReadChunkMetaPage(request.chunkid(), + request.sn(), + readBuffer.get()); + } else { + ret = datastore->ReadChunk(request.chunkid(), request.sn(), readBuffer.get(), request.offset(), size); + } + if (CSErrorCode::Success == ret) { crc = ::curve::common::CRC32(readBuffer.get(), size); BuildAndSendScanMap(request, index_, crc); diff --git a/src/chunkserver/scan_manager.cpp b/src/chunkserver/scan_manager.cpp index 9de32de7bd..593be381ad 100644 --- a/src/chunkserver/scan_manager.cpp +++ b/src/chunkserver/scan_manager.cpp @@ -31,6 +31,7 @@ using ::google::protobuf::util::MessageDifferencer; int ScanManager::Init(const ScanManagerOptions &options) { toStop_.store(false, std::memory_order_release); scanSize_ = options.scanSize; + chunkMetaPageSize_ = options.chunkMetaPageSize; timeoutMs_ = options.timeoutMs; retry_ = options.retry; retryIntervalUs_ = options.retryIntervalUs; @@ -122,6 +123,7 @@ void ScanManager::StartScanJob(ScanKey key) { job->isFinished = true; job->dataStore = nodePtr->GetDataStore(); nodePtr->SetScan(true); + nodePtr->GetFailedScanMap().clear(); jobMapLock_.WRLock(); jobs_.emplace(key, job); jobMapLock_.Unlock(); @@ -144,6 +146,7 @@ int ScanManager::CancelScanJob(LogicPoolID poolId, CopysetID id) { if (nullptr != job) { auto nodePtr = copysetNodeManager_->GetCopysetNode(poolId, id); nodePtr->SetScan(false); + nodePtr->GetFailedScanMap().clear(); WriteLockGuard writeGuard(jobMapLock_); jobs_.erase(key); } @@ -158,34 +161,9 @@ bool ScanManager::GenScanJob(std::shared_ptr job) { job->type = ScanType::NewMap; break; case ScanType::NewMap: - if (job->chunkMap.empty()) { - LOG(WARNING) << "GenScanJob failed, job's chunkmap is empty" - << " logicalpoolId = " << job->poolId - << " copysetId = " << job->id; - ScanJobFinish(job); - } else { - job->iter = job->chunkMap.find(job->currentChunkId); - if (job->iter == job->chunkMap.end()) { - job->iter = job->chunkMap.begin(); - } - // check chunk version - do { - auto csChunkFile = job->iter->second; - if (csChunkFile->GetChunkFileMetaPage().version != - FORMAT_VERSION_V2) { - job->iter++; - } else { - break; - } - } while (job->iter != job->chunkMap.end()); - - if (job->iter == job->chunkMap.end()) { - ScanJobFinish(job); - done = true; - break; - } - job->currentChunkId = job->iter->first; - ScanChunkReqProcess(job); + if (0 == ScanJobProcess(job)) { + job->type = ScanType::Finish; + break; } done = true; break; @@ -201,15 +179,6 @@ bool ScanManager::GenScanJob(std::shared_ptr job) { break; case ScanType::CompareMap: CompareMap(job); - if (isCurrentJobFinish(job)) { - job->type = ScanType::Finish; - break; - } else if (isCurrentChunkFinish(job)) { - job->type = ScanType::NewMap; - (job->iter)++; - job->currentChunkId = job->iter->first; - break; - } done = true; break; case ScanType::Finish: @@ -239,47 +208,88 @@ void ScanManager::GenScanJobs(ScanKey key) { } // send scan request to braft -void ScanManager::ScanChunkReqProcess(const std::shared_ptr job) { - // split scan chunk request - uint32_t currentOffset = 0; +int ScanManager::ScanJobProcess(const std::shared_ptr job) { + // check chunkmap + if (job->chunkMap.empty()) { + LOG(WARNING) << "GenScanJob failed, job's chunkmap is empty" + << " logicalpoolId = " << job->poolId + << " copysetId = " << job->id; + return 0; + } + + // iterate chunkmap auto nodePtr = copysetNodeManager_->GetCopysetNode(job->poolId, job->id); - while (currentOffset < chunkSize_) { - // Init job - job->taskLock.WRLock(); - job->task.localMap.Clear(); - job->task.followerMap.clear(); - job->task.waitingNum = 3; - job->task.chunkId = job->currentChunkId; - job->task.offset = currentOffset; - job->taskLock.Unlock(); - job->currentOffset = currentOffset; - job->isFinished = false; + std::vector peers; + nodePtr->ListPeers(&peers); + auto replicaNum = peers.size(); + auto iter = job->chunkMap.begin(); + while (iter != job->chunkMap.end()) { + // check chunk version + auto csChunkFile = iter->second; + if (csChunkFile->GetChunkFileMetaPage().version != + FORMAT_VERSION_V2) { + iter++; + } else { + // split scan chunk request + job->currentChunkId = iter->first; + uint32_t currentOffset = 0; + bool scanChunkMetaPage = true; + while (currentOffset < chunkSize_) { + // check is leader, if not cancel the job + if (!nodePtr->IsLeaderTerm()) { + CancelScanJob(job->poolId, job->id); + return -1; + } - // construct scan task - ChunkResponse *response = new ChunkResponse(); - ChunkRequest *request = new ChunkRequest(); - request->set_optype(CHUNK_OP_TYPE::CHUNK_OP_SCAN); - request->set_logicpoolid(job->poolId); - request->set_copysetid(job->id); - request->set_chunkid(job->currentChunkId); - request->set_offset(currentOffset); - request->set_size(scanSize_); - request->set_sendscanmaptimeoutms(timeoutMs_); - request->set_sendscanmapretrytimes(retry_); - request->set_sendscanmapretryintervalus(retryIntervalUs_); - ScanChunkClosure *done = new ScanChunkClosure(request, response); - std::shared_ptr req = - std::make_shared(nodePtr, this, request, - response, done); - req->Process(); - currentOffset += scanSize_; - // wait for scan task finished - uint32_t retry = retry_; - while (!job->isFinished && retry > 0) { - scanTaskWaitInterval_.WaitForNextExcution(); - retry--; + // Init job + job->taskLock.WRLock(); + job->task.localMap.Clear(); + job->task.followerMap.clear(); + job->task.waitingNum = replicaNum; + job->task.chunkId = job->currentChunkId; + job->task.offset = currentOffset; + job->taskLock.Unlock(); + job->currentOffset = currentOffset; + job->isFinished = false; + + // construct scan task + ChunkResponse *response = new ChunkResponse(); + ChunkRequest *request = new ChunkRequest(); + request->set_optype(CHUNK_OP_TYPE::CHUNK_OP_SCAN); + request->set_logicpoolid(job->poolId); + request->set_copysetid(job->id); + request->set_chunkid(job->currentChunkId); + request->set_offset(currentOffset); + request->set_sendscanmaptimeoutms(timeoutMs_); + request->set_sendscanmapretrytimes(retry_); + request->set_sendscanmapretryintervalus(retryIntervalUs_); + if (scanChunkMetaPage) { + request->set_readmetapage(true); + request->set_size(chunkMetaPageSize_); + } else { + request->set_size(scanSize_); + } + ScanChunkClosure *done = new ScanChunkClosure(request, + response); + std::shared_ptr req = + std::make_shared(nodePtr, this, request, + response, done); + req->Process(); + if (!scanChunkMetaPage) { + currentOffset += scanSize_; + } + // wait for scan task finished + uint32_t retry = retry_; + while (!job->isFinished && retry > 0) { + scanTaskWaitInterval_.WaitForNextExcution(); + retry--; + } + scanChunkMetaPage = false; + } + iter++; } } + return 0; } void ScanManager::SetLocalScanMap(ScanKey key, ScanMap map) { @@ -302,6 +312,7 @@ void ScanManager::SetLocalScanMap(ScanKey key, ScanMap map) { WriteLockGuard writeLockGuard(job->taskLock); job->task.localMap = map; job->task.waitingNum--; + LOG(INFO) << "Leader scanmap is: " << job->task.localMap.ShortDebugString(); } void ScanManager::DealFollowerScanMap(const FollowScanMapRequest &request, @@ -361,6 +372,10 @@ void ScanManager::CompareMap(std::shared_ptr job) { << job->task.followerMap[0].ShortDebugString() << "; the second follower scanmap: " << job->task.followerMap[1].ShortDebugString(); + // set failed scanmap + auto nodePtr = copysetNodeManager_->GetCopysetNode(job->poolId, + job->id); + nodePtr->GetFailedScanMap().emplace_back(job->task.localMap); } else { LOG(INFO) << "Compare scanmap successfully on" << " logicalpoolId = " << job->poolId @@ -373,27 +388,6 @@ void ScanManager::CompareMap(std::shared_ptr job) { } } -bool ScanManager::isCurrentChunkFinish(std::shared_ptr job) { - if (nullptr != job) { - // last offset+len - uint64_t lastOffset = chunkSize_ - scanSize_; - if (job->currentOffset == lastOffset) { - return true; - } - } - return false; -} - -bool ScanManager::isCurrentJobFinish(std::shared_ptr job) { - if (nullptr != job) { - ChunkMap::iterator iter = job->iter; - if (++iter == job->chunkMap.end() && isCurrentChunkFinish(job)) { - return true; - } - } - return false; -} - void ScanManager::ScanJobFinish(std::shared_ptr job) { if (nullptr != job) { ScanKey key(job->poolId, job->id); diff --git a/src/chunkserver/scan_manager.h b/src/chunkserver/scan_manager.h index 54d024789c..dda5b85d55 100644 --- a/src/chunkserver/scan_manager.h +++ b/src/chunkserver/scan_manager.h @@ -57,6 +57,7 @@ struct ScanManagerOptions { uint32_t intervalSec; // once scan buf size uint64_t scanSize; + uint32_t chunkMetaPageSize; // use for follower send scanmap to leader uint64_t timeoutMs; uint32_t retry; @@ -91,7 +92,6 @@ struct ScanJob { ScanTask task; bool isFinished; RWLock taskLock; - ChunkMap::iterator iter; ChunkID currentChunkId; uint64_t currentOffset; ChunkMap chunkMap; @@ -205,22 +205,9 @@ class ScanManager { /** * @brief process chunk scan request send task to braft * @param[in] job: the scan job + * @return 0 is finished, -1 is canceled */ - void ScanChunkReqProcess(const std::shared_ptr job); - - /** - * @brief check whether current chunk scan finished - * @param[in] job: the scanJob - * @return true if finished, otherwise false - */ - bool isCurrentChunkFinish(std::shared_ptr job); - - /** - * @brief check whether job finished - * @param[in] job: the scanJob - * @return true if finished, otherwise false - */ - bool isCurrentJobFinish(std::shared_ptr job); + int ScanJobProcess(const std::shared_ptr job); /** * @brief set the scan job to finished status @@ -259,6 +246,7 @@ class ScanManager { RWLock jobMapLock_; CopysetNodeManager *copysetNodeManager_; uint32_t chunkSize_; + uint32_t chunkMetaPageSize_; uint64_t scanSize_; uint64_t timeoutMs_; uint32_t retry_; diff --git a/src/mds/topology/topology.cpp b/src/mds/topology/topology.cpp index fe2adcfc73..ada9d25613 100644 --- a/src/mds/topology/topology.cpp +++ b/src/mds/topology/topology.cpp @@ -1109,8 +1109,6 @@ int TopologyImpl::UpdateCopySetTopo(const CopySetInfo &data) { } it->second.SetDirtyFlag(true); - it->second.SetScaning(data.GetScaning()); - it->second.SetLastScanSec(data.GetLastScanSec()); return kTopoErrCodeSuccess; } else { LOG(WARNING) << "UpdateCopySetTopo can not find copyset, " diff --git a/test/chunkserver/datastore/datastore_mock_unittest.cpp b/test/chunkserver/datastore/datastore_mock_unittest.cpp index dc32f04073..cda70b63fa 100644 --- a/test/chunkserver/datastore/datastore_mock_unittest.cpp +++ b/test/chunkserver/datastore/datastore_mock_unittest.cpp @@ -2877,6 +2877,61 @@ TEST_F(CSDataStore_test, ReadSnapshotChunkErrorTest2) { .Times(1); } +/** + * ReadChunkMetaPageTest + * case: read normal chunk + * expect: read successfully + */ +TEST_F(CSDataStore_test, ReadChunkMetaDataTest1) { + // initialize + FakeEnv(); + EXPECT_TRUE(dataStore->Initialize()); + + ChunkID id = 3; + SequenceNum sn = 2; + char buf[PAGE_SIZE]; + memset(buf, 0, PAGE_SIZE); + // test chunk not exists + EXPECT_EQ(CSErrorCode::ChunkNotExistError, + dataStore->ReadChunkMetaPage(id, sn, buf)); + + EXPECT_CALL(*lfs_, Close(1)) + .Times(1); + EXPECT_CALL(*lfs_, Close(2)) + .Times(1); + EXPECT_CALL(*lfs_, Close(3)) + .Times(1); +} + +/** + * ReadChunkMetaPageTest + * case: read normal chunk + * expect: read successfully + */ +TEST_F(CSDataStore_test, ReadChunkMetaDataTest2) { + // initialize + FakeEnv(); + EXPECT_TRUE(dataStore->Initialize()); + + ChunkID id = 1; + SequenceNum sn = 2; + char buf[PAGE_SIZE]; + memset(buf, 0, PAGE_SIZE); + // test chunk exists + EXPECT_CALL(*lfs_, Read(1, NotNull(), 0, PAGE_SIZE)) + .Times(1); + EXPECT_EQ(CSErrorCode::Success, + dataStore->ReadChunkMetaPage(id, sn, buf)); + + EXPECT_CALL(*lfs_, Close(1)) + .Times(1); + EXPECT_CALL(*lfs_, Close(2)) + .Times(1); + EXPECT_CALL(*lfs_, Close(3)) + .Times(1); +} + + /** * DeleteChunkTest * case:chunk不存在 diff --git a/test/chunkserver/mock_copyset_node.h b/test/chunkserver/mock_copyset_node.h index b295d29b62..1e484fbfba 100644 --- a/test/chunkserver/mock_copyset_node.h +++ b/test/chunkserver/mock_copyset_node.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "src/chunkserver/copyset_node.h" @@ -43,6 +44,7 @@ class MockCopysetNode : public CopysetNode { MOCK_METHOD0(Fini, void()); MOCK_CONST_METHOD0(IsLeaderTerm, bool()); MOCK_CONST_METHOD0(GetLeaderId, PeerId()); + MOCK_METHOD1(ListPeers, void(std::vector*)); MOCK_CONST_METHOD0(GetConfEpoch, uint64_t()); MOCK_METHOD1(UpdateAppliedIndex, void(uint64_t)); MOCK_CONST_METHOD0(GetAppliedIndex, uint64_t()); @@ -52,6 +54,7 @@ class MockCopysetNode : public CopysetNode { MOCK_METHOD1(GetLeaderStatus, bool(NodeStatus*)); MOCK_CONST_METHOD0(GetDataStore, std::shared_ptr()); MOCK_CONST_METHOD0(GetConcurrentApplyModule, ConcurrentApplyModule*()); + MOCK_METHOD0(GetFailedScanMap, std::vector&()); MOCK_METHOD1(Propose, void(const braft::Task&)); MOCK_METHOD1(SetScan, void(bool)); MOCK_CONST_METHOD0(GetScan, bool()); diff --git a/test/chunkserver/scan_manager_test.cpp b/test/chunkserver/scan_manager_test.cpp index 9549751aec..0b1c29d2fc 100644 --- a/test/chunkserver/scan_manager_test.cpp +++ b/test/chunkserver/scan_manager_test.cpp @@ -20,6 +20,7 @@ * Author: wanghai01 */ +#include #include #include #include @@ -43,10 +44,12 @@ using ::testing::_; using ::testing::Return; using ::testing::ReturnRef; using ::testing::ReturnPointee; +using ::testing::SetArgPointee; using ::testing::Mock; using ::testing::Invoke; using curve::fs::LocalFsFactory; using ::curve::fs::FileSystemType; +using ::google::protobuf::util::MessageDifferencer; class ScanManagerTest : public ::testing::Test { protected: @@ -54,7 +57,7 @@ class ScanManagerTest : public ::testing::Test { copysetNodeManager_ = new MockCopysetNodeManager(); defaultOptions_.intervalSec = 5; defaultOptions_.scanSize = 4194304; - defaultOptions_.timeoutMs = 1; + defaultOptions_.timeoutMs = 100; defaultOptions_.retry = 1; defaultOptions_.retryIntervalUs = 100000; defaultOptions_.copysetNodeManager = copysetNodeManager_; @@ -107,18 +110,28 @@ TEST_F(ScanManagerTest, ScanJobTest) { ChunkMap chunkMap; chunkMap.emplace(1, csChunkFile_); + std::vector failedMap; + std::vector peers; + peers.push_back(Peer()); + peers.push_back(Peer()); + peers.push_back(Peer()); dataStore_ = std::make_shared(); copysetNode_ = std::make_shared(); EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) - .Times(2).WillRepeatedly(Return(copysetNode_)); + .Times(3).WillRepeatedly(Return(copysetNode_)); EXPECT_CALL(*copysetNode_, GetDataStore()) - .Times(5).WillRepeatedly(Return(dataStore_)); - EXPECT_CALL(*copysetNode_, SetScan(_)).Times(1); + .Times(6).WillRepeatedly(Return(dataStore_)); + EXPECT_CALL(*copysetNode_, GetFailedScanMap()) + .Times(1).WillOnce(ReturnRef(failedMap)); + EXPECT_CALL(*copysetNode_, SetScan(_)).Times(2); + EXPECT_CALL(*copysetNode_, SetLastScan(_)).Times(1); EXPECT_CALL(*dataStore_, GetChunkMap()) .Times(1).WillOnce(Return(chunkMap)); + EXPECT_CALL(*copysetNode_, ListPeers(_)).Times(1) + .WillOnce(SetArgPointee<0>(peers)); EXPECT_CALL(*copysetNode_, IsLeaderTerm()) - .Times(4).WillRepeatedly(Return(true)); - EXPECT_CALL(*copysetNode_, Propose(_)).Times(4). + .Times(10).WillRepeatedly(Return(true)); + EXPECT_CALL(*copysetNode_, Propose(_)).Times(5). WillRepeatedly(Invoke([](const braft::Task& task){ task.done->Run(); })); @@ -151,27 +164,13 @@ TEST_F(ScanManagerTest, CompareMapSuccessTest) { job->id = 10000; job->type = ScanType::NewMap; job->chunkMap = chunkMap; - job->currentChunkId = 0; + job->currentChunkId = 1; + job->currentOffset = 12582912; ASSERT_EQ(3, job->task.waitingNum); ASSERT_EQ(0, scanManager_->GetJobNum()); scanManager_->SetJob(key, job); ASSERT_EQ(1, scanManager_->GetJobNum()); - // GenScanJobs on NewMap status - dataStore_ = std::make_shared(); - copysetNode_ = std::make_shared(); - EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) - .Times(1).WillOnce(Return(copysetNode_)); - EXPECT_CALL(*copysetNode_, GetDataStore()) - .Times(4).WillRepeatedly(Return(dataStore_)); - EXPECT_CALL(*copysetNode_, IsLeaderTerm()) - .Times(4).WillRepeatedly(Return(true)); - EXPECT_CALL(*copysetNode_, Propose(_)).Times(4). - WillRepeatedly(Invoke([](const braft::Task& task){ - task.done->Run(); - })); - scanManager_->GenScanJobs(key); - // GenScanJobs on WaitMap status ScanMap *localMap = new ScanMap(); localMap->set_logicalpoolid(1); @@ -194,11 +193,15 @@ TEST_F(ScanManagerTest, CompareMapSuccessTest) { ASSERT_EQ(1, job->task.waitingNum); ASSERT_EQ(1, job->task.followerMap.size()); // add second follower's scanmap + scanManager_->DealFollowerScanMap(request, &response); + // finish scan job + copysetNode_ = std::make_shared(); EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) .Times(1).WillOnce(Return(copysetNode_)); EXPECT_CALL(*copysetNode_, SetScan(_)).Times(1); EXPECT_CALL(*copysetNode_, SetLastScan(_)).Times(1); - scanManager_->DealFollowerScanMap(request, &response); + job->type = ScanType::Finish; + scanManager_->GenScanJobs(key); ASSERT_EQ(0, scanManager_->GetJobNum()); } @@ -216,27 +219,13 @@ TEST_F(ScanManagerTest, CompareMapFailTest) { job->id = 10000; job->type = ScanType::NewMap; job->chunkMap = chunkMap; - job->currentChunkId = 0; + job->currentChunkId = 1; + job->currentOffset = 12582912; ASSERT_EQ(3, job->task.waitingNum); ASSERT_EQ(0, scanManager_->GetJobNum()); scanManager_->SetJob(key, job); ASSERT_EQ(1, scanManager_->GetJobNum()); - // GenScanJobs on NewMap status - dataStore_ = std::make_shared(); - copysetNode_ = std::make_shared(); - EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) - .Times(1).WillOnce(Return(copysetNode_)); - EXPECT_CALL(*copysetNode_, GetDataStore()) - .Times(4).WillRepeatedly(Return(dataStore_)); - EXPECT_CALL(*copysetNode_, IsLeaderTerm()) - .Times(4).WillRepeatedly(Return(true)); - EXPECT_CALL(*copysetNode_, Propose(_)).Times(4). - WillRepeatedly(Invoke([](const braft::Task& task){ - task.done->Run(); - })); - scanManager_->GenScanJobs(key); - // GenScanJobs on WaitMap status ScanMap *localMap = new ScanMap(); localMap->set_logicalpoolid(1); @@ -281,14 +270,25 @@ TEST_F(ScanManagerTest, CompareMapFailTest) { scanManager_->DealFollowerScanMap(request, &response); ASSERT_EQ(1, job->task.waitingNum); ASSERT_EQ(1, job->task.followerMap.size()); + request.set_allocated_scanmap(scanMap1); + std::vector failedMap; + copysetNode_ = std::make_shared(); + EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) + .Times(1).WillOnce(Return(copysetNode_)); + EXPECT_CALL(*copysetNode_, GetFailedScanMap()) + .Times(1).WillOnce(ReturnRef(failedMap)); + scanManager_->DealFollowerScanMap(request, &response); + ASSERT_EQ(1, failedMap.size()); + ASSERT_EQ(true, MessageDifferencer::Equals(job->task.localMap, + failedMap[0])); + // finish scan job EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) .Times(1).WillOnce(Return(copysetNode_)); EXPECT_CALL(*copysetNode_, SetScan(_)).Times(1); EXPECT_CALL(*copysetNode_, SetLastScan(_)).Times(1); - - request.set_allocated_scanmap(scanMap1); - scanManager_->DealFollowerScanMap(request, &response); + job->type = ScanType::Finish; + scanManager_->GenScanJobs(key); ASSERT_EQ(0, scanManager_->GetJobNum()); } @@ -306,27 +306,13 @@ TEST_F(ScanManagerTest, MismatchedCRCTest) { job->id = 10000; job->type = ScanType::NewMap; job->chunkMap = chunkMap; - job->currentChunkId = 0; + job->currentChunkId = 1; + job->currentOffset = 12582912; ASSERT_EQ(3, job->task.waitingNum); ASSERT_EQ(0, scanManager_->GetJobNum()); scanManager_->SetJob(key, job); ASSERT_EQ(1, scanManager_->GetJobNum()); - // GenScanJobs on NewMap status - dataStore_ = std::make_shared(); - copysetNode_ = std::make_shared(); - EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) - .Times(1).WillOnce(Return(copysetNode_)); - EXPECT_CALL(*copysetNode_, GetDataStore()) - .Times(4).WillRepeatedly(Return(dataStore_)); - EXPECT_CALL(*copysetNode_, IsLeaderTerm()) - .Times(4).WillRepeatedly(Return(true)); - EXPECT_CALL(*copysetNode_, Propose(_)).Times(4). - WillRepeatedly(Invoke([](const braft::Task& task){ - task.done->Run(); - })); - scanManager_->GenScanJobs(key); - // GenScanJobs on WaitMap status ScanMap *localMap = new ScanMap(); localMap->set_logicalpoolid(1); @@ -356,13 +342,25 @@ TEST_F(ScanManagerTest, MismatchedCRCTest) { scanManager_->DealFollowerScanMap(request, &response); ASSERT_EQ(1, job->task.waitingNum); ASSERT_EQ(1, job->task.followerMap.size()); - // add second follower's scanmap + + std::vector failedMap; + copysetNode_ = std::make_shared(); EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) .Times(1).WillOnce(Return(copysetNode_)); - EXPECT_CALL(*copysetNode_, SetScan(_)).Times(1); - EXPECT_CALL(*copysetNode_, SetLastScan(_)).Times(1); + EXPECT_CALL(*copysetNode_, GetFailedScanMap()) + .Times(1).WillOnce(ReturnRef(failedMap)); request.set_allocated_scanmap(scanMap); scanManager_->DealFollowerScanMap(request, &response); + ASSERT_EQ(1, failedMap.size()); + ASSERT_EQ(true, MessageDifferencer::Equals(job->task.localMap, + failedMap[0])); + // finish scan job + EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) + .Times(1).WillOnce(Return(copysetNode_)); + EXPECT_CALL(*copysetNode_, SetScan(_)).Times(1); + EXPECT_CALL(*copysetNode_, SetLastScan(_)).Times(1); + job->type = ScanType::Finish; + scanManager_->GenScanJobs(key); ASSERT_EQ(0, scanManager_->GetJobNum()); } @@ -386,13 +384,53 @@ TEST_F(ScanManagerTest, CancelScanJobTest) { ASSERT_EQ(1, scanManager_->GetJobNum()); // test cancel scan job + std::vector failedMap; copysetNode_ = std::make_shared(); EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) .Times(1).WillOnce(Return(copysetNode_)); EXPECT_CALL(*copysetNode_, SetScan(_)).Times(1); + EXPECT_CALL(*copysetNode_, GetFailedScanMap()) + .Times(1).WillOnce(ReturnRef(failedMap)); scanManager_->CancelScanJob(1, 10000); ASSERT_EQ(0, scanManager_->GetJobNum()); } +TEST_F(ScanManagerTest, CancelScanJobAfterTransferLeaderTest) { + scanManager_->Enqueue(1, 10000); + ASSERT_EQ(1, scanManager_->GetWaitJobNum()); + ChunkMap chunkMap; + chunkMap.emplace(1, csChunkFile_); + + std::vector failedMap; + std::vector peers; + peers.push_back(Peer()); + peers.push_back(Peer()); + peers.push_back(Peer()); + dataStore_ = std::make_shared(); + copysetNode_ = std::make_shared(); + EXPECT_CALL(*copysetNodeManager_, GetCopysetNode(_, _)) + .Times(3).WillRepeatedly(Return(copysetNode_)); + EXPECT_CALL(*copysetNode_, GetDataStore()) + .Times(2).WillRepeatedly(Return(dataStore_)); + EXPECT_CALL(*copysetNode_, GetFailedScanMap()) + .Times(2).WillRepeatedly(ReturnRef(failedMap)); + EXPECT_CALL(*copysetNode_, SetScan(_)).Times(2); + EXPECT_CALL(*dataStore_, GetChunkMap()) + .Times(1).WillOnce(Return(chunkMap)); + EXPECT_CALL(*copysetNode_, ListPeers(_)).Times(1) + .WillOnce(SetArgPointee<0>(peers)); + EXPECT_CALL(*copysetNode_, IsLeaderTerm()) + .Times(3).WillOnce(Return(true)) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(*copysetNode_, Propose(_)).Times(1). + WillRepeatedly(Invoke([](const braft::Task& task){ + task.done->Run(); + })); + ASSERT_EQ(0, scanManager_->Run()); + std::this_thread::sleep_for(std::chrono::seconds(1)); + scanManager_->Fini(); +} + } // namespace chunkserver } // namespace curve