diff --git a/conf/chunkserver.conf b/conf/chunkserver.conf index 19457b3c18..a96ff84f60 100644 --- a/conf/chunkserver.conf +++ b/conf/chunkserver.conf @@ -128,6 +128,8 @@ copyset.sync_chunk_limits=2097152 copyset.sync_threshold=65536 # check syncing interval copyset.check_syncing_interval_ms=500 +# wait for retry time when disk space is insufficient +copyset.wait_for_disk_freed_interval_ms=60000 # # Clone settings @@ -215,6 +217,11 @@ chunkfilepool.allocate_percent=80 chunkfilepool.chunk_file_pool_size=1GB # The thread num for format chunks chunkfilepool.thread_num=1 +# When the chunkserver disk usage exceeds the percentage, heartbeat sets the disk status +chunkfilepool.disk_usage_percent_limit=95 +# Reserve part of the chunk number, and the write operation returns readonly to the client +# when the available value is too small to avoid chunkfilepool and walfilepool not being able to obtain the chunk. +chunkfilepool.chunk_reserved=100 # # WAL file pool diff --git a/conf/chunkserver.conf.example b/conf/chunkserver.conf.example index 443412215b..2ac2a19009 100644 --- a/conf/chunkserver.conf.example +++ b/conf/chunkserver.conf.example @@ -120,6 +120,8 @@ copyset.sync_chunk_limits=2097152 copyset.sync_threshold=65536 # check syncing interval copyset.check_syncing_interval_ms=500 +# wait for retry time when disk space is insufficient +copyset.wait_for_disk_freed_interval_ms=60000 # # Clone settings @@ -207,6 +209,11 @@ chunkfilepool.allocate_percent=80 chunkfilepool.chunk_file_pool_size=1GB # The thread num for format chunks chunkfilepool.thread_num=1 +# When the chunkserver disk usage exceeds the percentage, heartbeat sets the disk status +chunkfilepool.disk_usage_percent_limit=95 +# Reserve part of the chunk number, and the write operation returns readonly to the client +# when the available value is too small to avoid chunkfilepool and walfilepool not being able to obtain the chunk. +chunkfilepool.chunk_reserved=0 # # WAL file pool diff --git a/proto/chunk.proto b/proto/chunk.proto index af5cd3fb5a..32ef96e3a2 100755 --- a/proto/chunk.proto +++ b/proto/chunk.proto @@ -85,6 +85,7 @@ enum CHUNK_OP_STATUS { CHUNK_OP_STATUS_BACKWARD = 10; // 请求的版本落后当前chunk的版本 CHUNK_OP_STATUS_CHUNK_EXIST = 11; // chunk已存在 CHUNK_OP_STATUS_EPOCH_TOO_OLD = 12; // request epoch too old + CHUNK_OP_STATUS_READONLY = 13; // If there is insufficient disk space, set the chunkserver to read-only }; message ChunkResponse { diff --git a/proto/heartbeat.proto b/proto/heartbeat.proto index 6b51d40277..77adadb293 100644 --- a/proto/heartbeat.proto +++ b/proto/heartbeat.proto @@ -71,8 +71,13 @@ message CopysetStatistics { required uint32 writeIOPS = 4; } +enum ErrorType { + NORMAL = 0; + DISKFULL = 1; +} + message DiskState { - required uint32 errType = 1; + required ErrorType errType = 1; required string errMsg = 2; } diff --git a/proto/topology.proto b/proto/topology.proto index 6e88d4e102..c1ef9d97e5 100644 --- a/proto/topology.proto +++ b/proto/topology.proto @@ -48,6 +48,7 @@ enum ChunkServerStatus { enum DiskState { DISKNORMAL = 0; DISKERROR = 1; + DISKFULL = 2; } enum OnlineState { diff --git a/src/chunkserver/chunkserver.cpp b/src/chunkserver/chunkserver.cpp index 22f302c9da..d97bbf30cd 100644 --- a/src/chunkserver/chunkserver.cpp +++ b/src/chunkserver/chunkserver.cpp @@ -503,6 +503,9 @@ void ChunkServer::InitChunkFilePoolOptions( LOG_IF(FATAL, !conf->GetBoolValue( "chunkfilepool.enable_get_chunk_from_pool", &chunkFilePoolOptions->getFileFromPool)); + LOG_IF(FATAL, !conf->GetUInt32Value( + "chunkfilepool.chunk_reserved", + &chunkFilePoolOptions->chunkReserved)); if (chunkFilePoolOptions->getFileFromPool == false) { std::string chunkFilePoolUri; @@ -710,6 +713,9 @@ void ChunkServer::InitCopysetNodeOptions( LOG_IF(FATAL, !conf->GetUInt32Value("copyset.sync_trigger_seconds", ©setNodeOptions->syncTriggerSeconds)); } + LOG_IF(FATAL, !conf->GetUInt32Value( + "copyset.wait_for_disk_freed_interval_ms", + ©setNodeOptions->waitForDiskFreedIntervalMs)); } void ChunkServer::InitCopyerOptions( @@ -781,6 +787,9 @@ void ChunkServer::InitHeartbeatOptions( &heartbeatOptions->intervalSec)); LOG_IF(FATAL, !conf->GetUInt32Value("mds.heartbeat_timeout", &heartbeatOptions->timeout)); + LOG_IF(FATAL, !conf->GetUInt32Value( + "chunkfilepool.disk_usage_percent_limit", + &heartbeatOptions->chunkserverDiskLimit)); } void ChunkServer::InitRegisterOptions( diff --git a/src/chunkserver/config_info.h b/src/chunkserver/config_info.h index 67c3f57524..802ec1c471 100644 --- a/src/chunkserver/config_info.h +++ b/src/chunkserver/config_info.h @@ -140,6 +140,8 @@ struct CopysetNodeOptions { uint64_t syncThreshold = 64 * 1024; // check syncing interval uint32_t checkSyncingIntervalMs = 500u; + // wait for retry time when disk space is insufficient + uint32_t waitForDiskFreedIntervalMs = 60000; CopysetNodeOptions(); }; diff --git a/src/chunkserver/copyset_node.cpp b/src/chunkserver/copyset_node.cpp index a00f7aaf9a..9c73315c60 100755 --- a/src/chunkserver/copyset_node.cpp +++ b/src/chunkserver/copyset_node.cpp @@ -135,6 +135,8 @@ int CopysetNode::Init(const CopysetNodeOptions &options) { dsOptions.locationLimit = options.locationLimit; dsOptions.enableOdsyncWhenOpenChunkFile = options.enableOdsyncWhenOpenChunkFile; + dsOptions.waitForDiskFreedIntervalMs = + options.waitForDiskFreedIntervalMs; dataStore_ = std::make_shared(options.localFileSystem, options.chunkFilePool, dsOptions); @@ -345,6 +347,10 @@ void CopysetNode::WaitSnapshotDone() { } } +bool CopysetNode::ReadOnly() const { + return !dataStore_->EnoughChunk(); +} + void CopysetNode::save_snapshot_background(::braft::SnapshotWriter *writer, ::braft::Closure *done) { brpc::ClosureGuard doneGuard(done); @@ -529,7 +535,9 @@ void CopysetNode::on_leader_start(int64_t term) { * https://github.com/opencurve/curve/pull/2448 */ ChunkServerMetric::GetInstance()->IncreaseLeaderCount(); - concurrentapply_->Flush(); + if (concurrentapply_ != nullptr) { + concurrentapply_->Flush(); + } leaderTerm_.store(term, std::memory_order_release); LOG(INFO) << "Copyset: " << GroupIdString() << ", peer id: " << peerId_.to_string() diff --git a/src/chunkserver/copyset_node.h b/src/chunkserver/copyset_node.h index cf7a34aeec..a126388706 100755 --- a/src/chunkserver/copyset_node.h +++ b/src/chunkserver/copyset_node.h @@ -469,6 +469,8 @@ class CopysetNode : public braft::StateMachine, void WaitSnapshotDone(); + bool ReadOnly() const; + private: inline std::string GroupId() { return ToGroupId(logicPoolId_, copysetId_); diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.cpp b/src/chunkserver/datastore/chunkserver_chunkfile.cpp index a6e6d3e0cc..573cffc12a 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.cpp +++ b/src/chunkserver/datastore/chunkserver_chunkfile.cpp @@ -19,6 +19,7 @@ * File Created: Thursday, 6th September 2018 10:49:53 am * Author: yangyaokai */ +#include #include #include #include @@ -207,7 +208,8 @@ CSErrorCode CSChunkFile::Open(bool createFile) { if (rc != 0 && rc != -EEXIST) { LOG(ERROR) << "Error occured when create file." << " filepath = " << chunkFilePath; - return CSErrorCode::InternalError; + return rc == -ENOSPC ? CSErrorCode::NoSpaceError : + CSErrorCode::InternalError; } } int rc = -1; @@ -400,7 +402,8 @@ CSErrorCode CSChunkFile::Write(SequenceNum sn, << "ChunkID: " << chunkId_ << ",request sn: " << sn << ",chunk sn: " << metaPage_.sn; - return CSErrorCode::InternalError; + return rc == -ENOSPC ? CSErrorCode::NoSpaceError : + CSErrorCode::InternalError; } // If it is a clone chunk, the bitmap will be updated CSErrorCode errorCode = flush(); @@ -478,7 +481,8 @@ CSErrorCode CSChunkFile::Paste(const char * buf, off_t offset, size_t length) { << "ChunkID: " << chunkId_ << ", offset: " << offset << ", length: " << length; - return CSErrorCode::InternalError; + return rc == -ENOSPC ? CSErrorCode::NoSpaceError : + CSErrorCode::InternalError; } } diff --git a/src/chunkserver/datastore/chunkserver_datastore.cpp b/src/chunkserver/datastore/chunkserver_datastore.cpp index 19875882cf..d80ad03d4e 100644 --- a/src/chunkserver/datastore/chunkserver_datastore.cpp +++ b/src/chunkserver/datastore/chunkserver_datastore.cpp @@ -44,7 +44,8 @@ CSDataStore::CSDataStore(std::shared_ptr lfs, baseDir_(options.baseDir), chunkFilePool_(chunkFilePool), lfs_(lfs), - enableOdsyncWhenOpenChunkFile_(options.enableOdsyncWhenOpenChunkFile) { + enableOdsyncWhenOpenChunkFile_(options.enableOdsyncWhenOpenChunkFile), + waitForDiskFreedIntervalMs_(options.waitForDiskFreedIntervalMs) { CHECK(!baseDir_.empty()) << "Create datastore failed"; CHECK(lfs_ != nullptr) << "Create datastore failed"; CHECK(chunkFilePool_ != nullptr) << "Create datastore failed"; @@ -428,5 +429,9 @@ ChunkMap CSDataStore::GetChunkMap() { return metaCache_.GetMap(); } +bool CSDataStore::EnoughChunk() { + return chunkFilePool_->EnoughChunk(); +} + } // namespace chunkserver } // namespace curve diff --git a/src/chunkserver/datastore/chunkserver_datastore.h b/src/chunkserver/datastore/chunkserver_datastore.h index 291403ab9b..fe4544f49a 100644 --- a/src/chunkserver/datastore/chunkserver_datastore.h +++ b/src/chunkserver/datastore/chunkserver_datastore.h @@ -63,6 +63,7 @@ struct DataStoreOptions { PageSizeType metaPageSize; uint32_t locationLimit; bool enableOdsyncWhenOpenChunkFile; + uint32_t waitForDiskFreedIntervalMs; }; /** @@ -328,6 +329,8 @@ class CSDataStore { virtual ChunkMap GetChunkMap(); + virtual bool EnoughChunk(); + void SetCacheCondPtr(std::shared_ptr cond) { metaCache_.SetCondPtr(cond); } @@ -336,6 +339,10 @@ class CSDataStore { metaCache_.SetSyncChunkLimits(limit, threshold); } + void WaitForDiskFreed() { + bthread_usleep(waitForDiskFreedIntervalMs_); + } + private: CSErrorCode loadChunkFile(ChunkID id); CSErrorCode CreateChunkFile(const ChunkOptions & ops, @@ -362,6 +369,8 @@ class CSDataStore { DataStoreMetricPtr metric_; // enable O_DSYNC When Open ChunkFile bool enableOdsyncWhenOpenChunkFile_; + // wait for retry time when disk space is insufficient + uint32_t waitForDiskFreedIntervalMs_; }; } // namespace chunkserver diff --git a/src/chunkserver/datastore/chunkserver_snapshot.cpp b/src/chunkserver/datastore/chunkserver_snapshot.cpp index f1e398e8c6..9b8e459716 100644 --- a/src/chunkserver/datastore/chunkserver_snapshot.cpp +++ b/src/chunkserver/datastore/chunkserver_snapshot.cpp @@ -20,6 +20,7 @@ * Author: yangyaokai */ +#include #include #include "src/chunkserver/datastore/chunkserver_datastore.h" #include "src/chunkserver/datastore/chunkserver_snapshot.h" @@ -154,7 +155,8 @@ CSErrorCode CSSnapshot::Open(bool createFile) { if (ret != 0) { LOG(ERROR) << "Error occured when create snapshot." << " filepath = " << snapshotPath; - return CSErrorCode::InternalError; + return ret == -ENOSPC ? CSErrorCode::NoSpaceError : + CSErrorCode::InternalError; } } int rc = lfs_->Open(snapshotPath, O_RDWR|O_NOATIME|O_DSYNC); @@ -216,7 +218,8 @@ CSErrorCode CSSnapshot::Write(const char * buf, off_t offset, size_t length) { LOG(ERROR) << "Write snapshot failed." << "ChunkID: " << chunkId_ << ",snapshot sn: " << metaPage_.sn; - return CSErrorCode::InternalError; + return rc == -ENOSPC ? CSErrorCode::NoSpaceError : + CSErrorCode::InternalError; } uint32_t pageBeginIndex = offset / blockSize_; uint32_t pageEndIndex = (offset + length - 1) / blockSize_; diff --git a/src/chunkserver/datastore/define.h b/src/chunkserver/datastore/define.h index 41c25677cf..1d0dda13fc 100644 --- a/src/chunkserver/datastore/define.h +++ b/src/chunkserver/datastore/define.h @@ -73,6 +73,8 @@ enum CSErrorCode { // The page has not been written, it will appear when the page that has not // been written is read when the clone chunk is read PageNerverWrittenError = 13, + // ENOSPC error + NoSpaceError = 14, }; // Chunk details diff --git a/src/chunkserver/datastore/file_pool.cpp b/src/chunkserver/datastore/file_pool.cpp index 0c6489ed80..170cf9eecc 100644 --- a/src/chunkserver/datastore/file_pool.cpp +++ b/src/chunkserver/datastore/file_pool.cpp @@ -298,12 +298,12 @@ bool FilePool::CheckValid() { return true; } -bool FilePool::CleanChunk(uint64_t chunkid, bool onlyMarked) { +int FilePool::CleanChunk(uint64_t chunkid, bool onlyMarked) { std::string chunkpath = currentdir_ + "/" + std::to_string(chunkid); int ret = fsptr_->Open(chunkpath, O_RDWR); if (ret < 0) { LOG(ERROR) << "Open file failed: " << chunkpath; - return false; + return ret; } int fd = ret; @@ -315,7 +315,7 @@ bool FilePool::CleanChunk(uint64_t chunkid, bool onlyMarked) { ret = fsptr_->Fallocate(fd, FALLOC_FL_ZERO_RANGE, 0, chunklen); if (ret < 0) { LOG(ERROR) << "Fallocate file failed: " << chunkpath; - return false; + return ret; } } else { int nbytes; @@ -330,10 +330,10 @@ bool FilePool::CleanChunk(uint64_t chunkid, bool onlyMarked) { std::min(ntotal - nwrite, (uint64_t)bytesPerWrite)); if (nbytes < 0) { LOG(ERROR) << "Write file failed: " << chunkpath; - return false; + return nbytes; } else if (fsptr_->Fsync(fd) < 0) { LOG(ERROR) << "Fsync file failed: " << chunkpath; - return false; + return nbytes; } cleanThrottle_.Add(false, bytesPerWrite); @@ -345,10 +345,10 @@ bool FilePool::CleanChunk(uint64_t chunkid, bool onlyMarked) { ret = fsptr_->Rename(chunkpath, targetpath); if (ret < 0) { LOG(ERROR) << "Rename file failed: " << chunkpath; - return false; + return ret; } - return true; + return ret; } bool FilePool::CleaningChunk() { @@ -380,7 +380,8 @@ bool FilePool::CleaningChunk() { } // Fill zero to specify chunk - if (!CleanChunk(chunkid, false)) { + int ret = CleanChunk(chunkid, false); + if (ret < 0) { pushBack(&dirtyChunks_, chunkid, ¤tState_.dirtyChunksLeft); return false; } @@ -544,7 +545,7 @@ bool FilePool::StopCleaning() { return true; } -bool FilePool::GetChunk(bool needClean, uint64_t *chunkid, bool *isCleaned) { +int FilePool::GetChunk(bool needClean, uint64_t *chunkid, bool *isCleaned) { auto pop = [&](std::vector* chunks, uint64_t* chunksLeft, bool isCleanChunks) -> bool { if (chunks->empty()) { @@ -572,8 +573,9 @@ bool FilePool::GetChunk(bool needClean, uint64_t *chunkid, bool *isCleaned) { cond_.wait(lk, wake_up); } if (!needClean) { - return pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false) || + ret = pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false) || pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true); + return ret ? 0 : -1; } // Need clean chunk @@ -581,11 +583,17 @@ bool FilePool::GetChunk(bool needClean, uint64_t *chunkid, bool *isCleaned) { ret = pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true) || pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false); } - if (true == ret && false == *isCleaned && CleanChunk(*chunkid, true)) { - *isCleaned = true; + int cleanRet = -1; + if (true == ret && false == *isCleaned) { + cleanRet = CleanChunk(*chunkid, true); + if (cleanRet < 0) { + *isCleaned = false; + } else { + *isCleaned = true; + } } - return *isCleaned; + return *isCleaned ? 0 : cleanRet; } int FilePool::GetFile(const std::string &targetpath, const char *metapage, @@ -598,7 +606,8 @@ int FilePool::GetFile(const std::string &targetpath, const char *metapage, std::string srcpath; if (poolOpt_.getFileFromPool) { bool isCleaned = false; - if (!GetChunk(needClean, &chunkID, &isCleaned)) { + ret = GetChunk(needClean, &chunkID, &isCleaned); + if (ret < 0) { LOG(ERROR) << "No avaliable chunk!"; break; } @@ -609,16 +618,16 @@ int FilePool::GetFile(const std::string &targetpath, const char *metapage, } else { srcpath = currentdir_ + "/" + std::to_string(currentmaxfilenum_.fetch_add(1)); - int r = AllocateChunk(srcpath); - if (r < 0) { + ret = AllocateChunk(srcpath); + if (ret < 0) { LOG(ERROR) << "file allocate failed, " << srcpath.c_str(); retry++; continue; } } - bool rc = WriteMetaPage(srcpath, metapage); - if (rc) { + ret = WriteMetaPage(srcpath, metapage); + if (ret >= 0) { // Here, the RENAME_NOREPLACE mode is used to rename the file. // When the target file exists, it is not allowed to be overwritten. // That is to say, creating a file through FilePool needs to ensure @@ -664,7 +673,7 @@ int FilePool::AllocateChunk(const std::string &chunkpath) { if (ret < 0) { fsptr_->Close(fd); LOG(ERROR) << "Fallocate failed, " << chunkpath.c_str(); - return -1; + return ret; } char *data = new (std::nothrow) char[chunklen]; @@ -675,7 +684,7 @@ int FilePool::AllocateChunk(const std::string &chunkpath) { fsptr_->Close(fd); delete[] data; LOG(ERROR) << "write failed, " << chunkpath.c_str(); - return -1; + return ret; } delete[] data; @@ -683,7 +692,7 @@ int FilePool::AllocateChunk(const std::string &chunkpath) { if (ret < 0) { fsptr_->Close(fd); LOG(ERROR) << "fsync failed, " << chunkpath.c_str(); - return -1; + return ret; } ret = fsptr_->Close(fd); @@ -693,14 +702,14 @@ int FilePool::AllocateChunk(const std::string &chunkpath) { return ret; } -bool FilePool::WriteMetaPage(const std::string &sourcepath, const char *page) { +int FilePool::WriteMetaPage(const std::string &sourcepath, const char *page) { int fd = -1; int ret = -1; ret = fsptr_->Open(sourcepath.c_str(), O_RDWR); if (ret < 0) { LOG(ERROR) << "file open failed, " << sourcepath.c_str(); - return false; + return ret; } fd = ret; @@ -709,22 +718,22 @@ bool FilePool::WriteMetaPage(const std::string &sourcepath, const char *page) { if (ret != static_cast(poolOpt_.metaPageSize)) { fsptr_->Close(fd); LOG(ERROR) << "write metapage failed, " << sourcepath.c_str(); - return false; + return ret; } ret = fsptr_->Fsync(fd); if (ret != 0) { fsptr_->Close(fd); LOG(ERROR) << "fsync metapage failed, " << sourcepath.c_str(); - return false; + return ret; } ret = fsptr_->Close(fd); if (ret != 0) { LOG(ERROR) << "close failed, " << sourcepath.c_str(); - return false; + return ret; } - return true; + return ret; } int FilePool::RecycleFile(const std::string &chunkpath) { @@ -919,6 +928,10 @@ size_t FilePool::Size() { return currentState_.preallocatedChunksLeft; } +bool FilePool::EnoughChunk() { + return Size() >= poolOpt_.chunkReserved; +} + FilePoolState FilePool::GetState() const { return currentState_; } diff --git a/src/chunkserver/datastore/file_pool.h b/src/chunkserver/datastore/file_pool.h index 16b2311982..0aeee0274b 100644 --- a/src/chunkserver/datastore/file_pool.h +++ b/src/chunkserver/datastore/file_pool.h @@ -71,6 +71,7 @@ struct FilePoolOptions { uint32_t preAllocateNum; uint64_t filePoolSize; uint32_t formatThreadNum; + uint32_t chunkReserved; std::string copysetDir; std::string recycleDir; @@ -94,6 +95,7 @@ struct FilePoolOptions { formatThreadNum = 1; ::memset(metaPath, 0, 256); ::memset(filePoolDir, 0, 256); + chunkReserved = 0; } }; @@ -218,6 +220,10 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { * Get the current chunkfile pool size */ virtual size_t Size(); + /** + * Returns whether there is enough chunk space + */ + virtual bool EnoughChunk(); /** * Get the allocation status of FilePool */ @@ -289,9 +295,9 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { * Perform metapage assignment for the new chunkfile * @param: sourcepath is the file path to be written * @param: page is the metapage information to be written - * @return: returns true if successful, otherwise false + * @return: return 0 if successful, otherwise return less than 0 */ - bool WriteMetaPage(const std::string& sourcepath, const char* page); + int WriteMetaPage(const std::string& sourcepath, const char* page); /** * Directly allocate chunks, not from FilePool * @param: chunkpath is the path of the chunk file in the datastore @@ -304,9 +310,9 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { * @param needClean: Whether need the zeroed chunk * @param chunkid: The return chunk's id * @param isCleaned: Whether the return chunk is zeroed - * @return: Return false if there is no valid chunk, else return true + * @return: return 0 if successful, otherwise return less than 0 */ - bool GetChunk(bool needClean, uint64_t* chunkid, bool* isCleaned); + int GetChunk(bool needClean, uint64_t* chunkid, bool* isCleaned); /** * @brief: Zeroing specify chunk file @@ -314,9 +320,9 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { * @param onlyMarked: Use fallocate() to zeroing chunk file * if onlyMarked is ture, otherwise * write all bytes in chunk to zero - * @return: Return true if success, else return false + * @return: return 0 if successful, otherwise return less than 0 */ - bool CleanChunk(uint64_t chunkid, bool onlyMarked); + int CleanChunk(uint64_t chunkid, bool onlyMarked); /** * @brief: Clean chunk one by one diff --git a/src/chunkserver/heartbeat.cpp b/src/chunkserver/heartbeat.cpp index b81fe6bdb3..690eb0e743 100644 --- a/src/chunkserver/heartbeat.cpp +++ b/src/chunkserver/heartbeat.cpp @@ -238,7 +238,7 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) { */ curve::mds::heartbeat::DiskState* diskState = new curve::mds::heartbeat::DiskState(); - diskState->set_errtype(0); + diskState->set_errtype(curve::mds::heartbeat::NORMAL); diskState->set_errmsg(""); req->set_allocated_diskstate(diskState); @@ -295,6 +295,12 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) { req->set_diskcapacity(cap); req->set_diskused(cap - avail); + if (options_.chunkserverDiskLimit > 0 && + req->diskused() * 100 / cap > options_.chunkserverDiskLimit) { + diskState->set_errtype(curve::mds::heartbeat::DISKFULL); + diskState->set_errmsg("Disk near full"); + } + std::vector copysets; copysetMan_->GetAllCopysetNodes(©sets); diff --git a/src/chunkserver/heartbeat.h b/src/chunkserver/heartbeat.h index df86d8e88a..8913096734 100644 --- a/src/chunkserver/heartbeat.h +++ b/src/chunkserver/heartbeat.h @@ -69,6 +69,7 @@ struct HeartbeatOptions { uint32_t timeout; CopysetNodeManager* copysetNodeManager; ScanManager* scanManager; + uint32_t chunkserverDiskLimit; std::shared_ptr fs; std::shared_ptr chunkFilePool; diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index 817e65c79f..9738238861 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -70,6 +70,12 @@ void ChunkOpRequest::Process() { return; } + // check if copyset node readonly + if (node_->ReadOnly()) { + response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY); + return; + } + /** * 如果propose成功,说明request成功交给了raft处理, * 那么done_就不能被调用,只有propose失败了才需要提前返回 @@ -457,6 +463,12 @@ void WriteChunkRequest::OnApply(uint64_t index, << ", request: " << request_->ShortDebugString(); response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request_->ShortDebugString(); + response_->set_status( + CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE); } else if (CSErrorCode::InternalError == ret || CSErrorCode::CrcCheckError == ret || CSErrorCode::FileFormatError == ret) { @@ -492,29 +504,38 @@ void WriteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, request.clonefileoffset()); } - auto ret = datastore->WriteChunk(request.chunkid(), + while (true) { + auto ret = datastore->WriteChunk(request.chunkid(), request.sn(), data, request.offset(), request.size(), &cost, cloneSourceLocation); - if (CSErrorCode::Success == ret) { - return; - } else if (CSErrorCode::BackwardRequestError == ret) { - LOG(WARNING) << "write failed: " - << " data store return: " << ret - << ", request: " << request.ShortDebugString(); - } else if (CSErrorCode::InternalError == ret || - CSErrorCode::CrcCheckError == ret || - CSErrorCode::FileFormatError == ret) { - LOG(FATAL) << "write failed: " - << " data store return: " << ret - << ", request: " << request.ShortDebugString(); - } else { - LOG(ERROR) << "write failed: " - << " data store return: " << ret - << ", request: " << request.ShortDebugString(); + if (CSErrorCode::Success == ret) { + return; + } else if (CSErrorCode::BackwardRequestError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request_->ShortDebugString(); + datastore->WaitForDiskFreed(); + continue; + } else if (CSErrorCode::InternalError == ret || + CSErrorCode::CrcCheckError == ret || + CSErrorCode::FileFormatError == ret) { + LOG(FATAL) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } else { + LOG(ERROR) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } + break; } } @@ -713,6 +734,12 @@ void PasteChunkInternalRequest::Process() { return; } + // check if copyset node readonly + if (node_->ReadOnly()) { + response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY); + return; + } + /** * 如果propose成功,说明request成功交给了raft处理, * 那么done_就不能被调用,只有propose失败了才需要提前返回 @@ -737,6 +764,10 @@ void PasteChunkInternalRequest::OnApply(uint64_t index, } else if (CSErrorCode::InternalError == ret) { LOG(FATAL) << "paste chunk failed: " << ", request: " << request_->ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(ERROR) << "paste chunk failed: " + << ", request: " << request_->ShortDebugString(); + response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE); } else { LOG(ERROR) << "paste chunk failed: " << ", request: " << request_->ShortDebugString(); @@ -749,20 +780,28 @@ void PasteChunkInternalRequest::OnApply(uint64_t index, void PasteChunkInternalRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, const butil::IOBuf &data) { - // NOTE: 处理过程中优先使用参数传入的datastore/request - auto ret = datastore->PasteChunk(request.chunkid(), - data.to_string().c_str(), - request.offset(), - request.size()); - if (CSErrorCode::Success == ret) - return; + while (true) { + // NOTE: 处理过程中优先使用参数传入的datastore/request + auto ret = datastore->PasteChunk(request.chunkid(), + data.to_string().c_str(), + request.offset(), + request.size()); + if (CSErrorCode::Success == ret) + return; - if (CSErrorCode::InternalError == ret) { - LOG(FATAL) << "paste chunk failed: " - << ", request: " << request.ShortDebugString(); - } else { - LOG(ERROR) << "paste chunk failed: " - << ", request: " << request.ShortDebugString(); + if (CSErrorCode::InternalError == ret) { + LOG(FATAL) << "paste chunk failed: " + << ", request: " << request.ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(ERROR) << "paste chunk failed: " + << ", request: " << request_->ShortDebugString(); + datastore->WaitForDiskFreed(); + continue; + } else { + LOG(ERROR) << "paste chunk failed: " + << ", request: " << request.ShortDebugString(); + } + break; } } diff --git a/src/chunkserver/raftlog/curve_segment.cpp b/src/chunkserver/raftlog/curve_segment.cpp index f51bf3a82e..d21167279f 100644 --- a/src/chunkserver/raftlog/curve_segment.cpp +++ b/src/chunkserver/raftlog/curve_segment.cpp @@ -52,6 +52,9 @@ namespace chunkserver { DEFINE_bool(raftSyncSegments, true, "call fsync when a segment is closed"); DEFINE_bool(enableWalDirectWrite, true, "enable wal direct write or not"); DEFINE_uint32(walAlignSize, 4096, "wal align size to write"); +DEFINE_uint32(waitForDiskFreedIntervalMs, 60000, + "wait for retry time when disk space is insufficient"); + int CurveSegment::create() { if (!_is_open) { @@ -65,11 +68,20 @@ int CurveSegment::create() { char* metaPage = new char[_meta_page_size]; memset(metaPage, 0, _meta_page_size); memcpy(metaPage, &_meta.bytes, sizeof(_meta.bytes)); - int res = _walFilePool->GetFile(path, metaPage); - delete[] metaPage; - if (res != 0) { - LOG(ERROR) << "Get segment from chunk file pool fail!"; - return -1; + int res; + while (true) { + int res = _walFilePool->GetFile(path, metaPage); + if (res == -ENOSPC) { + bthread_usleep(FLAGS_waitForDiskFreedIntervalMs); + continue; + } + + delete[] metaPage; + if (res != 0) { + LOG(ERROR) << "Get segment from chunk file pool fail!"; + return -1; + } + break; } _fd = ::open(path.c_str(), O_RDWR|O_NOATIME, 0644); if (_fd >= 0) { diff --git a/src/client/chunk_closure.cpp b/src/client/chunk_closure.cpp index 592e9d2a06..d1ec2dcfee 100644 --- a/src/client/chunk_closure.cpp +++ b/src/client/chunk_closure.cpp @@ -95,11 +95,13 @@ void ClientClosure::PreProcessBeforeRetry(int rpcstatus, int cntlstatus) { } uint64_t nextSleepUS = 0; - - if (!retryDirectly_) { + if (rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY || + rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE) { + nextSleepUS = failReqOpt_.chunkserverWaitDiskFreeRetryIntervalMS; + } else if (!retryDirectly_) { nextSleepUS = failReqOpt_.chunkserverOPRetryIntervalUS; if (rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED) { - nextSleepUS /= 10; + nextSleepUS /= 10; } } @@ -238,6 +240,16 @@ void ClientClosure::Run() { OnEpochTooOld(); break; + case CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY: + needRetry = true; + OnReadOnly(); + break; + + case CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE: + needRetry = true; + OnNoSpace(); + break; + default: needRetry = true; LOG(WARNING) << OpTypeToString(reqCtx_->optype_) @@ -366,6 +378,30 @@ void ClientClosure::OnEpochTooOld() { << butil::endpoint2str(cntl_->remote_side()).c_str(); } +void ClientClosure::OnReadOnly() { + reqDone_->SetFailed(status_); + LOG(WARNING) << OpTypeToString(reqCtx_->optype_) << " copyset is readonly, " + << *reqCtx_ + << ", status = " << status_ + << ", retried times = " << reqDone_->GetRetriedTimes() + << ", IO id = " << reqDone_->GetIOTracker()->GetID() + << ", request id = " << reqCtx_->id_ + << ", remote side = " + << butil::endpoint2str(cntl_->remote_side()).c_str(); +} + +void ClientClosure::OnNoSpace() { + reqDone_->SetFailed(status_); + LOG(WARNING) << OpTypeToString(reqCtx_->optype_) << " copyset is no space, " + << *reqCtx_ + << ", status = " << status_ + << ", retried times = " << reqDone_->GetRetriedTimes() + << ", IO id = " << reqDone_->GetIOTracker()->GetID() + << ", request id = " << reqCtx_->id_ + << ", remote side = " + << butil::endpoint2str(cntl_->remote_side()).c_str(); +} + void ClientClosure::OnRedirected() { LOG(WARNING) << OpTypeToString(reqCtx_->optype_) << " redirected, " << *reqCtx_ diff --git a/src/client/chunk_closure.h b/src/client/chunk_closure.h index f5d9acd220..2764d263ee 100644 --- a/src/client/chunk_closure.h +++ b/src/client/chunk_closure.h @@ -113,6 +113,12 @@ class ClientClosure : public Closure { // handle epoch too old void OnEpochTooOld(); + // handle readonly + void OnReadOnly(); + + // handle nospace + void OnNoSpace(); + // 非法参数 void OnInvalidRequest(); diff --git a/src/client/config_info.h b/src/client/config_info.h index 620d464eae..0e8343d694 100644 --- a/src/client/config_info.h +++ b/src/client/config_info.h @@ -138,6 +138,8 @@ struct ChunkServerUnstableOption { * copyset 标记为unstable,促使其下次发送rpc前,先去getleader。 * @chunkserverMinRetryTimesForceTimeoutBackoff: * 当一个请求重试次数超过阈值时,还在重试 使其超时时间进行指数退避 + * @chunkserverWaitDiskFreeRetryIntervalMS: + * 当请求返回readonly或者nospace错误,hang住io等待一段时间后重试。 */ struct FailureRequestOption { uint32_t chunkserverOPMaxRetry = 3; @@ -146,6 +148,7 @@ struct FailureRequestOption { uint64_t chunkserverMaxRPCTimeoutMS = 64000; uint64_t chunkserverMaxRetrySleepIntervalUS = 64ull * 1000 * 1000; uint64_t chunkserverMinRetryTimesForceTimeoutBackoff = 5; + uint64_t chunkserverWaitDiskFreeRetryIntervalMS = 60 * 1000; // When a request remains outstanding beyond this threshold, it is marked as // a slow request. diff --git a/src/fs/ext4_filesystem_impl.cpp b/src/fs/ext4_filesystem_impl.cpp index f4cd6cfcdb..e31c816a90 100644 --- a/src/fs/ext4_filesystem_impl.cpp +++ b/src/fs/ext4_filesystem_impl.cpp @@ -316,15 +316,21 @@ int Ext4FileSystemImpl::Write(int fd, buf + relativeOffset, remainLength, offset); + if (ret < 0) { if (errno == EINTR && retryTimes < MAX_RETYR_TIME) { ++retryTimes; continue; + } else if (errno == ENOSPC) { + LOG(ERROR) << "Disk is full writing fd: " << fd + << ". Waiting for someone to free space..."; + return -errno; + } else if (errno > 0) { + LOG(ERROR) << "pwrite failed, fd: " << fd + << ", size: " << remainLength << ", offset: " << offset + << ", error: " << strerror(errno); + return -errno; } - LOG(ERROR) << "pwrite failed, fd: " << fd - << ", size: " << remainLength << ", offset: " << offset - << ", error: " << strerror(errno); - return -errno; } remainLength -= ret; offset += ret; @@ -353,13 +359,18 @@ int Ext4FileSystemImpl::Write(int fd, if (errno == EINTR || retryTimes < MAX_RETYR_TIME) { ++retryTimes; continue; + } else if (errno == ENOSPC) { + LOG(ERROR) << "Disk is full writing fd: " << fd + << ". Waiting for someone to free space..."; + return -errno; + } else if (errno > 0) { + LOG(ERROR) << "IOBuf::pcut_into_file_descriptor failed, fd: " + << fd + << ", size: " << remainLength << ", offset: " << offset + << ", error: " << strerror(errno); + return -errno; } - LOG(ERROR) << "IOBuf::pcut_into_file_descriptor failed, fd: " << fd - << ", size: " << remainLength << ", offset: " << offset - << ", error: " << strerror(errno); - return -errno; } - remainLength -= ret; offset += ret; } diff --git a/src/mds/heartbeat/heartbeat_manager.cpp b/src/mds/heartbeat/heartbeat_manager.cpp index 188ee03112..a3faf52fec 100644 --- a/src/mds/heartbeat/heartbeat_manager.cpp +++ b/src/mds/heartbeat/heartbeat_manager.cpp @@ -36,6 +36,7 @@ using ::curve::mds::topology::UNINTIALIZE_ID; using ::curve::mds::topology::ChunkServerStatus; using ::curve::mds::topology::ChunkServerStat; using ::curve::mds::topology::CopysetStat; +using ::curve::mds::topology::CopySetFilter; using ::curve::mds::topology::SplitPeerId; namespace curve { @@ -100,13 +101,34 @@ void HeartbeatManager::UpdateChunkServerDiskStatus( const ChunkServerHeartbeatRequest &request) { // update ChunkServerState status (disk status) ChunkServerState state; - if (request.diskstate().errtype() != 0) { + curve::mds::heartbeat::ErrorType errType = request.diskstate().errtype(); + + if (errType == curve::mds::heartbeat::DISKFULL) { + // When the chunkserver disk is close to full, copyset availflag + // needs to be set to false to prevent new space from being + // allocated from these copysets. + CopySetFilter filter = [](const curve::mds::topology::CopySetInfo &cs) { + return cs.IsAvailable(); + }; + std::vector keys = topology_->GetCopySetsInChunkServer( + request.chunkserverid(), filter); + for (auto key : keys) { + topology_->SetCopySetAvalFlag(key, false); + } + // If disk error is set, the copyset will not be migrated to + // this chunkserver. + state.SetDiskState(curve::mds::topology::DISKFULL); + LOG(ERROR) << "heartbeat report disk full error, " + << "diskused = " << request.diskused() + << "capacity = " << request.diskcapacity() + << "chunkserverid =" << request.chunkserverid(); + } else if (errType == curve::mds::heartbeat::NORMAL) { + state.SetDiskState(curve::mds::topology::DISKNORMAL); + } else { state.SetDiskState(curve::mds::topology::DISKERROR); LOG(ERROR) << "heartbeat report disk error, " << "errortype = " << request.diskstate().errtype() << "errmsg = " << request.diskstate().errmsg(); - } else { - state.SetDiskState(curve::mds::topology::DISKNORMAL); } state.SetDiskCapacity(request.diskcapacity()); diff --git a/src/mds/nameserver2/curvefs.cpp b/src/mds/nameserver2/curvefs.cpp index 5d5af4d75f..3cf96e0f25 100644 --- a/src/mds/nameserver2/curvefs.cpp +++ b/src/mds/nameserver2/curvefs.cpp @@ -171,6 +171,7 @@ void CurveFS::Uninit() { allocStatistic_ = nullptr; fileRecordManager_ = nullptr; snapshotCloneClient_ = nullptr; + topology_ = nullptr; } void CurveFS::InitRootFile(void) { diff --git a/test/chunkserver/fake_datastore.h b/test/chunkserver/fake_datastore.h index 75b5c80330..c2aedf1733 100644 --- a/test/chunkserver/fake_datastore.h +++ b/test/chunkserver/fake_datastore.h @@ -51,6 +51,7 @@ class FakeCSDataStore : public CSDataStore { snapDeleteFlag_ = false; error_ = CSErrorCode::Success; chunkSize_ = options.chunkSize; + enoughChunk_ = true; } virtual ~FakeCSDataStore() { delete chunk_; @@ -204,6 +205,14 @@ class FakeCSDataStore : public CSDataStore { } } + bool EnoughChunk() { + return enoughChunk_; + } + + void SetEnoughChunk(bool enoughChunk) { + enoughChunk_ = enoughChunk; + } + void InjectError(CSErrorCode errorCode = CSErrorCode::InternalError) { error_ = errorCode; } @@ -226,6 +235,7 @@ class FakeCSDataStore : public CSDataStore { SequenceNum sn_; CSErrorCode error_; uint32_t chunkSize_; + bool enoughChunk_; }; class FakeFilePool : public FilePool { diff --git a/test/chunkserver/op_request_test.cpp b/test/chunkserver/op_request_test.cpp index 20a4181444..12314ad8cc 100644 --- a/test/chunkserver/op_request_test.cpp +++ b/test/chunkserver/op_request_test.cpp @@ -391,6 +391,7 @@ TEST(ChunkOpRequestTest, OnApplyErrorTest) { std::shared_ptr dataStore = std::make_shared(options, fs); nodePtr->SetCSDateStore(dataStore); + nodePtr->on_leader_start(1); // write data store error will cause fatal, so not test in here @@ -592,6 +593,106 @@ TEST(ChunkOpRequestTest, OnApplyErrorTest) { delete opReq; delete cntl; } + // write: data store no space + { + ChunkRequest request; + ChunkResponse response; + request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_WRITE); + request.set_logicpoolid(logicPoolId); + request.set_copysetid(copysetId); + request.set_chunkid(chunkId); + request.set_offset(offset); + request.set_size(size); + request.set_sn(sn); + brpc::Controller *cntl = new brpc::Controller(); + ChunkOpRequest *opReq = new WriteChunkRequest(nodePtr, + cntl, + &request, + &response, + nullptr); + dataStore->InjectError(curve::chunkserver::NoSpaceError); + OpFakeClosure done; + opReq->OnApply(appliedIndex, &done); + ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE, + response.status()); + delete opReq; + delete cntl; + } + // paste: data store no space + { + ChunkRequest request; + ChunkResponse response; + request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_PASTE); + request.set_logicpoolid(logicPoolId); + request.set_copysetid(copysetId); + request.set_chunkid(chunkId); + request.set_offset(offset); + request.set_size(size); + request.set_sn(sn); + ChunkOpRequest *opReq = new PasteChunkInternalRequest(nodePtr, + &request, + &response, + nullptr, + nullptr); + dataStore->InjectError(curve::chunkserver::NoSpaceError); + OpFakeClosure done; + opReq->OnApply(appliedIndex, &done); + ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE, + response.status()); + delete opReq; + } + // write: data store readonly + { + ChunkRequest request; + ChunkResponse response; + request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_WRITE); + request.set_logicpoolid(logicPoolId); + request.set_copysetid(copysetId); + request.set_chunkid(chunkId); + request.set_offset(offset); + request.set_size(size); + request.set_sn(sn); + dataStore->SetEnoughChunk(false); + brpc::Controller *cntl = new brpc::Controller(); + ChunkOpRequest *opReq = new WriteChunkRequest(nodePtr, + cntl, + &request, + &response, + nullptr); + OpFakeClosure done; + opReq->Process(); + ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY, + response.status()); + delete opReq; + delete cntl; + dataStore->SetEnoughChunk(true); + } + // paste: data store readonly + { + ChunkRequest request; + ChunkResponse response; + request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_PASTE); + request.set_logicpoolid(logicPoolId); + request.set_copysetid(copysetId); + request.set_chunkid(chunkId); + request.set_offset(offset); + request.set_size(size); + request.set_sn(sn); + dataStore->SetEnoughChunk(false); + brpc::Controller *cntl = new brpc::Controller(); + ChunkOpRequest *opReq = new WriteChunkRequest(nodePtr, + cntl, + &request, + &response, + nullptr); + OpFakeClosure done; + opReq->Process(); + ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY, + response.status()); + delete opReq; + delete cntl; + dataStore->SetEnoughChunk(true); + } // write: backward request { ChunkRequest request; diff --git a/test/client/copyset_client_test.cpp b/test/client/copyset_client_test.cpp index b71383ec9d..6546897344 100644 --- a/test/client/copyset_client_test.cpp +++ b/test/client/copyset_client_test.cpp @@ -427,6 +427,7 @@ TEST_F(CopysetClientTest, write_error_test) { ioSenderOpt.failRequestOpt.chunkserverOPRetryIntervalUS = 5000; ioSenderOpt.failRequestOpt.chunkserverMaxRPCTimeoutMS = 3500; ioSenderOpt.failRequestOpt.chunkserverMaxRetrySleepIntervalUS = 3500000; + ioSenderOpt.failRequestOpt.chunkserverWaitDiskFreeRetryIntervalMS = 2000; RequestScheduleOption reqopt; reqopt.ioSenderOpt = ioSenderOpt; @@ -910,6 +911,119 @@ TEST_F(CopysetClientTest, write_error_test) { ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_EPOCH_TOO_OLD, reqDone->GetErrorCode()); } + // chunkserver readonly + { + RequestContext *reqCtx = new FakeRequestContext(); + reqCtx->optype_ = OpType::WRITE; + reqCtx->idinfo_ = ChunkIDInfo(chunkId, logicPoolId, copysetId); + + reqCtx->writeData_ = iobuf; + reqCtx->offset_ = 0; + reqCtx->rawlength_ = len; + + curve::common::CountDownEvent cond(1); + RequestClosure *reqDone = new FakeRequestClosure(&cond, reqCtx); + reqDone->SetFileMetric(&fm); + reqDone->SetIOTracker(&iot); + + reqCtx->done_ = reqDone; + gWriteCntlFailedCode = 0; + ChunkResponse response; + response.set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY); + ChunkResponse response2; + response2.set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS); + EXPECT_CALL(mockMetaCache, GetLeader(_, _, _, _, _, _)) + .Times(2).WillRepeatedly(DoAll(SetArgPointee<2>(leaderId), + SetArgPointee<3>(leaderAddr), + Return(0))); + EXPECT_CALL(mockChunkService, WriteChunk(_, _, _, _)).Times(2) + .WillOnce(DoAll(SetArgPointee<2>(response), + Invoke(WriteChunkFunc))) + .WillOnce(DoAll(SetArgPointee<2>(response2), + Invoke(WriteChunkFunc))); + auto startTimeUs = curve::common::TimeUtility::GetTimeofDayUs(); + copysetClient.WriteChunk(reqCtx->idinfo_, fileId, epoch, 0, + iobuf, offset, len, {}, reqDone); + cond.Wait(); + auto elpased = curve::common::TimeUtility::GetTimeofDayUs() + - startTimeUs; + ASSERT_GE(elpased, + ioSenderOpt.failRequestOpt.chunkserverWaitDiskFreeRetryIntervalMS); + ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, + reqDone->GetErrorCode()); + } + // chunkserver no space + { + RequestContext *reqCtx = new FakeRequestContext(); + reqCtx->optype_ = OpType::WRITE; + reqCtx->idinfo_ = ChunkIDInfo(chunkId, logicPoolId, copysetId); + + reqCtx->writeData_ = iobuf; + reqCtx->offset_ = 0; + reqCtx->rawlength_ = len; + + curve::common::CountDownEvent cond(1); + RequestClosure *reqDone = new FakeRequestClosure(&cond, reqCtx); + reqDone->SetFileMetric(&fm); + reqDone->SetIOTracker(&iot); + + reqCtx->done_ = reqDone; + gWriteCntlFailedCode = 0; + ChunkResponse response; + response.set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE); + ChunkResponse response2; + response2.set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS); + EXPECT_CALL(mockMetaCache, GetLeader(_, _, _, _, _, _)) + .Times(2).WillRepeatedly(DoAll(SetArgPointee<2>(leaderId), + SetArgPointee<3>(leaderAddr), + Return(0))); + EXPECT_CALL(mockChunkService, WriteChunk(_, _, _, _)).Times(2) + .WillOnce(DoAll(SetArgPointee<2>(response), + Invoke(WriteChunkFunc))) + .WillOnce(DoAll(SetArgPointee<2>(response2), + Invoke(WriteChunkFunc))); + auto startTimeUs = curve::common::TimeUtility::GetTimeofDayUs(); + copysetClient.WriteChunk(reqCtx->idinfo_, fileId, epoch, 0, + iobuf, offset, len, {}, reqDone); + cond.Wait(); + auto elpased = curve::common::TimeUtility::GetTimeofDayUs() + - startTimeUs; + ASSERT_GE(elpased, + ioSenderOpt.failRequestOpt.chunkserverWaitDiskFreeRetryIntervalMS); + ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, + reqDone->GetErrorCode()); + } + // chunkserver exit + { + RequestContext *reqCtx = new FakeRequestContext(); + reqCtx->optype_ = OpType::WRITE; + reqCtx->idinfo_ = ChunkIDInfo(chunkId, logicPoolId, copysetId); + + reqCtx->writeData_ = iobuf; + reqCtx->offset_ = 0; + reqCtx->rawlength_ = len; + + curve::common::CountDownEvent cond(1); + RequestClosure *reqDone = new FakeRequestClosure(&cond, reqCtx); + reqDone->SetFileMetric(&fm); + reqDone->SetIOTracker(&iot); + + reqCtx->done_ = reqDone; + ChunkResponse response; + response.set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_EXIST); + EXPECT_CALL(mockMetaCache, GetLeader(_, _, _, _, _, _)) + .Times(1).WillOnce(DoAll(SetArgPointee<2>(leaderId), + SetArgPointee<3>(leaderAddr), + Return(0))); + EXPECT_CALL(mockChunkService, WriteChunk(_, _, _, _)).Times(1) + .WillOnce(DoAll(SetArgPointee<2>(response), + Invoke(WriteChunkFunc))); + copysetClient.WriteChunk(reqCtx->idinfo_, fileId, epoch, 0, + iobuf, offset, len, {}, reqDone); + cond.Wait(); + ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_EXIST, + reqDone->GetErrorCode()); + } scheduler.Fini(); } @@ -1688,6 +1802,36 @@ TEST_F(CopysetClientTest, read_error_test) { ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, reqDone->GetErrorCode()); } + /* read backward */ + { + RequestContext *reqCtx = new FakeRequestContext(); + reqCtx->optype_ = OpType::READ; + reqCtx->idinfo_ = ChunkIDInfo(chunkId, logicPoolId, copysetId); + + reqCtx->subIoIndex_ = 0; + reqCtx->offset_ = 0; + reqCtx->rawlength_ = len; + + curve::common::CountDownEvent cond(1); + RequestClosure *reqDone = new FakeRequestClosure(&cond, reqCtx); + reqDone->SetFileMetric(&fm); + reqDone->SetIOTracker(&iot); + + reqCtx->done_ = reqDone; + ChunkResponse response; + response.set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD); + EXPECT_CALL(mockMetaCache, GetLeader(_, _, _, _, _, _)) + .Times(AtLeast(1)).WillOnce(DoAll(SetArgPointee<2>(leaderId), + SetArgPointee<3>(leaderAddr), + Return(0))); + EXPECT_CALL(mockChunkService, ReadChunk(_, _, _, _)).Times(1) + .WillOnce(DoAll(SetArgPointee<2>(response), + Invoke(ReadChunkFunc))); + copysetClient.ReadChunk(reqCtx->idinfo_, sn, + offset, len, {}, reqDone); + cond.Wait(); + ASSERT_EQ(-1, reqDone->GetErrorCode()); + } scheduler.Fini(); } diff --git a/test/integration/heartbeat/common.cpp b/test/integration/heartbeat/common.cpp index 5d09293287..91ce6eaaee 100644 --- a/test/integration/heartbeat/common.cpp +++ b/test/integration/heartbeat/common.cpp @@ -92,6 +92,17 @@ void HeartbeatIntegrationCommon::UpdateCopysetTopo( ASSERT_EQ(0, topology_->UpdateCopySetTopo(copysetInfo)); } +void HeartbeatIntegrationCommon::CheckCopysetAvail(ChunkServerIdType id, + bool avail) { + std::vector keys = topology_->GetCopySetsInChunkServer(id); + for (auto key : keys) { + CopySetInfo info; + if (topology_->GetCopySet(key, &info)) { + ASSERT_EQ(avail, info.IsAvailable()); + } + } +} + void HeartbeatIntegrationCommon::SendHeartbeat( const ChunkServerHeartbeatRequest &request, bool expectFailed, ChunkServerHeartbeatResponse *response) { @@ -119,7 +130,7 @@ void HeartbeatIntegrationCommon::BuildBasicChunkServerRequest( req->set_ip(out.GetHostIp()); req->set_port(out.GetPort()); auto diskState = new ::curve::mds::heartbeat::DiskState(); - diskState->set_errtype(0); + diskState->set_errtype(curve::mds::heartbeat::NORMAL); diskState->set_errmsg("disk ok"); req->set_allocated_diskstate(diskState); req->set_diskcapacity(100); diff --git a/test/integration/heartbeat/common.h b/test/integration/heartbeat/common.h index b281d5a9ab..28508d6ba9 100644 --- a/test/integration/heartbeat/common.h +++ b/test/integration/heartbeat/common.h @@ -294,6 +294,13 @@ class HeartbeatIntegrationCommon { const std::set &members, ChunkServerIdType candidate = UNINTIALIZE_ID); + /* CheckCopysetAvail 检查copyset的状态 + * + * @param[in] chunkserverId chunkserver的id + * @param[in] avail copyset状态 + */ + void CheckCopysetAvail(ChunkServerIdType chunkserverId, bool avail); + /* SendHeartbeat 发送心跳 * * @param[in] req diff --git a/test/integration/heartbeat/heartbeat_basic_test.cpp b/test/integration/heartbeat/heartbeat_basic_test.cpp index c9a2ae416d..1948fc1ba4 100644 --- a/test/integration/heartbeat/heartbeat_basic_test.cpp +++ b/test/integration/heartbeat/heartbeat_basic_test.cpp @@ -299,6 +299,26 @@ TEST_F(HeartbeatBasicTest, test_chunkserver_ip_port_not_match) { rep.statuscode()); } +TEST_F(HeartbeatBasicTest, test_chunkserver_disk_full) { + // 发送空间不足请求 + ChunkServerHeartbeatRequest req; + hbtest_->BuildBasicChunkServerRequest(1, &req); + ::curve::mds::topology::CopySetInfo copysetInfo; + ASSERT_TRUE( + hbtest_->topology_->GetCopySet(CopySetKey{ 1, 1 }, ©setInfo)); + CopySetInfo csInfo(1, 1); + BuildCopySetInfo(&csInfo, 1, 1, copysetInfo.GetCopySetMembers()); + hbtest_->AddCopySetToRequest(&req, csInfo); + auto *state = req.mutable_diskstate(); + state->set_errtype(curve::mds::heartbeat::DISKFULL); + ChunkServerHeartbeatResponse rep; + hbtest_->SendHeartbeat(req, SENDHBOK, &rep); + + ASSERT_EQ(::curve::mds::heartbeat::hbOK, rep.statuscode()); + // 检查copyset availflag是否为false + hbtest_->CheckCopysetAvail(1, false); +} + TEST_F(HeartbeatBasicTest, test_chunkserver_offline_then_online) { // chunkserver上报心跳时间间隔大于offline // sleep 800ms, 该chunkserver onffline状态 diff --git a/test/mds/heartbeat/common.cpp b/test/mds/heartbeat/common.cpp index 5771c564f9..49a355778d 100644 --- a/test/mds/heartbeat/common.cpp +++ b/test/mds/heartbeat/common.cpp @@ -38,7 +38,7 @@ ChunkServerHeartbeatRequest GetChunkServerHeartbeatRequestForTest() { request.set_copysetcount(100); DiskState *state = new DiskState(); - state->set_errtype(1); + state->set_errtype(curve::mds::heartbeat::NORMAL); std::string *errMsg = new std::string("healthy"); state->set_allocated_errmsg(errMsg); request.set_allocated_diskstate(state); diff --git a/test/mds/server/mds_test.cpp b/test/mds/server/mds_test.cpp index 236e526371..8a8c08c5d6 100644 --- a/test/mds/server/mds_test.cpp +++ b/test/mds/server/mds_test.cpp @@ -181,7 +181,7 @@ TEST_F(MDSTest, common) { request1.set_ip("127.0.0.1"); request1.set_port(8888); heartbeat::DiskState *diskState = new heartbeat::DiskState(); - diskState->set_errtype(0); + diskState->set_errtype(curve::mds::heartbeat::NORMAL); diskState->set_errmsg(""); request1.set_allocated_diskstate(diskState); request1.set_diskcapacity(2ull * 1024 * 1024 * 1024); diff --git a/tools-v2/pkg/cli/command/curvebs/list/list.go b/tools-v2/pkg/cli/command/curvebs/list/list.go index 44472a01a9..61d269b790 100644 --- a/tools-v2/pkg/cli/command/curvebs/list/list.go +++ b/tools-v2/pkg/cli/command/curvebs/list/list.go @@ -34,6 +34,7 @@ import ( "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/server" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/snapshot" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/space" + "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets" "github.com/spf13/cobra" ) @@ -55,6 +56,7 @@ func (listCmd *ListCommand) AddSubCommands() { may_broken_vol.NewMayBrokenVolCommand(), formatstatus.NewFormatStatusCommand(), snapshot.NewSnapShotCommand(), + unavailcopysets.NewUnAvailCopySetsCommand(), ) } diff --git a/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets/unavailcopysets.go b/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets/unavailcopysets.go index f9e9f5a103..a6af8a8ac5 100644 --- a/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets/unavailcopysets.go +++ b/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets/unavailcopysets.go @@ -24,8 +24,10 @@ package unavailcopysets import ( "context" + "fmt" cmderror "github.com/opencurve/curve/tools-v2/internal/error" + cobrautil "github.com/opencurve/curve/tools-v2/internal/utils" basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" "github.com/opencurve/curve/tools-v2/pkg/config" "github.com/opencurve/curve/tools-v2/pkg/output" @@ -36,6 +38,10 @@ import ( "google.golang.org/grpc" ) +const ( + listUnAvailCopySetExample = `$ curve bs list unavail-copyset` +) + type ListUnAvailCopySets struct { Info *basecmd.Rpc Request *topology.ListUnAvailCopySetsRequest @@ -66,7 +72,11 @@ func NewUnAvailCopySetsCommand() *cobra.Command { func NewListUnAvailCopySetsCommand() *UnAvailCopySetsCommand { uCmd := &UnAvailCopySetsCommand{ - FinalCurveCmd: basecmd.FinalCurveCmd{}, + FinalCurveCmd: basecmd.FinalCurveCmd{ + Use: "unavail-copyset", + Short: "list unavail copyset", + Example: listUnAvailCopySetExample, + }, } basecmd.NewFinalCurveCli(&uCmd.FinalCurveCmd, uCmd) @@ -90,6 +100,8 @@ func (uCmd *UnAvailCopySetsCommand) Init(cmd *cobra.Command, args []string) erro Request: &topology.ListUnAvailCopySetsRequest{}, Info: basecmd.NewRpc(mdsAddrs, timeout, retrytimes, "ListUnAvailCopySets"), } + header := []string{cobrautil.ROW_LOGICALPOOL, cobrautil.ROW_COPYSET} + uCmd.SetHeader(header) return nil } @@ -109,6 +121,18 @@ func (uCmd *UnAvailCopySetsCommand) RunCommand(cmd *cobra.Command, args []string return cmderror.ErrBsListPhysicalPoolRpc(code).ToError() } uCmd.response = response.Copysets + uCmd.Result = response.Copysets + rows := make([]map[string]string, 0) + for _, info := range response.Copysets { + row := make(map[string]string) + row[cobrautil.ROW_LOGICALPOOL] = fmt.Sprintf("%d", info.GetLogicalPoolId()) + row[cobrautil.ROW_COPYSET] = fmt.Sprintf("%d", info.GetCopysetId()) + rows = append(rows, row) + } + list := cobrautil.ListMap2ListSortByKeys(rows, uCmd.Header, []string{ + cobrautil.ROW_LOGICALPOOL, cobrautil.ROW_COPYSET, + }) + uCmd.TableNew.AppendBulk(list) return nil }