From 31343512cf42bb560228eee03ef2959d61e828a8 Mon Sep 17 00:00:00 2001 From: liuminjian Date: Wed, 29 Nov 2023 04:55:03 -0500 Subject: [PATCH] 1. Heartbeat reports the disk-full error and MDS sets the copyset availflag to false. 2. The copyset node leader becomes read-only when it receives copyset availflag false from the heartbeat. 3. If the disk becomes full while writing to a chunk file, the server returns a no-space error and the client hangs until space is freed up manually. Signed-off-by: liuminjian --- WORKSPACE | 1 - proto/chunk.proto | 1 + proto/copyset.proto | 2 - proto/topology.proto | 10 - src/chunkserver/copyset_node.cpp | 17 +- src/chunkserver/copyset_node.h | 12 - src/chunkserver/copyset_service.cpp | 40 -- src/chunkserver/copyset_service.h | 8 - .../datastore/chunkserver_chunkfile.cpp | 7 + .../datastore/chunkserver_snapshot.cpp | 4 + src/chunkserver/datastore/define.h | 2 + src/chunkserver/op_request.cpp | 380 +++++------------- src/chunkserver/op_request.h | 49 +-- src/client/chunk_closure.cpp | 19 + src/client/chunk_closure.h | 3 + src/client/config_info.h | 3 + src/fs/ext4_filesystem_impl.cpp | 53 ++- src/mds/topology/topology_service.cpp | 13 - src/mds/topology/topology_service.h | 9 +- src/mds/topology/topology_service_manager.cpp | 93 ----- src/mds/topology/topology_service_manager.h | 4 - test/chunkserver/clone/op_request_test.cpp | 89 ++-- test/chunkserver/copyset_node_test.cpp | 16 +- test/chunkserver/copyset_service_test.cpp | 44 -- test/chunkserver/mock_copyset_node.h | 8 - test/chunkserver/op_request_test.cpp | 77 +--- test/mds/topology/mock_topology.h | 9 - .../test_topology_service_manager.cpp | 52 +-- .../braft/add-iterator-has_error.patch | 28 -- .../command/curvebs/delete/copyset/copyset.go | 102 ----- .../pkg/cli/command/curvebs/delete/delete.go | 2 - 31 files changed, 256 insertions(+), 901 deletions(-) delete mode 100644 thirdparties/braft/add-iterator-has_error.patch delete mode 100644 tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go diff --git a/WORKSPACE 
b/WORKSPACE index ddeffa41a8..a423f1c46a 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -37,7 +37,6 @@ git_repository( commit = "d12de388c97998f5ccd5cb97ed0da728815ef438", patches = [ "//:thirdparties/braft/0001-fix-change-set_error-to-set_errorv.patch", - "//:thirdparties/braft/add-iterator-has_error.patch", ], patch_args = [ "-p1" diff --git a/proto/chunk.proto b/proto/chunk.proto index 32cbbb8836..94eec1b1a8 100755 --- a/proto/chunk.proto +++ b/proto/chunk.proto @@ -86,6 +86,7 @@ enum CHUNK_OP_STATUS { CHUNK_OP_STATUS_CHUNK_EXIST = 11; // chunk已存在 CHUNK_OP_STATUS_EPOCH_TOO_OLD = 12; // request epoch too old CHUNK_OP_STATUS_READONLY = 13; // copyset其他节点故障,设为只读 + CHUNK_OP_STATUS_ENOSPC = 14; // 空间不足错误 }; message ChunkResponse { diff --git a/proto/copyset.proto b/proto/copyset.proto index 7cebef68a1..fe3d271d53 100755 --- a/proto/copyset.proto +++ b/proto/copyset.proto @@ -100,6 +100,4 @@ service CopysetService { rpc DeleteBrokenCopyset(CopysetRequest) returns (CopysetResponse); rpc GetCopysetStatus (CopysetStatusRequest) returns (CopysetStatusResponse); - - rpc DeleteBrokenCopysetNode (CopysetRequest2) returns (CopysetResponse2); }; diff --git a/proto/topology.proto b/proto/topology.proto index 5c6031f6e9..c1ef9d97e5 100644 --- a/proto/topology.proto +++ b/proto/topology.proto @@ -565,14 +565,6 @@ message ListUnAvailCopySetsResponse { repeated common.CopysetInfo copysets = 2; } -message DeleteBrokenCopysetInChunkServerRequest { - required uint32 chunkServerID = 1; -} - -message DeleteBrokenCopysetInChunkServerResponse { - required sint32 statusCode = 1; -} - //TODO(hzsunjianliang): update userPolicy and so on service TopologyService { rpc RegistChunkServer(ChunkServerRegistRequest) returns (ChunkServerRegistResponse); @@ -618,6 +610,4 @@ service TopologyService { rpc SetCopysetsAvailFlag(SetCopysetsAvailFlagRequest) returns (SetCopysetsAvailFlagResponse); rpc ListUnAvailCopySets(ListUnAvailCopySetsRequest) returns (ListUnAvailCopySetsResponse); rpc 
ListChunkFormatStatus(ListChunkFormatStatusRequest) returns (ListChunkFormatStatusResponse); - rpc DeleteBrokenCopysetInChunkServer(DeleteBrokenCopysetInChunkServerRequest) returns (DeleteBrokenCopysetInChunkServerResponse); - } diff --git a/src/chunkserver/copyset_node.cpp b/src/chunkserver/copyset_node.cpp index 0a01039714..a70958d449 100755 --- a/src/chunkserver/copyset_node.cpp +++ b/src/chunkserver/copyset_node.cpp @@ -293,7 +293,6 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { */ braft::Closure *closure = iter.done(); - std::shared_ptr wrapperPtr = std::make_shared(&iter); if (nullptr != closure) { /** * 1.closure不是null,那么说明当前节点正常,直接从内存中拿到Op @@ -306,7 +305,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { std::shared_ptr& opRequest = chunkClosure->request_; concurrentapply_->Push(opRequest->ChunkId(), ChunkOpRequest::Schedule(opRequest->OpType()), // NOLINT &ChunkOpRequest::OnApply, opRequest, - iter.index(), doneGuard.release(), wrapperPtr); + iter.index(), doneGuard.release()); } else { // 获取log entry butil::IOBuf log = iter.data(); @@ -323,11 +322,9 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { auto chunkId = request.chunkid(); concurrentapply_->Push(chunkId, ChunkOpRequest::Schedule(request.optype()), // NOLINT &ChunkOpRequest::OnApplyFromLog, opReq, - dataStore_, std::move(request), data, wrapperPtr); + dataStore_, std::move(request), data); } } - // 等待写操作完成,否则on_apply结束后,异步有写错误无法调用set_error_and_rollback() - concurrentapply_->Flush(); } void CopysetNode::on_shutdown() { @@ -556,7 +553,7 @@ void CopysetNode::on_leader_stop(const butil::Status &status) { } void CopysetNode::on_error(const ::braft::Error &e) { - LOG(ERROR) << "Copyset: " << GroupIdString() + LOG(FATAL) << "Copyset: " << GroupIdString() << ", peer id: " << peerId_.to_string() << " meet raft error: " << e; } @@ -1126,13 +1123,5 @@ SyncChunkThread::~SyncChunkThread() { Stop(); } -void IteratorWrapper::set_error_and_rollback(size_t ntail, const butil::Status* st) { 
- iter_->set_error_and_rollback(ntail, st); -} - -bool IteratorWrapper::has_error() const{ - return iter_->has_error(); -} - } // namespace chunkserver } // namespace curve diff --git a/src/chunkserver/copyset_node.h b/src/chunkserver/copyset_node.h index 2bca4bb648..dbdf6111a2 100755 --- a/src/chunkserver/copyset_node.h +++ b/src/chunkserver/copyset_node.h @@ -124,18 +124,6 @@ class SyncChunkThread : public curve::common::Uncopyable { CopysetNode* node_; }; -// 用于unitest mock braft::Iterator -class IteratorWrapper { - public: - IteratorWrapper() {} - IteratorWrapper(braft::Iterator *iter): iter_(iter) {} - ~IteratorWrapper() {} - virtual void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL); - virtual bool has_error() const; - private: - braft::Iterator *iter_; -}; - /** * 一个Copyset Node就是一个复制组的副本 */ diff --git a/src/chunkserver/copyset_service.cpp b/src/chunkserver/copyset_service.cpp index b47f7c541f..e09516c0ad 100755 --- a/src/chunkserver/copyset_service.cpp +++ b/src/chunkserver/copyset_service.cpp @@ -232,45 +232,5 @@ void CopysetServiceImpl::GetCopysetStatus(RpcController *controller, request->copysetid()); } -void CopysetServiceImpl::DeleteBrokenCopysetNode(RpcController *controller, - const CopysetRequest2 *request, - CopysetResponse2 *response, - Closure *done) { - (void)controller; - brpc::ClosureGuard doneGuard(done); - - Copyset copyset; - - LOG(INFO) << "Received DeleteBrokenCopysetNode request"; - - for (int i = 0; i < request->copysets_size(); ++i) { - copyset = request->copysets(i); - - // 判断copyset是否存在 - auto nodePtr = copysetNodeManager_->GetCopysetNode(copyset.logicpoolid(), - copyset.copysetid()); - if (nullptr == nodePtr) { - continue; - } - - NodeStatus status; - nodePtr->GetStatus(&status); - // 只删除状态有问题的copyset node - if (status.state != braft::State::STATE_ERROR) { - continue; - } - - copysetNodeManager_->DeleteCopysetNode(copyset.logicpoolid(), copyset.copysetid()); - - LOG(INFO) << "Delete copyset node" - << 
ToGroupIdString(copyset.logicpoolid(), - copyset.copysetid()) - << " success."; - } - - response->set_status(COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS); - LOG(INFO) << "DeleteBrokenCopysetNode " << request->copysets().size() << " copysets success"; -} - } // namespace chunkserver } // namespace curve diff --git a/src/chunkserver/copyset_service.h b/src/chunkserver/copyset_service.h index ebb8ee430c..fabf6df8fc 100755 --- a/src/chunkserver/copyset_service.h +++ b/src/chunkserver/copyset_service.h @@ -71,14 +71,6 @@ class CopysetServiceImpl : public CopysetService { CopysetStatusResponse *response, Closure *done); - /** - * 删除状态ERROR的copyset node - */ - void DeleteBrokenCopysetNode(RpcController *controller, - const CopysetRequest2 *request, - CopysetResponse2 *response, - Closure *done); - private: // 复制组管理者 CopysetNodeManager* copysetNodeManager_; diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.cpp b/src/chunkserver/datastore/chunkserver_chunkfile.cpp index a6e6d3e0cc..a7f0a1aef3 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.cpp +++ b/src/chunkserver/datastore/chunkserver_chunkfile.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "src/chunkserver/datastore/chunkserver_datastore.h" #include "src/chunkserver/datastore/chunkserver_chunkfile.h" @@ -400,6 +401,9 @@ CSErrorCode CSChunkFile::Write(SequenceNum sn, << "ChunkID: " << chunkId_ << ",request sn: " << sn << ",chunk sn: " << metaPage_.sn; + if (rc == -ENOSPC) { + return CSErrorCode::NoSpaceError; + } return CSErrorCode::InternalError; } // If it is a clone chunk, the bitmap will be updated @@ -478,6 +482,9 @@ CSErrorCode CSChunkFile::Paste(const char * buf, off_t offset, size_t length) { << "ChunkID: " << chunkId_ << ", offset: " << offset << ", length: " << length; + if (rc == -ENOSPC) { + return CSErrorCode::NoSpaceError; + } return CSErrorCode::InternalError; } } diff --git a/src/chunkserver/datastore/chunkserver_snapshot.cpp 
b/src/chunkserver/datastore/chunkserver_snapshot.cpp index f1e398e8c6..67ed83102d 100644 --- a/src/chunkserver/datastore/chunkserver_snapshot.cpp +++ b/src/chunkserver/datastore/chunkserver_snapshot.cpp @@ -21,6 +21,7 @@ */ #include +#include #include "src/chunkserver/datastore/chunkserver_datastore.h" #include "src/chunkserver/datastore/chunkserver_snapshot.h" @@ -216,6 +217,9 @@ CSErrorCode CSSnapshot::Write(const char * buf, off_t offset, size_t length) { LOG(ERROR) << "Write snapshot failed." << "ChunkID: " << chunkId_ << ",snapshot sn: " << metaPage_.sn; + if (rc == -ENOSPC) { + return CSErrorCode::NoSpaceError; + } return CSErrorCode::InternalError; } uint32_t pageBeginIndex = offset / blockSize_; diff --git a/src/chunkserver/datastore/define.h b/src/chunkserver/datastore/define.h index 41c25677cf..1d0dda13fc 100644 --- a/src/chunkserver/datastore/define.h +++ b/src/chunkserver/datastore/define.h @@ -73,6 +73,8 @@ enum CSErrorCode { // The page has not been written, it will appear when the page that has not // been written is read when the clone chunk is read PageNerverWrittenError = 13, + // ENOSPC error + NoSpaceError = 14, }; // Chunk details diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index 6c9655f49d..1a46b6a2c1 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -201,36 +201,18 @@ uint64_t MaxAppliedIndex( } // namespace void DeleteChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "delete chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - 
auto ret = datastore_->DeleteChunk(request_->chunkid(), request_->sn()); if (CSErrorCode::Success == ret) { response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS); node_->UpdateAppliedIndex(index); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "delete chunk failed: " + LOG(FATAL) << "delete chunk failed: " << " data store return: " << ret << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; } else { LOG(ERROR) << "delete chunk failed: " << " data store return: " << ret @@ -243,18 +225,7 @@ void DeleteChunkRequest::OnApply(uint64_t index, void DeleteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "delete chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - + const butil::IOBuf &data) { (void)data; // NOTE: 处理过程中优先使用参数传入的datastore/request auto ret = datastore->DeleteChunk(request.chunkid(), @@ -263,10 +234,9 @@ void DeleteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, return; if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "delete failed: " + LOG(FATAL) << "delete failed: " << " data store return: " << ret << ", request: " << request.ShortDebugString(); - iterPtr->set_error_and_rollback(); } else { LOG(ERROR) << "delete failed: " << " data store return: " << ret @@ -314,7 +284,7 @@ void ReadChunkRequest::Process() { auto task = std::bind(&ReadChunkRequest::OnApply, thisPtr, node_->GetAppliedIndex(), - doneGuard.release(), nullptr); + doneGuard.release()); concurrentApplyModule_->Push(request_->chunkid(), ChunkOpRequest::Schedule(request_->optype()), // NOLINT 
task); @@ -334,8 +304,7 @@ void ReadChunkRequest::Process() { } void ReadChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { // 先清除response中的status,以保证CheckForward后的判断的正确性 response_->clear_status(); @@ -400,8 +369,7 @@ void ReadChunkRequest::OnApply(uint64_t index, void ReadChunkRequest::OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)datastore; (void)request; (void)data; @@ -465,22 +433,8 @@ void ReadChunkRequest::ReadChunk() { } void WriteChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "write chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - uint32_t cost; std::string cloneSourceLocation; @@ -509,23 +463,23 @@ void WriteChunkRequest::OnApply(uint64_t index, << ", request: " << request_->ShortDebugString(); response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request_->ShortDebugString(); + response_->set_status( + CHUNK_OP_STATUS::CHUNK_OP_STATUS_ENOSPC); } else if (CSErrorCode::InternalError == ret || CSErrorCode::CrcCheckError == ret || CSErrorCode::FileFormatError == ret) { - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset + * 
internalerror一般是磁盘错误,为了防止副本不一致,让进程退出 + * TODO(yyk): 当前遇到write错误直接fatal退出整个 + * ChunkServer后期考虑仅仅标坏这个copyset,保证较好的可用性 */ - LOG(ERROR) << "write failed: " + LOG(FATAL) << "write failed: " << " data store return: " << ret << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; } else { LOG(ERROR) << "write failed: " << " data store return: " << ret @@ -540,18 +494,7 @@ void WriteChunkRequest::OnApply(uint64_t index, void WriteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "write chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - + const butil::IOBuf &data) { // NOTE: 处理过程中优先使用参数传入的datastore/request uint32_t cost; std::string cloneSourceLocation; @@ -561,56 +504,44 @@ void WriteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, request.clonefileoffset()); } - auto ret = datastore->WriteChunk(request.chunkid(), + do { + auto ret = datastore->WriteChunk(request.chunkid(), request.sn(), data, request.offset(), request.size(), &cost, cloneSourceLocation); - if (CSErrorCode::Success == ret) { - return; - } else if (CSErrorCode::BackwardRequestError == ret) { - LOG(WARNING) << "write failed: " - << " data store return: " << ret - << ", request: " << request.ShortDebugString(); - } else if (CSErrorCode::InternalError == ret || - CSErrorCode::CrcCheckError == ret || - CSErrorCode::FileFormatError == ret) { - - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ - LOG(ERROR) << "write failed: " - << " data store 
return: " << ret - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - } else { - LOG(ERROR) << "write failed: " - << " data store return: " << ret - << ", request: " << request.ShortDebugString(); - } + if (CSErrorCode::Success == ret) { + return; + } else if (CSErrorCode::BackwardRequestError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request_->ShortDebugString(); + sleep(WAIT_FOR_DISK_FREED); + continue; + } else if (CSErrorCode::InternalError == ret || + CSErrorCode::CrcCheckError == ret || + CSErrorCode::FileFormatError == ret) { + LOG(FATAL) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } else { + LOG(ERROR) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } + } while (false); + } void ReadSnapshotRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "read snapshot failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - char *readBuffer = nullptr; uint32_t size = request_->size(); readBuffer = new(std::nothrow)char[size]; @@ -645,13 +576,9 @@ void ReadSnapshotRequest::OnApply(uint64_t index, * 3.internal error */ if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "read snapshot failed: " + LOG(FATAL) << "read snapshot failed: " << " data store 
return: " << ret << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; } /** * 4.其他错误 @@ -668,8 +595,7 @@ void ReadSnapshotRequest::OnApply(uint64_t index, void ReadSnapshotRequest::OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)datastore; (void)request; (void)data; @@ -678,22 +604,8 @@ void ReadSnapshotRequest::OnApplyFromLog(std::shared_ptr datastore, } void DeleteSnapshotRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "delete snapshot failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - CSErrorCode ret = datastore_->DeleteSnapshotChunkOrCorrectSn( request_->chunkid(), request_->correctedsn()); if (CSErrorCode::Success == ret) { @@ -706,14 +618,9 @@ void DeleteSnapshotRequest::OnApply(uint64_t index, response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "delete snapshot or correct sn failed: " + LOG(FATAL) << "delete snapshot or correct sn failed: " << " data store return: " << ret << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; } else { LOG(ERROR) << "delete snapshot or correct sn 
failed: " << " data store return: " << ret @@ -727,18 +634,8 @@ void DeleteSnapshotRequest::OnApply(uint64_t index, void DeleteSnapshotRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)data; - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "delete snapshot failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } // NOTE: 处理过程中优先使用参数传入的datastore/request auto ret = datastore->DeleteSnapshotChunkOrCorrectSn( request.chunkid(), request.correctedsn()); @@ -749,11 +646,9 @@ void DeleteSnapshotRequest::OnApplyFromLog(std::shared_ptr datastor << " data store return: " << ret << ", request: " << request.ShortDebugString(); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "delete snapshot or correct sn failed: " + LOG(FATAL) << "delete snapshot or correct sn failed: " << " data store return: " << ret << ", request: " << request.ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; } else { LOG(ERROR) << "delete snapshot or correct sn failed: " << " data store return: " << ret @@ -762,22 +657,9 @@ void DeleteSnapshotRequest::OnApplyFromLog(std::shared_ptr datastor } void CreateCloneChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "create clone chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - auto ret = 
datastore_->CreateCloneChunk(request_->chunkid(), request_->sn(), request_->correctedsn(), @@ -791,17 +673,13 @@ void CreateCloneChunkRequest::OnApply(uint64_t index, CSErrorCode::CrcCheckError == ret || CSErrorCode::FileFormatError == ret) { /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ - LOG(ERROR) << "create clone failed: " + * TODO(yyk): 当前遇到createclonechunk错误直接fatal退出整个 + * ChunkServer后期考虑仅仅标坏这个copyset,保证较好的可用性 + */ + LOG(FATAL) << "create clone failed: " << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; + CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN); } else if (CSErrorCode::ChunkConflictError == ret) { LOG(WARNING) << "create clone chunk exist: " << ", request: " << request_->ShortDebugString(); @@ -819,19 +697,8 @@ void CreateCloneChunkRequest::OnApply(uint64_t index, void CreateCloneChunkRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)data; - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "create clone chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - // NOTE: 处理过程中优先使用参数传入的datastore/request auto ret = datastore->CreateCloneChunk(request.chunkid(), request.sn(), @@ -850,14 +717,8 @@ void CreateCloneChunkRequest::OnApplyFromLog(std::shared_ptr datast if (CSErrorCode::InternalError == ret || CSErrorCode::CrcCheckError == ret || CSErrorCode::FileFormatError == ret) { - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 
假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ - LOG(ERROR) << "create clone failed: " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); + LOG(FATAL) << "create clone failed:" + << ", request: " << request.ShortDebugString(); } else { LOG(ERROR) << "create clone failed: " << ", request: " << request.ShortDebugString(); @@ -889,22 +750,9 @@ void PasteChunkInternalRequest::Process() { } void PasteChunkInternalRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "paste chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - auto ret = datastore_->PasteChunk(request_->chunkid(), data_.to_string().c_str(), //NOLINT request_->offset(), @@ -914,19 +762,12 @@ void PasteChunkInternalRequest::OnApply(uint64_t index, response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS); node_->UpdateAppliedIndex(index); } else if (CSErrorCode::InternalError == ret) { - - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ + LOG(FATAL) << "paste chunk failed: " + << ", request: " << request_->ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { LOG(ERROR) << "paste chunk failed: " << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; + response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE); } else { LOG(ERROR) << "paste 
chunk failed: " << ", request: " << request_->ShortDebugString(); @@ -938,56 +779,36 @@ void PasteChunkInternalRequest::OnApply(uint64_t index, void PasteChunkInternalRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "paste chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - // NOTE: 处理过程中优先使用参数传入的datastore/request - auto ret = datastore->PasteChunk(request.chunkid(), - data.to_string().c_str(), - request.offset(), - request.size()); - if (CSErrorCode::Success == ret) - return; + const butil::IOBuf &data) { + do { + // NOTE: 处理过程中优先使用参数传入的datastore/request + auto ret = datastore->PasteChunk(request.chunkid(), + data.to_string().c_str(), + request.offset(), + request.size()); + if (CSErrorCode::Success == ret) + return; - if (CSErrorCode::InternalError == ret) { - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ - LOG(ERROR) << "paste chunk failed: " + if (CSErrorCode::InternalError == ret) { + LOG(FATAL) << "paste chunk failed: " + << ", request: " << request.ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(ERROR) << "paste chunk failed: " << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - } else { - LOG(ERROR) << "paste chunk failed: " - << ", request: " << request.ShortDebugString(); - } + sleep(WAIT_FOR_DISK_FREED); + continue; + } else { + LOG(ERROR) << "paste chunk failed: " + << ", request: " << request.ShortDebugString(); + } + } while (false); + } void ScanChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { 
brpc::ClosureGuard doneGuard(done); - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "scan chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - return; - } - // read and calculate crc, build scanmap uint32_t crc = 0; size_t size = request_->size(); @@ -1029,11 +850,8 @@ void ScanChunkRequest::OnApply(uint64_t index, response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_NOTEXIST); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "scan chunk failed, read chunk internal error" + LOG(FATAL) << "scan chunk failed, read chunk internal error" << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); } else { response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN); @@ -1042,19 +860,8 @@ void ScanChunkRequest::OnApply(uint64_t index, void ScanChunkRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)data; - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "scan chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - uint32_t crc = 0; size_t size = request.size(); std::unique_ptr readBuffer(new(std::nothrow)char[size]); @@ -1083,10 +890,9 @@ void ScanChunkRequest::OnApplyFromLog(std::shared_ptr datastore, / << " datastore return: " << ret << ", request: " << request.ShortDebugString(); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "scan failed: " + LOG(FATAL) << "scan failed: " << " datastore return: " << ret << ", request: " << 
request.ShortDebugString(); - iterPtr->set_error_and_rollback(); } else { LOG(ERROR) << "scan failed: " << " datastore return: " << ret diff --git a/src/chunkserver/op_request.h b/src/chunkserver/op_request.h index b6d4241368..6aaefce5c5 100755 --- a/src/chunkserver/op_request.h +++ b/src/chunkserver/op_request.h @@ -23,6 +23,8 @@ #ifndef SRC_CHUNKSERVER_OP_REQUEST_H_ #define SRC_CHUNKSERVER_OP_REQUEST_H_ +#define WAIT_FOR_DISK_FREED 60 /* in seconds */ + #include #include #include @@ -80,11 +82,9 @@ class ChunkOpRequest : public std::enable_shared_from_this { * request正常情况从内存中获取上下文on apply逻辑 * @param index:此op log entry的index * @param done:对应的ChunkClosure - * @param iter:用于判断和更新StateMachine */ virtual void OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) = 0; + ::google::protobuf::Closure *done) = 0; /** * NOTE: 子类实现过程中优先使用参数传入的datastore/request @@ -95,12 +95,10 @@ class ChunkOpRequest : public std::enable_shared_from_this { * @param datastore:chunk数据持久化层 * @param request:反序列化后得到的request 细信息 * @param data:反序列化后得到的request要处理的数据 - * @param iter:用于判断和更新StateMachine */ virtual void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) = 0; + const butil::IOBuf &data) = 0; /** * 返回request的done成员 @@ -204,11 +202,10 @@ class DeleteChunkRequest : public ChunkOpRequest { done) {} virtual ~DeleteChunkRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class ReadChunkRequest : public ChunkOpRequest { @@ -228,10 +225,10 @@ class ReadChunkRequest : public ChunkOpRequest { virtual ~ReadChunkRequest() = default; void Process() override; - void 
OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; const ChunkRequest* GetChunkRequest() { return request_; @@ -267,11 +264,10 @@ class WriteChunkRequest : public ChunkOpRequest { done) {} virtual ~WriteChunkRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr); + void OnApply(uint64_t index, ::google::protobuf::Closure *done); void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class ReadSnapshotRequest : public ChunkOpRequest { @@ -290,11 +286,10 @@ class ReadSnapshotRequest : public ChunkOpRequest { done) {} virtual ~ReadSnapshotRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class DeleteSnapshotRequest : public ChunkOpRequest { @@ -313,11 +308,10 @@ class DeleteSnapshotRequest : public ChunkOpRequest { done) {} virtual ~DeleteSnapshotRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class CreateCloneChunkRequest : public ChunkOpRequest { 
@@ -336,11 +330,10 @@ class CreateCloneChunkRequest : public ChunkOpRequest { done) {} virtual ~CreateCloneChunkRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class PasteChunkInternalRequest : public ChunkOpRequest { @@ -364,11 +357,10 @@ class PasteChunkInternalRequest : public ChunkOpRequest { virtual ~PasteChunkInternalRequest() = default; void Process() override; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; private: butil::IOBuf data_; @@ -391,11 +383,10 @@ class ScanChunkRequest : public ChunkOpRequest { scanManager_(scanManager) {} virtual ~ScanChunkRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; private: void BuildAndSendScanMap(const ChunkRequest &request, uint64_t index, diff --git a/src/client/chunk_closure.cpp b/src/client/chunk_closure.cpp index ebed92ae88..97269dd8b0 100644 --- a/src/client/chunk_closure.cpp +++ b/src/client/chunk_closure.cpp @@ -101,6 +101,8 @@ void ClientClosure::PreProcessBeforeRetry(int rpcstatus, int cntlstatus) { if (rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED) { 
nextSleepUS /= 10; } + } else if (rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY || rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE) { + nextSleepUS = failReqOpt_.chunkserverWaitDiskFreeRetryIntervalMS; } LOG(WARNING) @@ -243,6 +245,11 @@ void ClientClosure::Run() { OnReadOnly(); break; + case CHUNK_OP_STATUS::CHUNK_OP_STATUS_ENOSPC: + needRetry = true; + OnNoSpace(); + break; + default: needRetry = true; LOG(WARNING) << OpTypeToString(reqCtx_->optype_) @@ -383,6 +390,18 @@ void ClientClosure::OnReadOnly() { << butil::endpoint2str(cntl_->remote_side()).c_str(); } +void ClientClosure::OnNoSpace() { + reqDone_->SetFailed(status_); + LOG(WARNING) << OpTypeToString(reqCtx_->optype_) << " copyset is no space, " + << *reqCtx_ + << ", status = " << status_ + << ", retried times = " << reqDone_->GetRetriedTimes() + << ", IO id = " << reqDone_->GetIOTracker()->GetID() + << ", request id = " << reqCtx_->id_ + << ", remote side = " + << butil::endpoint2str(cntl_->remote_side()).c_str(); +} + void ClientClosure::OnRedirected() { LOG(WARNING) << OpTypeToString(reqCtx_->optype_) << " redirected, " << *reqCtx_ diff --git a/src/client/chunk_closure.h b/src/client/chunk_closure.h index c39231f76d..2764d263ee 100644 --- a/src/client/chunk_closure.h +++ b/src/client/chunk_closure.h @@ -116,6 +116,9 @@ class ClientClosure : public Closure { // handle readonly void OnReadOnly(); + // handle nospace + void OnNoSpace(); + // 非法参数 void OnInvalidRequest(); diff --git a/src/client/config_info.h b/src/client/config_info.h index 620d464eae..0e8343d694 100644 --- a/src/client/config_info.h +++ b/src/client/config_info.h @@ -138,6 +138,8 @@ struct ChunkServerUnstableOption { * copyset 标记为unstable,促使其下次发送rpc前,先去getleader。 * @chunkserverMinRetryTimesForceTimeoutBackoff: * 当一个请求重试次数超过阈值时,还在重试 使其超时时间进行指数退避 + * @chunkserverWaitDiskFreeRetryIntervalMS: + * 当请求返回readonly或者nospace错误,hang住io等待一段时间后重试。 */ struct FailureRequestOption { uint32_t chunkserverOPMaxRetry = 3; @@ -146,6 +148,7 @@ 
struct FailureRequestOption { uint64_t chunkserverMaxRPCTimeoutMS = 64000; uint64_t chunkserverMaxRetrySleepIntervalUS = 64ull * 1000 * 1000; uint64_t chunkserverMinRetryTimesForceTimeoutBackoff = 5; + uint64_t chunkserverWaitDiskFreeRetryIntervalMS = 60 * 1000; // When a request remains outstanding beyond this threshold, it is marked as // a slow request. diff --git a/src/fs/ext4_filesystem_impl.cpp b/src/fs/ext4_filesystem_impl.cpp index f4cd6cfcdb..91bdb327d6 100644 --- a/src/fs/ext4_filesystem_impl.cpp +++ b/src/fs/ext4_filesystem_impl.cpp @@ -316,19 +316,27 @@ int Ext4FileSystemImpl::Write(int fd, buf + relativeOffset, remainLength, offset); - if (ret < 0) { - if (errno == EINTR && retryTimes < MAX_RETYR_TIME) { - ++retryTimes; - continue; - } + + if (ret > 0) { + remainLength -= ret; + offset += ret; + relativeOffset += ret; + } + + if (errno == EINTR && retryTimes < MAX_RETYR_TIME) { + ++retryTimes; + continue; + } else if (errno == ENOSPC) { + LOG(ERROR) << "Disk is full writing fd: " << fd + << ". Waiting for someone to free space..."; + return -errno; + } else if (errno > 0) { LOG(ERROR) << "pwrite failed, fd: " << fd - << ", size: " << remainLength << ", offset: " << offset - << ", error: " << strerror(errno); + << ", size: " << remainLength << ", offset: " << offset + << ", error: " << strerror(errno); return -errno; } - remainLength -= ret; - offset += ret; - relativeOffset += ret; + } return length; } @@ -349,19 +357,24 @@ int Ext4FileSystemImpl::Write(int fd, while (remainLength > 0) { ssize_t ret = buf.pcut_into_file_descriptor(fd, offset, remainLength); - if (ret < 0) { - if (errno == EINTR || retryTimes < MAX_RETYR_TIME) { - ++retryTimes; - continue; - } + if (ret > 0) { + remainLength -= ret; + offset += ret; + } + + if (errno == EINTR || retryTimes < MAX_RETYR_TIME) { + ++retryTimes; + continue; + } else if (errno == ENOSPC) { + LOG(ERROR) << "Disk is full writing fd: " << fd + << ". 
Waiting for someone to free space..."; + return -errno; + } else if (errno > 0) { LOG(ERROR) << "IOBuf::pcut_into_file_descriptor failed, fd: " << fd - << ", size: " << remainLength << ", offset: " << offset - << ", error: " << strerror(errno); + << ", size: " << remainLength << ", offset: " << offset + << ", error: " << strerror(errno); return -errno; } - - remainLength -= ret; - offset += ret; } return length; diff --git a/src/mds/topology/topology_service.cpp b/src/mds/topology/topology_service.cpp index 988938bbeb..95ad76fcbc 100644 --- a/src/mds/topology/topology_service.cpp +++ b/src/mds/topology/topology_service.cpp @@ -1167,19 +1167,6 @@ void TopologyServiceImpl::ListChunkFormatStatus( topology_->ListChunkFormatStatus(request, response); } -void TopologyServiceImpl::DeleteBrokenCopysetInChunkServer( - google::protobuf::RpcController* cntl_base, - const DeleteBrokenCopysetInChunkServerRequest* request, - DeleteBrokenCopysetInChunkServerResponse* response, - google::protobuf::Closure* done) { - brpc::ClosureGuard done_guard(done); - brpc::Controller* cntl = static_cast(cntl_base); - LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from " - << cntl->remote_side() << " to " << cntl->local_side() - << ". 
[DeleteBrokenCopysetInChunkServer] " << request->DebugString(); - topology_->DeleteBrokenCopysetInChunkServer(request, response); -} - } // namespace topology } // namespace mds } // namespace curve diff --git a/src/mds/topology/topology_service.h b/src/mds/topology/topology_service.h index 46b6a315e7..dc5a46cfaa 100644 --- a/src/mds/topology/topology_service.h +++ b/src/mds/topology/topology_service.h @@ -236,14 +236,7 @@ class TopologyServiceImpl : public TopologyService { google::protobuf::RpcController* cntl_base, const ListChunkFormatStatusRequest* request, ListChunkFormatStatusResponse* response, - google::protobuf::Closure* done); - - virtual void DeleteBrokenCopysetInChunkServer( - google::protobuf::RpcController* cntl_base, - const DeleteBrokenCopysetInChunkServerRequest* request, - DeleteBrokenCopysetInChunkServerResponse* response, - google::protobuf::Closure* done - ); + google::protobuf::Closure* done); private: std::shared_ptr topology_; diff --git a/src/mds/topology/topology_service_manager.cpp b/src/mds/topology/topology_service_manager.cpp index a16929f974..ce0b6822a4 100644 --- a/src/mds/topology/topology_service_manager.cpp +++ b/src/mds/topology/topology_service_manager.cpp @@ -1814,99 +1814,6 @@ void TopologyServiceManager::ListChunkFormatStatus( } } -void TopologyServiceManager::DeleteBrokenCopysetInChunkServer( - const DeleteBrokenCopysetInChunkServerRequest* request, - DeleteBrokenCopysetInChunkServerResponse* response) { - std::map> csMap; - std::vector copysets = - topology_->GetCopySetsInChunkServer(request->chunkserverid()); - for (const CopySetKey& copyset : copysets) { - CopySetInfo csInfo; - if (topology_->GetCopySet(copyset, &csInfo)) { - if (!csInfo.IsAvailable()) { - for (ChunkServerIdType id : csInfo.GetCopySetMembers()) { - ChunkServer chunkserverInfo; - if (true != topology_->GetChunkServer(id, &chunkserverInfo)) { - continue; - } - std::string address = chunkserverInfo.GetHostIp() + ":" + 
std::to_string(chunkserverInfo.GetPort()); - if (csMap.find(address) != csMap.end()) { - csMap[address].push_back(csInfo); - } else { - std::vector<CopySetInfo> temp; - temp.push_back(csInfo); - csMap[address] = temp; - } - } - } - } - } - - // 删除braft状态error的copyset node - for(const auto &iter : csMap) { - brpc::Controller cntl; - cntl.set_timeout_ms(500); - brpc::Channel channel; - if (channel.Init(iter.first.c_str(), nullptr) !=0) { - LOG(WARNING) << "can not create channel to " - << iter.first.c_str(); - response->set_statuscode(kTopoErrCodeInternalError); - return; - } - - CopysetService_Stub stub(&channel); - - CopysetRequest2 copysetRequest; - CopysetResponse2 copysetResponse; - for (auto &cs : iter.second) { - ::curve::chunkserver::Copyset *copyset = copysetRequest.add_copysets(); - copyset->set_logicpoolid(cs.GetLogicalPoolId()); - copyset->set_copysetid(cs.GetId()); - } - - stub.DeleteBrokenCopysetNode(&cntl, - &copysetRequest, - &copysetResponse, - nullptr); - LOG(INFO) << "Send DeleteBrokenCopysetNode[log_id=" << cntl.log_id() - << "] from " << cntl.local_side() - << " to " << cntl.remote_side() - << ". 
[CopysetRequest] : " - << " copysetRequest.copysets_size() = " - << copysetRequest.copysets_size(); - - if (cntl.Failed()) { - LOG(ERROR) << "Send DeleteBrokenCopysetNode failed, " - << "cntl.errorText = " - << cntl.ErrorText() << std::endl; - response->set_statuscode(kTopoErrCodeInternalError); - return; - } - } - - for (const CopySetKey& copyset : copysets) { - CopySetInfo csInfo; - if (topology_->GetCopySet(copyset, &csInfo)) { - if (!csInfo.IsAvailable()) { - // 删除copyset node peer,更新topo,等待replicaScheduler重新创建peer - std::set peers = csInfo.GetCopySetMembers(); - peers.erase(request->chunkserverid()); - topology_->UpdateCopySetTopo(csInfo); - // 设置copyset node availflag true - int res = topology_->SetCopySetAvalFlag(copyset, true); - if (res != kTopoErrCodeSuccess) { - LOG(ERROR) << "Topology set copyset aval flag fail"; - response->set_statuscode(kTopoErrCodeInternalError); - return; - } - } - } - } - response->set_statuscode(kTopoErrCodeSuccess); -} - - - } // namespace topology } // namespace mds } // namespace curve diff --git a/src/mds/topology/topology_service_manager.h b/src/mds/topology/topology_service_manager.h index 799f9368db..bcbfc98f20 100644 --- a/src/mds/topology/topology_service_manager.h +++ b/src/mds/topology/topology_service_manager.h @@ -202,10 +202,6 @@ class TopologyServiceManager { const ListChunkFormatStatusRequest* request, ListChunkFormatStatusResponse* response); - virtual void DeleteBrokenCopysetInChunkServer( - const DeleteBrokenCopysetInChunkServerRequest* request, - DeleteBrokenCopysetInChunkServerResponse* response); - private: /** * @brief create copyset for logical pool diff --git a/test/chunkserver/clone/op_request_test.cpp b/test/chunkserver/clone/op_request_test.cpp index e1ece23380..6746594097 100644 --- a/test/chunkserver/clone/op_request_test.cpp +++ b/test/chunkserver/clone/op_request_test.cpp @@ -63,11 +63,9 @@ class OpRequestTest datastore_ = std::make_shared(); cloneMgr_ = std::make_shared(); concurrentApplyModule_ = 
std::make_shared(); - iter_ = std::make_shared(); ASSERT_TRUE(concurrentApplyModule_->Init(10, 10)); FakeCopysetNode(); FakeCloneManager(); - FakeIteratorWrapper(); } void TearDown() { } @@ -93,13 +91,6 @@ class OpRequestTest .WillRepeatedly(Return(true)); } - void FakeIteratorWrapper() { - EXPECT_CALL(*iter_, set_error_and_rollback(_,_)) - .WillRepeatedly(Return()); - EXPECT_CALL(*iter_, has_error()) - .WillRepeatedly(Return(false)); - } - protected: ChunkSizeType chunksize_; ChunkSizeType blocksize_; @@ -109,7 +100,6 @@ class OpRequestTest std::shared_ptr datastore_; std::shared_ptr cloneMgr_; std::shared_ptr concurrentApplyModule_; - std::shared_ptr iter_; }; TEST_P(OpRequestTest, CreateCloneTest) { @@ -225,7 +215,7 @@ TEST_P(OpRequestTest, CreateCloneTest) { EXPECT_CALL(*datastore_, CreateCloneChunk(_, _, _, _, _)) .WillOnce(Return(CSErrorCode::Success)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // check result ASSERT_TRUE(closure->isDone_); @@ -237,7 +227,7 @@ TEST_P(OpRequestTest, CreateCloneTest) { /** * test OnApply * case:CreateCloneChunk failed - * expect:不返回成功 + * expect:process exit */ { // reset closure @@ -246,9 +236,8 @@ TEST_P(OpRequestTest, CreateCloneTest) { // check result EXPECT_CALL(*datastore_, CreateCloneChunk(_, _, _, _, _)) .WillRepeatedly(Return(CSErrorCode::InternalError)); - opReq->OnApply(3, closure, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - closure->response_->status()); + + ASSERT_DEATH(opReq->OnApply(3, closure), ""); } /** * test OnApply @@ -265,7 +254,7 @@ TEST_P(OpRequestTest, CreateCloneTest) { EXPECT_CALL(*node_, UpdateAppliedIndex(_)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // check result ASSERT_TRUE(closure->isDone_); @@ -288,12 +277,12 @@ TEST_P(OpRequestTest, CreateCloneTest) { .WillOnce(Return(CSErrorCode::Success)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, 
*request, data); } /** * 测试 OnApplyFromLog * 用例:CreateCloneChunk失败,返回InternalError - * 预期:无返回 + * 预期:进程退出 */ { // 重置closure @@ -304,12 +293,12 @@ TEST_P(OpRequestTest, CreateCloneTest) { .WillRepeatedly(Return(CSErrorCode::InternalError)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + ASSERT_DEATH(opReq->OnApplyFromLog(datastore_, *request, data), ""); } /** * 测试 OnApplyFromLog * 用例:CreateCloneChunk失败,返回其他错误 - * 预期:无返回 + * 预期:进程退出 */ { // 重置closure @@ -320,7 +309,7 @@ TEST_P(OpRequestTest, CreateCloneTest) { .WillRepeatedly(Return(CSErrorCode::InvalidArgError)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } // 释放资源 closure->Release(); @@ -450,7 +439,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { EXPECT_CALL(*datastore_, PasteChunk(_, _, _, _)) .WillOnce(Return(CSErrorCode::Success)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -462,7 +451,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { /** * 测试OnApply * 用例:CreateCloneChunk失败,返回InternalError - * 预期:不返回成功 + * 预期:进程退出 */ { // 重置closure @@ -472,9 +461,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { EXPECT_CALL(*datastore_, PasteChunk(_, _, _, _)) .WillRepeatedly(Return(CSErrorCode::InternalError)); - opReq->OnApply(3, closure, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - closure->response_->status()); + ASSERT_DEATH(opReq->OnApply(3, closure), ""); } /** * 测试OnApply @@ -489,7 +476,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { EXPECT_CALL(*datastore_, PasteChunk(_, _, _, _)) .WillRepeatedly(Return(CSErrorCode::InvalidArgError)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -512,12 +499,12 @@ TEST_P(OpRequestTest, PasteChunkTest) { .WillOnce(Return(CSErrorCode::Success)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + 
opReq->OnApplyFromLog(datastore_, *request, data); } /** * 测试 OnApplyFromLog * 用例:CreateCloneChunk失败,返回InternalError - * 预期:无返回 + * 预期:进程退出 */ { // 重置closure @@ -528,12 +515,12 @@ TEST_P(OpRequestTest, PasteChunkTest) { .WillRepeatedly(Return(CSErrorCode::InternalError)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + ASSERT_DEATH(opReq->OnApplyFromLog(datastore_, *request, data), ""); } /** * 测试 OnApplyFromLog * 用例:CreateCloneChunk失败,返回其他错误 - * 预期:无返回 + * 预期:进程退出 */ { // 重置closure @@ -544,7 +531,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { .WillRepeatedly(Return(CSErrorCode::InvalidArgError)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } // 释放资源 closure->Release(); @@ -808,7 +795,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { .WillOnce(DoAll(SetArrayArgument<2>(chunkData, chunkData + length), Return(CSErrorCode::Success))); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -845,7 +832,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { .WillOnce(DoAll(SetArrayArgument<2>(chunkData, chunkData + length), Return(CSErrorCode::Success))); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -885,7 +872,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(true)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_FALSE(closure->isDone_); @@ -910,7 +897,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { // 不会读chunk文件 EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -941,7 +928,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(true)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, 
closure); // 验证结果 ASSERT_FALSE(closure->isDone_); @@ -977,7 +964,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { chunkData + length), Return(CSErrorCode::Success))); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1006,7 +993,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1017,7 +1004,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { /** * 测试OnApply * 用例:读本地chunk的时候返回错误 - * 预期:不返回成功 + * 预期:请求失败,返回 CHUNK_OP_STATUS_FAILURE_UNKNOWN */ { // 重置closure @@ -1032,9 +1019,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .WillRepeatedly(Return(CSErrorCode::InternalError)); - opReq->OnApply(3, closure, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - closure->response_->status()); + ASSERT_DEATH(opReq->OnApply(3, closure), ""); } /** * 测试OnApply @@ -1060,7 +1045,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(false)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1077,7 +1062,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { closure->Reset(); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } // 释放资源 closure->Release(); @@ -1225,7 +1210,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1253,7 +1238,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 
ASSERT_TRUE(closure->isDone_); @@ -1285,7 +1270,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(true)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_FALSE(closure->isDone_); @@ -1311,7 +1296,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1335,7 +1320,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1367,7 +1352,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(false)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1384,7 +1369,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { closure->Reset(); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } // 释放资源 closure->Release(); diff --git a/test/chunkserver/copyset_node_test.cpp b/test/chunkserver/copyset_node_test.cpp index b894862882..46ed6a4fdb 100644 --- a/test/chunkserver/copyset_node_test.cpp +++ b/test/chunkserver/copyset_node_test.cpp @@ -492,14 +492,14 @@ TEST_F(CopysetNodeTest, error_test) { LOG(INFO) << "OK"; } /* on_error */ - // { - // LogicPoolID logicPoolID = 123; - // CopysetID copysetID = 1345; - // Configuration conf; - // CopysetNode copysetNode(logicPoolID, copysetID, conf); - // braft::Error error; - // ASSERT_DEATH(copysetNode.on_error(error), ".*raft error.*"); - // } + { + LogicPoolID logicPoolID = 123; + CopysetID copysetID = 1345; + Configuration conf; + CopysetNode copysetNode(logicPoolID, copysetID, conf); + braft::Error error; + 
ASSERT_DEATH(copysetNode.on_error(error), ".*raft error.*"); + } /* Fini, raftNode is null */ { LogicPoolID logicPoolID = 123; diff --git a/test/chunkserver/copyset_service_test.cpp b/test/chunkserver/copyset_service_test.cpp index 18c7f31b1a..973529366b 100644 --- a/test/chunkserver/copyset_service_test.cpp +++ b/test/chunkserver/copyset_service_test.cpp @@ -223,50 +223,6 @@ TEST_F(CopysetServiceTest, basic) { ASSERT_EQ(response.status(), COPYSET_OP_STATUS_SUCCESS); } - /* 测试创建一个新的 copyset */ - { - brpc::Controller cntl; - cntl.set_timeout_ms(3000); - - CopysetRequest request; - CopysetResponse response; - request.set_logicpoolid(logicPoolId); - request.set_copysetid(copysetId); - request.add_peerid("127.0.0.1:9040:0"); - request.add_peerid("127.0.0.1:9041:0"); - request.add_peerid("127.0.0.1:9042:0"); - stub.CreateCopysetNode(&cntl, &request, &response, nullptr); - if (cntl.Failed()) { - std::cout << cntl.ErrorText() << std::endl; - } - ASSERT_EQ(response.status(), - COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS); - } - // TEST CASES: DeleteBrokenCopysetNode - { - brpc::Controller cntl; - CopysetStatusRequest statusReq; - CopysetStatusResponse statusResp; - CopysetRequest2 request; - CopysetResponse2 response; - Copyset *copyset; - copyset = request.add_copysets(); - copyset->set_logicpoolid(logicPoolId); - copyset->set_copysetid(copysetId); - Peer *peer1 = copyset->add_peers(); - peer1->set_address("127.0.0.1:9040:0"); - Peer *peer2 = copyset->add_peers(); - peer2->set_address("127.0.0.1:9041:0"); - Peer *peer3 = copyset->add_peers(); - peer3->set_address("127.0.0.1:9042:0"); - stub.DeleteBrokenCopysetNode(&cntl, &request, &response, nullptr); - if (cntl.Failed()) { - std::cout << cntl.ErrorText() << std::endl; - } - ASSERT_TRUE(copysetNodeManager->DeleteCopysetNode(logicPoolId, copysetId)); - ASSERT_EQ(response.status(), COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS); - } - ASSERT_EQ(0, server.Stop(0)); ASSERT_EQ(0, server.Join()); } diff --git 
a/test/chunkserver/mock_copyset_node.h b/test/chunkserver/mock_copyset_node.h index 0886765387..30676eb4da 100644 --- a/test/chunkserver/mock_copyset_node.h +++ b/test/chunkserver/mock_copyset_node.h @@ -76,14 +76,6 @@ class MockCopysetNode : public CopysetNode { MOCK_METHOD1(on_start_following, void(const ::braft::LeaderChangeContext&)); }; -class MockIteratorWrapper : public IteratorWrapper { - public: - MockIteratorWrapper() = default; - ~MockIteratorWrapper() = default; - MOCK_METHOD2(set_error_and_rollback, void(size_t, const butil::Status*)); - MOCK_CONST_METHOD0(has_error, bool()); -}; - } // namespace chunkserver } // namespace curve diff --git a/test/chunkserver/op_request_test.cpp b/test/chunkserver/op_request_test.cpp index cf3ad8c65f..20a4181444 100644 --- a/test/chunkserver/op_request_test.cpp +++ b/test/chunkserver/op_request_test.cpp @@ -21,7 +21,6 @@ */ #include -#include #include #include #include @@ -34,14 +33,10 @@ #include "src/chunkserver/copyset_node_manager.h" #include "src/chunkserver/op_request.h" #include "test/chunkserver/fake_datastore.h" -#include "test/chunkserver/mock_copyset_node.h" namespace curve { namespace chunkserver { -using ::testing::Return; -using ::testing::_; - using ::google::protobuf::io::ZeroCopyOutputStream; class OpFakeClosure : public Closure { @@ -50,27 +45,7 @@ class OpFakeClosure : public Closure { ~OpFakeClosure() {} }; -class ChunkOpRequestTest: public testing::Test -{ -protected: - virtual void SetUp() - { - iter_ = std::make_shared(); - FakeIterator(); - } - virtual void TearDown() - { - - } - void FakeIterator() { - EXPECT_CALL(*iter_, set_error_and_rollback(_,_)) - .WillRepeatedly(Return()); - } -protected: - std::shared_ptr iter_; -}; - -TEST_F(ChunkOpRequestTest, encode) { +TEST(ChunkOpRequestTest, encode) { LogicPoolID logicPoolId = 1; CopysetID copysetId = 10001; uint64_t chunkId = 12345; @@ -393,7 +368,7 @@ TEST_F(ChunkOpRequestTest, encode) { } } -TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { 
+TEST(ChunkOpRequestTest, OnApplyErrorTest) { LogicPoolID logicPoolId = 1; CopysetID copysetId = 10001; uint64_t chunkId = 12345; @@ -440,7 +415,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN, @@ -469,7 +444,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::ChunkNotExistError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_NOTEXIST, @@ -496,9 +471,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; } @@ -521,7 +494,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::ChunkNotExistError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_NOTEXIST, @@ -546,9 +519,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; } @@ -569,9 +540,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, 
&done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; } @@ -592,7 +561,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::BackwardRequestError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD, @@ -619,9 +588,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; } @@ -644,7 +611,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::BackwardRequestError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD, @@ -671,7 +638,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::InvalidArgError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN, @@ -700,9 +667,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; delete scanManager; @@ -728,7 +693,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { 
nullptr); dataStore->InjectError(CSErrorCode::FileFormatError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN, @@ -758,7 +723,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::ChunkNotExistError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_NOTEXIST, @@ -769,7 +734,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { } } -TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { +TEST(ChunkOpRequestTest, OnApplyFromLogTest) { LogicPoolID logicPoolId = 1; CopysetID copysetId = 10001; uint64_t sn = 1; @@ -802,7 +767,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_READ); butil::IOBuf data; ReadChunkRequest req; - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } // read snapshot @@ -819,7 +784,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_READ_SNAP); butil::IOBuf data; ReadSnapshotRequest req; - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } // delete @@ -834,7 +799,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_DELETE); butil::IOBuf data; DeleteChunkRequest req; - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } // delete snapshot @@ -849,7 +814,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { 
request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_DELETE_SNAP); butil::IOBuf data; DeleteSnapshotRequest req; - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } // scan @@ -864,7 +829,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_clonefileoffset(followScanRpcTimeoutMs); butil::IOBuf data; ScanChunkRequest req(1, PeerId("127.0.0.1:9010:0")); - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } } diff --git a/test/mds/topology/mock_topology.h b/test/mds/topology/mock_topology.h index dfff9c3d92..90afab62e1 100644 --- a/test/mds/topology/mock_topology.h +++ b/test/mds/topology/mock_topology.h @@ -322,10 +322,6 @@ class MockTopologyServiceManager : public TopologyServiceManager { MOCK_METHOD2(ListUnAvailCopySets, void(const ListUnAvailCopySetsRequest*, ListUnAvailCopySetsResponse*)); - - MOCK_METHOD2(DeleteBrokenCopysetInChunkServer, - void(const DeleteBrokenCopysetInChunkServerRequest*, - DeleteBrokenCopysetInChunkServerResponse*)); }; class MockTopologyServiceImpl : public TopologyService { @@ -368,11 +364,6 @@ class MockCopysetServiceImpl : public CopysetService { const ::curve::chunkserver::CopysetRequest2 *request, ::curve::chunkserver::CopysetResponse2 *response, google::protobuf::Closure *done)); - MOCK_METHOD4(DeleteBrokenCopysetNode, - void(::google::protobuf::RpcController *controller, - const ::curve::chunkserver::CopysetRequest2 *request, - ::curve::chunkserver::CopysetResponse2 *response, - google::protobuf::Closure *done)); }; } // namespace chunkserver diff --git a/test/mds/topology/test_topology_service_manager.cpp b/test/mds/topology/test_topology_service_manager.cpp index eab5570e9e..efc96de534 100644 --- a/test/mds/topology/test_topology_service_manager.cpp +++ b/test/mds/topology/test_topology_service_manager.cpp @@ -230,13 +230,12 @@ class 
TestTopologyServiceManager : public ::testing::Test { const std::set &members, bool scaning = false, LastScanSecType lastScanSec = 0, - bool lastScanConsistent = true, bool avail = true) { + bool lastScanConsistent = true) { CopySetInfo copysetInfo(logicalPoolId, copysetId); copysetInfo.SetCopySetMembers(members); copysetInfo.SetScaning(scaning); copysetInfo.SetLastScanSec(lastScanSec); copysetInfo.SetLastScanConsistent(lastScanConsistent); - copysetInfo.SetAvailableFlag(avail); EXPECT_CALL(*storage_, StorageCopySet(_)) .WillOnce(Return(true)); @@ -3674,55 +3673,6 @@ TEST_F(TestTopologyServiceManager, test_ListUnAvailCopySets) { } } -TEST_F(TestTopologyServiceManager, - test_DeleteBrokenCopysetInChunkServer_success) { - CopySetIdType copysetId = 0x51; - PrepareAddPoolset(); - PoolIdType logicalPoolId = 0x01; - PoolIdType physicalPoolId = 0x11; - PrepareAddPhysicalPool(physicalPoolId, "pPool1"); - PrepareAddZone(0x21, "zone1", physicalPoolId); - PrepareAddZone(0x22, "zone2", physicalPoolId); - PrepareAddZone(0x23, "zone3", physicalPoolId); - PrepareAddServer(0x31, "server1", "127.0.0.1", "127.0.0.1", 0x21, 0x11); - PrepareAddServer(0x32, "server2", "127.0.0.1", "127.0.0.1", 0x22, 0x11); - PrepareAddServer(0x33, "server3", "127.0.0.1", "127.0.0.1", 0x23, 0x11); - uint32_t port = listenAddr_.port; - PrepareAddChunkServer( - 0x41, "token1", "nvme", 0x31, "127.0.0.1", "127.0.0.1", port); - PrepareAddChunkServer( - 0x42, "token2", "nvme", 0x32, "127.0.0.1", "127.0.0.1", port); - PrepareAddChunkServer( - 0x43, "token3", "nvme", 0x33, "127.0.0.1", "127.0.0.1", port); - PrepareAddLogicalPool(logicalPoolId, "logicalPool1", physicalPoolId); - std::set replicas; - replicas.insert(0x41); - replicas.insert(0x42); - replicas.insert(0x43); - PrepareAddCopySet(copysetId, logicalPoolId, replicas, false, 0, true, false); - - CopysetResponse2 chunkserverResponse; - chunkserverResponse.set_status( - COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS); - EXPECT_CALL(*mockCopySetService, 
DeleteBrokenCopysetNode(_, _, _, _)) - .WillRepeatedly(DoAll(SetArgPointee<2>(chunkserverResponse), - Invoke(CreateCopysetNodeFunc))); - EXPECT_CALL(*storage_, UpdateCopySet(_)) - .WillRepeatedly(Return(true)); - DeleteBrokenCopysetInChunkServerRequest request; - request.set_chunkserverid(0x41); - DeleteBrokenCopysetInChunkServerResponse response; - serviceManager_->DeleteBrokenCopysetInChunkServer(&request, &response); - - ASSERT_EQ(kTopoErrCodeSuccess, response.statuscode()); - - ListUnAvailCopySetsRequest request2; - ListUnAvailCopySetsResponse response2; - serviceManager_->ListUnAvailCopySets(&request2, &response2); - ASSERT_EQ(kTopoErrCodeSuccess, response2.statuscode()); - ASSERT_EQ(0, response2.copysets_size()); -} - } // namespace topology } // namespace mds } // namespace curve diff --git a/thirdparties/braft/add-iterator-has_error.patch b/thirdparties/braft/add-iterator-has_error.patch deleted file mode 100644 index 2b4cffd0f6..0000000000 --- a/thirdparties/braft/add-iterator-has_error.patch +++ /dev/null @@ -1,28 +0,0 @@ -diff --git a/src/braft/raft.cpp b/src/braft/raft.cpp -index 6069f70..11adf9a 100644 ---- a/src/braft/raft.cpp -+++ b/src/braft/raft.cpp -@@ -253,6 +253,10 @@ bool Iterator::valid() const { - return _impl->is_good() && _impl->entry()->type == ENTRY_TYPE_DATA; - } - -+bool Iterator::has_error() const { -+ return _impl->has_error(); -+} -+ - int64_t Iterator::index() const { return _impl->index(); } - - int64_t Iterator::term() const { return _impl->entry()->id.term; } -diff --git a/src/braft/raft.h b/src/braft/raft.h -index e4a0f19..83cd353 100644 ---- a/src/braft/raft.h -+++ b/src/braft/raft.h -@@ -178,6 +178,8 @@ public: - // If |st| is not NULL, it should describe the detail of the error. 
- void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL); - -+ bool has_error() const; -+ - private: - friend class FSMCaller; - Iterator(IteratorImpl* impl) : _impl(impl) {} \ No newline at end of file diff --git a/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go b/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go deleted file mode 100644 index 70c06cc2b6..0000000000 --- a/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go +++ /dev/null @@ -1,102 +0,0 @@ -package copyset - -import ( - "context" - cmderror "github.com/opencurve/curve/tools-v2/internal/error" - basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" - "github.com/opencurve/curve/tools-v2/pkg/config" - "github.com/opencurve/curve/tools-v2/pkg/output" - "github.com/opencurve/curve/tools-v2/proto/proto/topology" - "github.com/spf13/cobra" - "google.golang.org/grpc" - "strconv" -) - -const ( - deleteBrokenCopySetExample = `$ curve bs delete broken-copyset --chunkserverid=1` -) - -type DeleteBrokenCopySetInChunkServerRpc struct { - Info *basecmd.Rpc - Request *topology.DeleteBrokenCopysetInChunkServerRequest - topologyClient topology.TopologyServiceClient -} - -var _ basecmd.RpcFunc = (*DeleteBrokenCopySetInChunkServerRpc)(nil) // check interface - -type DeleteBrokenCopySetCommand struct { - basecmd.FinalCurveCmd - Rpc *DeleteBrokenCopySetInChunkServerRpc - Servers []*topology.ServerInfo - chunkserverid uint32 -} - -func (d *DeleteBrokenCopySetInChunkServerRpc) Stub_Func(ctx context.Context) (interface{}, error) { - return d.topologyClient.DeleteBrokenCopysetInChunkServer(ctx, d.Request) -} - -func NewDeleteCommand() *cobra.Command { - return NewDeleteBrokenCopySetCommand().Cmd -} - -func NewDeleteBrokenCopySetCommand() *DeleteBrokenCopySetCommand { - cmd := &DeleteBrokenCopySetCommand{ - FinalCurveCmd: basecmd.FinalCurveCmd{ - Use: "broken-copyset", - Short: "delete broken copyset in chunkserver", - Example: deleteBrokenCopySetExample, - }, - } - - 
basecmd.NewFinalCurveCli(&cmd.FinalCurveCmd, cmd) - return cmd -} - -func (d *DeleteBrokenCopySetCommand) AddFlags() { - config.AddBsMdsFlagOption(d.Cmd) - config.AddRpcRetryTimesFlag(d.Cmd) - config.AddRpcTimeoutFlag(d.Cmd) - config.AddBsChunkServerIdFlag(d.Cmd) -} - -func (d *DeleteBrokenCopySetInChunkServerRpc) NewRpcClient(cc grpc.ClientConnInterface) { - d.topologyClient = topology.NewTopologyServiceClient(cc) -} - -func (d *DeleteBrokenCopySetCommand) Init(cmd *cobra.Command, args []string) error { - mdsAddrs, err := config.GetBsMdsAddrSlice(d.Cmd) - if err.TypeCode() != cmderror.CODE_SUCCESS { - return err.ToError() - } - timeout := config.GetFlagDuration(d.Cmd, config.RPCTIMEOUT) - retrytimes := config.GetFlagInt32(d.Cmd, config.RPCRETRYTIMES) - strid, e := strconv.Atoi(config.GetBsFlagString(d.Cmd, config.CURVEBS_CHUNKSERVER_ID)) - if e != nil { - return e - } - d.chunkserverid = uint32(strid) - d.Rpc = &DeleteBrokenCopySetInChunkServerRpc{ - Info: basecmd.NewRpc(mdsAddrs, timeout, retrytimes, "DeleteBrokenCopysetInChunkServer"), - Request: &topology.DeleteBrokenCopysetInChunkServerRequest{ - ChunkServerID: &d.chunkserverid, - }, - } - return nil -} - -func (d *DeleteBrokenCopySetCommand) RunCommand(cmd *cobra.Command, args []string) error { - result, errCmd := basecmd.GetRpcResponse(d.Rpc.Info, d.Rpc) - if errCmd.TypeCode() != cmderror.CODE_SUCCESS { - return errCmd.ToError() - } - d.Result = result - return nil -} - -func (d *DeleteBrokenCopySetCommand) Print(cmd *cobra.Command, args []string) error { - return output.FinalCmdOutput(&d.FinalCurveCmd, d) -} - -func (d *DeleteBrokenCopySetCommand) ResultPlainOutput() error { - return output.FinalCmdOutputPlain(&d.FinalCurveCmd) -} diff --git a/tools-v2/pkg/cli/command/curvebs/delete/delete.go b/tools-v2/pkg/cli/command/curvebs/delete/delete.go index 83c2b1e57a..fe57b58ef0 100644 --- a/tools-v2/pkg/cli/command/curvebs/delete/delete.go +++ b/tools-v2/pkg/cli/command/curvebs/delete/delete.go @@ -7,7 +7,6 @@ 
package delete import ( - "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/delete/copyset" "github.com/spf13/cobra" basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" @@ -27,7 +26,6 @@ func (dCmd *DeleteCommand) AddSubCommands() { file.NewFileCommand(), peer.NewCommand(), volume.NewVolumeCommand(), - copyset.NewDeleteCommand(), ) }