diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index 5c20cacbbd..03dd55beef 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -1011,7 +1011,8 @@ void ScanChunkRequest::BuildAndSendScanMap(const ChunkRequest &request, request.sendscanmapretrytimes(), request.sendscanmapretryintervalus(), cntl, channel); - LOG(INFO) << "Sending scanmap: " << scanMap->ShortDebugString() + LOG(INFO) << "logid = " << cntl->log_id() + << "Sending scanmap: " << scanMap->ShortDebugString() << " to leader: " << peer_.addr; stub.FollowScanMap(cntl, scanMapRequest, scanMapResponse, done); } diff --git a/src/chunkserver/scan_manager.cpp b/src/chunkserver/scan_manager.cpp index 593be381ad..b2a9aaf6b3 100644 --- a/src/chunkserver/scan_manager.cpp +++ b/src/chunkserver/scan_manager.cpp @@ -119,7 +119,6 @@ void ScanManager::StartScanJob(ScanKey key) { job->poolId = key.first; job->id = key.second; job->type = ScanType::Init; - job->currentChunkId = 0; job->isFinished = true; job->dataStore = nodePtr->GetDataStore(); nodePtr->SetScan(true); @@ -231,7 +230,6 @@ int ScanManager::ScanJobProcess(const std::shared_ptr job) { iter++; } else { // split scan chunk request - job->currentChunkId = iter->first; uint32_t currentOffset = 0; bool scanChunkMetaPage = true; while (currentOffset < chunkSize_) { @@ -246,10 +244,14 @@ int ScanManager::ScanJobProcess(const std::shared_ptr job) { job->task.localMap.Clear(); job->task.followerMap.clear(); job->task.waitingNum = replicaNum; - job->task.chunkId = job->currentChunkId; + job->task.chunkId = iter->first; job->task.offset = currentOffset; + if (scanChunkMetaPage) { + job->task.len = chunkMetaPageSize_; + } else { + job->task.len = scanSize_; + } job->taskLock.Unlock(); - job->currentOffset = currentOffset; job->isFinished = false; // construct scan task @@ -258,7 +260,7 @@ int ScanManager::ScanJobProcess(const std::shared_ptr job) { request->set_optype(CHUNK_OP_TYPE::CHUNK_OP_SCAN); request->set_logicpoolid(job->poolId); request->set_copysetid(job->id); - request->set_chunkid(job->currentChunkId); + request->set_chunkid(iter->first); request->set_offset(currentOffset); request->set_sendscanmaptimeoutms(timeoutMs_); request->set_sendscanmapretrytimes(retry_); @@ -300,11 +302,17 @@ void ScanManager::SetLocalScanMap(ScanKey key, ScanMap map) { << " copysetId = " << key.second; return; } - if (job->currentChunkId != map.chunkid() || - job->currentOffset != map.offset()) { + job->taskLock.RDLock(); + bool matched = job->task.chunkId == map.chunkid() && + job->task.offset == map.offset() && + job->task.len == map.len(); + job->taskLock.Unlock(); + + if (!matched) { LOG(WARNING) << "SetLocalScanMap failed, mismatch scanmap." - << " job->chunkid = " << job->currentChunkId - << " job->offset = " << job->currentOffset + << " scantask.chunkid = " << job->task.chunkId + << " scantask.offset = " << job->task.offset + << " scantask.len = " << job->task.len << "; scanmap: " << map.ShortDebugString(); return; } @@ -321,8 +329,21 @@ void ScanManager::DealFollowerScanMap(const FollowScanMapRequest &request, ScanKey key(scanMap.logicalpoolid(), scanMap.copysetid()); auto job = GetJob(key); - if (nullptr != job && job->currentChunkId == scanMap.chunkid() && - job->currentOffset == scanMap.offset()) { + if (nullptr == job) { + LOG(WARNING) << "DealFollowerScanMap failed, job not found." + << " logical poolId = " << key.first + << " copysetId = " << key.second; + response->set_retcode(CHUNK_OP_STATUS::CHUNK_OP_STATUS_INVALID_REQUEST); + return; + } + + job->taskLock.RDLock(); + bool matched = job->task.chunkId == scanMap.chunkid() && + job->task.offset == scanMap.offset() && + job->task.len == scanMap.len(); + job->taskLock.Unlock(); + + if (matched) { job->taskLock.WRLock(); job->task.followerMap.emplace_back(scanMap); job->task.waitingNum--; @@ -331,15 +352,12 @@ void ScanManager::DealFollowerScanMap(const FollowScanMapRequest &request, GenScanJobs(key); response->set_retcode(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS); return; - } - if (nullptr == job) { - LOG(WARNING) << "DealFollowerScanMap failed, job not found." - << " logical poolId = " << key.first - << " copysetId = " << key.second; } else { + ReadLockGuard readLockGuard(job->taskLock); LOG(WARNING) << "DealFollowerScanMap failed, mismatch scanmap." - << " job->chunkid = " << job->currentChunkId - << " job->offset = " << job->currentOffset + << " scantask.chunkid = " << job->task.chunkId + << " scantask.offset = " << job->task.offset + << " scantask.len = " << job->task.len << "; scanmap: " << scanMap.ShortDebugString(); } response->set_retcode(CHUNK_OP_STATUS::CHUNK_OP_STATUS_INVALID_REQUEST); @@ -381,7 +399,8 @@ void ScanManager::CompareMap(std::shared_ptr job) { << " logicalpoolId = " << job->poolId << " copysetId = " << job->id << " chunkId = " << job->task.chunkId - << " offset = " << job->task.offset; + << " offset = " << job->task.offset + << " len = " << job->task.len; } job->isFinished = true; } diff --git a/src/chunkserver/scan_manager.h b/src/chunkserver/scan_manager.h index dda5b85d55..9459310669 100644 --- a/src/chunkserver/scan_manager.h +++ b/src/chunkserver/scan_manager.h @@ -79,6 +79,7 @@ enum ScanType { struct ScanTask { ChunkID chunkId; uint64_t offset; + uint64_t len; uint8_t waitingNum; ScanMap localMap; std::vector followerMap; @@ -92,8 +93,6 @@ struct ScanJob { ScanTask task; bool isFinished; RWLock taskLock; - ChunkID currentChunkId; - uint64_t currentOffset; ChunkMap chunkMap; std::shared_ptr dataStore; ScanJob() : type(ScanType::Init) {} diff --git a/test/chunkserver/scan_manager_test.cpp b/test/chunkserver/scan_manager_test.cpp index 0b1c29d2fc..e739c61514 100644 --- a/test/chunkserver/scan_manager_test.cpp +++ b/test/chunkserver/scan_manager_test.cpp @@ -160,12 +160,14 @@ TEST_F(ScanManagerTest, CompareMapSuccessTest) { // make scan job std::shared_ptr job = std::make_shared(); + job->poolId = 1; job->id = 10000; job->type = ScanType::NewMap; job->chunkMap = chunkMap; - job->currentChunkId = 1; - job->currentOffset = 12582912; + job->task.chunkId = 1; + job->task.offset = 12582912; + job->task.len = 4194304; ASSERT_EQ(3, job->task.waitingNum); ASSERT_EQ(0, scanManager_->GetJobNum()); scanManager_->SetJob(key, job); @@ -219,8 +221,9 @@ TEST_F(ScanManagerTest, CompareMapFailTest) { job->id = 10000; job->type = ScanType::NewMap; job->chunkMap = chunkMap; - job->currentChunkId = 1; - job->currentOffset = 12582912; + job->task.chunkId = 1; + job->task.offset = 12582912; + job->task.len = 4194304; ASSERT_EQ(3, job->task.waitingNum); ASSERT_EQ(0, scanManager_->GetJobNum()); scanManager_->SetJob(key, job); @@ -251,6 +254,14 @@ TEST_F(ScanManagerTest, CompareMapFailTest) { scanMap1->set_crc(100); scanMap1->set_offset(12582912); scanMap1->set_len(4194304); + ScanMap *scanMap2 = new ScanMap(); + scanMap2->set_logicalpoolid(1); + scanMap2->set_copysetid(10000); + scanMap2->set_chunkid(1); + scanMap2->set_index(1); + scanMap2->set_crc(200); + scanMap2->set_offset(0); // mismatch offset + scanMap2->set_len(4194304); // set local scanmap job->type = ScanType::WaitMap; scanManager_->SetLocalScanMap(key, *localMap); @@ -265,6 +276,12 @@ TEST_F(ScanManagerTest, CompareMapFailTest) { ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_INVALID_REQUEST, response.retcode()); + // test uncorrect offset + request.set_allocated_scanmap(scanMap2); + scanManager_->DealFollowerScanMap(request, &response); + ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_INVALID_REQUEST, + response.retcode()); + // test uncorrect index request.set_allocated_scanmap(localMap); scanManager_->DealFollowerScanMap(request, &response); @@ -306,8 +323,9 @@ TEST_F(ScanManagerTest, MismatchedCRCTest) { job->id = 10000; job->type = ScanType::NewMap; job->chunkMap = chunkMap; - job->currentChunkId = 1; - job->currentOffset = 12582912; + job->task.chunkId = 1; + job->task.offset = 12582912; + job->task.len = 4194304; ASSERT_EQ(3, job->task.waitingNum); ASSERT_EQ(0, scanManager_->GetJobNum()); scanManager_->SetJob(key, job); @@ -378,7 +396,6 @@ TEST_F(ScanManagerTest, CancelScanJobTest) { job->poolId = 1; job->id = 10000; job->type = ScanType::NewMap; - job->currentChunkId = 1; ASSERT_EQ(0, scanManager_->GetJobNum()); scanManager_->SetJob(key, job); ASSERT_EQ(1, scanManager_->GetJobNum());