diff --git a/WORKSPACE b/WORKSPACE index ddeffa41a8..a423f1c46a 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -37,7 +37,6 @@ git_repository( commit = "d12de388c97998f5ccd5cb97ed0da728815ef438", patches = [ "//:thirdparties/braft/0001-fix-change-set_error-to-set_errorv.patch", - "//:thirdparties/braft/add-iterator-has_error.patch", ], patch_args = [ "-p1" diff --git a/proto/chunk.proto b/proto/chunk.proto index 32cbbb8836..94eec1b1a8 100755 --- a/proto/chunk.proto +++ b/proto/chunk.proto @@ -86,6 +86,7 @@ enum CHUNK_OP_STATUS { CHUNK_OP_STATUS_CHUNK_EXIST = 11; // chunk已存在 CHUNK_OP_STATUS_EPOCH_TOO_OLD = 12; // request epoch too old CHUNK_OP_STATUS_READONLY = 13; // copyset其他节点故障,设为只读 + CHUNK_OP_STATUS_ENOSPC = 14; // 空间不足错误 }; message ChunkResponse { diff --git a/proto/copyset.proto b/proto/copyset.proto index 7cebef68a1..fe3d271d53 100755 --- a/proto/copyset.proto +++ b/proto/copyset.proto @@ -100,6 +100,4 @@ service CopysetService { rpc DeleteBrokenCopyset(CopysetRequest) returns (CopysetResponse); rpc GetCopysetStatus (CopysetStatusRequest) returns (CopysetStatusResponse); - - rpc DeleteBrokenCopysetNode (CopysetRequest2) returns (CopysetResponse2); }; diff --git a/proto/topology.proto b/proto/topology.proto index 5c6031f6e9..c1ef9d97e5 100644 --- a/proto/topology.proto +++ b/proto/topology.proto @@ -565,14 +565,6 @@ message ListUnAvailCopySetsResponse { repeated common.CopysetInfo copysets = 2; } -message DeleteBrokenCopysetInChunkServerRequest { - required uint32 chunkServerID = 1; -} - -message DeleteBrokenCopysetInChunkServerResponse { - required sint32 statusCode = 1; -} - //TODO(hzsunjianliang): update userPolicy and so on service TopologyService { rpc RegistChunkServer(ChunkServerRegistRequest) returns (ChunkServerRegistResponse); @@ -618,6 +610,4 @@ service TopologyService { rpc SetCopysetsAvailFlag(SetCopysetsAvailFlagRequest) returns (SetCopysetsAvailFlagResponse); rpc ListUnAvailCopySets(ListUnAvailCopySetsRequest) returns (ListUnAvailCopySetsResponse); rpc ListChunkFormatStatus(ListChunkFormatStatusRequest) returns (ListChunkFormatStatusResponse); - rpc DeleteBrokenCopysetInChunkServer(DeleteBrokenCopysetInChunkServerRequest) returns (DeleteBrokenCopysetInChunkServerResponse); - } diff --git a/src/chunkserver/copyset_node.cpp b/src/chunkserver/copyset_node.cpp index 0a01039714..a70958d449 100755 --- a/src/chunkserver/copyset_node.cpp +++ b/src/chunkserver/copyset_node.cpp @@ -293,7 +293,6 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { */ braft::Closure *closure = iter.done(); - std::shared_ptr wrapperPtr = std::make_shared(&iter); if (nullptr != closure) { /** * 1.closure不是null,那么说明当前节点正常,直接从内存中拿到Op @@ -306,7 +305,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { std::shared_ptr& opRequest = chunkClosure->request_; concurrentapply_->Push(opRequest->ChunkId(), ChunkOpRequest::Schedule(opRequest->OpType()), // NOLINT &ChunkOpRequest::OnApply, opRequest, - iter.index(), doneGuard.release(), wrapperPtr); + iter.index(), doneGuard.release()); } else { // 获取log entry butil::IOBuf log = iter.data(); @@ -323,11 +322,9 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { auto chunkId = request.chunkid(); concurrentapply_->Push(chunkId, ChunkOpRequest::Schedule(request.optype()), // NOLINT &ChunkOpRequest::OnApplyFromLog, opReq, - dataStore_, std::move(request), data, wrapperPtr); + dataStore_, std::move(request), data); } } - // 等待写操作完成,否则on_apply结束后,异步有写错误无法调用set_error_and_rollback() - concurrentapply_->Flush(); } void CopysetNode::on_shutdown() { @@ -556,7 +553,7 @@ void CopysetNode::on_leader_stop(const butil::Status &status) { } void CopysetNode::on_error(const ::braft::Error &e) { - LOG(ERROR) << "Copyset: " << GroupIdString() + LOG(FATAL) << "Copyset: " << GroupIdString() << ", peer id: " << peerId_.to_string() << " meet raft error: " << e; } @@ -1126,13 +1123,5 @@ SyncChunkThread::~SyncChunkThread() { Stop(); } -void IteratorWrapper::set_error_and_rollback(size_t ntail, const butil::Status* st) { - iter_->set_error_and_rollback(ntail, st); -} - -bool IteratorWrapper::has_error() const{ - return iter_->has_error(); -} - } // namespace chunkserver } // namespace curve diff --git a/src/chunkserver/copyset_node.h b/src/chunkserver/copyset_node.h index 2bca4bb648..dbdf6111a2 100755 --- a/src/chunkserver/copyset_node.h +++ b/src/chunkserver/copyset_node.h @@ -124,18 +124,6 @@ class SyncChunkThread : public curve::common::Uncopyable { CopysetNode* node_; }; -// 用于unitest mock braft::Iterator -class IteratorWrapper { - public: - IteratorWrapper() {} - IteratorWrapper(braft::Iterator *iter): iter_(iter) {} - ~IteratorWrapper() {} - virtual void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL); - virtual bool has_error() const; - private: - braft::Iterator *iter_; -}; - /** * 一个Copyset Node就是一个复制组的副本 */ diff --git a/src/chunkserver/copyset_service.cpp b/src/chunkserver/copyset_service.cpp index b47f7c541f..e09516c0ad 100755 --- a/src/chunkserver/copyset_service.cpp +++ b/src/chunkserver/copyset_service.cpp @@ -232,45 +232,5 @@ void CopysetServiceImpl::GetCopysetStatus(RpcController *controller, request->copysetid()); } -void CopysetServiceImpl::DeleteBrokenCopysetNode(RpcController *controller, - const CopysetRequest2 *request, - CopysetResponse2 *response, - Closure *done) { - (void)controller; - brpc::ClosureGuard doneGuard(done); - - Copyset copyset; - - LOG(INFO) << "Received DeleteBrokenCopysetNode request"; - - for (int i = 0; i < request->copysets_size(); ++i) { - copyset = request->copysets(i); - - // 判断copyset是否存在 - auto nodePtr = copysetNodeManager_->GetCopysetNode(copyset.logicpoolid(), - copyset.copysetid()); - if (nullptr == nodePtr) { - continue; - } - - NodeStatus status; - nodePtr->GetStatus(&status); - // 只删除状态有问题的copyset node - if (status.state != braft::State::STATE_ERROR) { - continue; - } - - copysetNodeManager_->DeleteCopysetNode(copyset.logicpoolid(), copyset.copysetid()); - - LOG(INFO) << "Delete copyset node" - << ToGroupIdString(copyset.logicpoolid(), - copyset.copysetid()) - << " success."; - } - - response->set_status(COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS); - LOG(INFO) << "DeleteBrokenCopysetNode " << request->copysets().size() << " copysets success"; -} - } // namespace chunkserver } // namespace curve diff --git a/src/chunkserver/copyset_service.h b/src/chunkserver/copyset_service.h index ebb8ee430c..fabf6df8fc 100755 --- a/src/chunkserver/copyset_service.h +++ b/src/chunkserver/copyset_service.h @@ -71,14 +71,6 @@ class CopysetServiceImpl : public CopysetService { CopysetStatusResponse *response, Closure *done); - /** - * 删除状态ERROR的copyset node - */ - void DeleteBrokenCopysetNode(RpcController *controller, - const CopysetRequest2 *request, - CopysetResponse2 *response, - Closure *done); - private: // 复制组管理者 CopysetNodeManager* copysetNodeManager_; diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.cpp b/src/chunkserver/datastore/chunkserver_chunkfile.cpp index a6e6d3e0cc..a7f0a1aef3 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.cpp +++ b/src/chunkserver/datastore/chunkserver_chunkfile.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "src/chunkserver/datastore/chunkserver_datastore.h" #include "src/chunkserver/datastore/chunkserver_chunkfile.h" @@ -400,6 +401,9 @@ CSErrorCode CSChunkFile::Write(SequenceNum sn, << "ChunkID: " << chunkId_ << ",request sn: " << sn << ",chunk sn: " << metaPage_.sn; + if (rc == -ENOSPC) { + return CSErrorCode::NoSpaceError; + } return CSErrorCode::InternalError; } // If it is a clone chunk, the bitmap will be updated @@ -478,6 +482,9 @@ CSErrorCode CSChunkFile::Paste(const char * buf, off_t offset, size_t length) { << "ChunkID: " << chunkId_ << ", offset: " << offset << ", length: " << length; + if (rc == -ENOSPC) { + return CSErrorCode::NoSpaceError; + } return CSErrorCode::InternalError; } } diff --git a/src/chunkserver/datastore/chunkserver_snapshot.cpp b/src/chunkserver/datastore/chunkserver_snapshot.cpp index f1e398e8c6..67ed83102d 100644 --- a/src/chunkserver/datastore/chunkserver_snapshot.cpp +++ b/src/chunkserver/datastore/chunkserver_snapshot.cpp @@ -21,6 +21,7 @@ */ #include +#include #include "src/chunkserver/datastore/chunkserver_datastore.h" #include "src/chunkserver/datastore/chunkserver_snapshot.h" @@ -216,6 +217,9 @@ CSErrorCode CSSnapshot::Write(const char * buf, off_t offset, size_t length) { LOG(ERROR) << "Write snapshot failed." << "ChunkID: " << chunkId_ << ",snapshot sn: " << metaPage_.sn; + if (rc == -ENOSPC) { + return CSErrorCode::NoSpaceError; + } return CSErrorCode::InternalError; } uint32_t pageBeginIndex = offset / blockSize_; diff --git a/src/chunkserver/datastore/define.h b/src/chunkserver/datastore/define.h index 41c25677cf..1d0dda13fc 100644 --- a/src/chunkserver/datastore/define.h +++ b/src/chunkserver/datastore/define.h @@ -73,6 +73,8 @@ enum CSErrorCode { // The page has not been written, it will appear when the page that has not // been written is read when the clone chunk is read PageNerverWrittenError = 13, + // ENOSPC error + NoSpaceError = 14, }; // Chunk details diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index 6c9655f49d..1a46b6a2c1 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -201,36 +201,18 @@ uint64_t MaxAppliedIndex( } // namespace void DeleteChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "delete chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - auto ret = datastore_->DeleteChunk(request_->chunkid(), request_->sn()); if (CSErrorCode::Success == ret) { response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS); node_->UpdateAppliedIndex(index); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "delete chunk failed: " + LOG(FATAL) << "delete chunk failed: " << " data store return: " << ret << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; } else { LOG(ERROR) << "delete chunk failed: " << " data store return: " << ret @@ -243,18 +225,7 @@ void DeleteChunkRequest::OnApply(uint64_t index, void DeleteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "delete chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - + const butil::IOBuf &data) { (void)data; // NOTE: 处理过程中优先使用参数传入的datastore/request auto ret = datastore->DeleteChunk(request.chunkid(), @@ -263,10 +234,9 @@ void DeleteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, return; if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "delete failed: " + LOG(FATAL) << "delete failed: " << " data store return: " << ret << ", request: " << request.ShortDebugString(); - iterPtr->set_error_and_rollback(); } else { LOG(ERROR) << "delete failed: " << " data store return: " << ret @@ -314,7 +284,7 @@ void ReadChunkRequest::Process() { auto task = std::bind(&ReadChunkRequest::OnApply, thisPtr, node_->GetAppliedIndex(), - doneGuard.release(), nullptr); + doneGuard.release()); concurrentApplyModule_->Push(request_->chunkid(), ChunkOpRequest::Schedule(request_->optype()), // NOLINT task); @@ -334,8 +304,7 @@ void ReadChunkRequest::Process() { } void ReadChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { // 先清除response中的status,以保证CheckForward后的判断的正确性 response_->clear_status(); @@ -400,8 +369,7 @@ void ReadChunkRequest::OnApply(uint64_t index, void ReadChunkRequest::OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)datastore; (void)request; (void)data; @@ -465,22 +433,8 @@ void ReadChunkRequest::ReadChunk() { } void WriteChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "write chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - uint32_t cost; std::string cloneSourceLocation; @@ -509,23 +463,23 @@ void WriteChunkRequest::OnApply(uint64_t index, << ", request: " << request_->ShortDebugString(); response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request_->ShortDebugString(); + response_->set_status( + CHUNK_OP_STATUS::CHUNK_OP_STATUS_ENOSPC); } else if (CSErrorCode::InternalError == ret || CSErrorCode::CrcCheckError == ret || CSErrorCode::FileFormatError == ret) { - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset + * internalerror一般是磁盘错误,为了防止副本不一致,让进程退出 + * TODO(yyk): 当前遇到write错误直接fatal退出整个 + * ChunkServer后期考虑仅仅标坏这个copyset,保证较好的可用性 */ - LOG(ERROR) << "write failed: " + LOG(FATAL) << "write failed: " << " data store return: " << ret << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; } else { LOG(ERROR) << "write failed: " << " data store return: " << ret @@ -540,18 +494,7 @@ void WriteChunkRequest::OnApply(uint64_t index, void WriteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "write chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - + const butil::IOBuf &data) { // NOTE: 处理过程中优先使用参数传入的datastore/request uint32_t cost; std::string cloneSourceLocation; @@ -561,56 +504,44 @@ void WriteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, request.clonefileoffset()); } - auto ret = datastore->WriteChunk(request.chunkid(), + do { + auto ret = datastore->WriteChunk(request.chunkid(), request.sn(), data, request.offset(), request.size(), &cost, cloneSourceLocation); - if (CSErrorCode::Success == ret) { - return; - } else if (CSErrorCode::BackwardRequestError == ret) { - LOG(WARNING) << "write failed: " - << " data store return: " << ret - << ", request: " << request.ShortDebugString(); - } else if (CSErrorCode::InternalError == ret || - CSErrorCode::CrcCheckError == ret || - CSErrorCode::FileFormatError == ret) { - - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ - LOG(ERROR) << "write failed: " - << " data store return: " << ret - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - } else { - LOG(ERROR) << "write failed: " - << " data store return: " << ret - << ", request: " << request.ShortDebugString(); - } + if (CSErrorCode::Success == ret) { + return; + } else if (CSErrorCode::BackwardRequestError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(WARNING) << "write failed: " + << " data store return: " << ret + << ", request: " << request_->ShortDebugString(); + sleep(WAIT_FOR_DISK_FREED); + continue; + } else if (CSErrorCode::InternalError == ret || + CSErrorCode::CrcCheckError == ret || + CSErrorCode::FileFormatError == ret) { + LOG(FATAL) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } else { + LOG(ERROR) << "write failed: " + << " data store return: " << ret + << ", request: " << request.ShortDebugString(); + } + } while (false); + } void ReadSnapshotRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "read snapshot failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - char *readBuffer = nullptr; uint32_t size = request_->size(); readBuffer = new(std::nothrow)char[size]; @@ -645,13 +576,9 @@ void ReadSnapshotRequest::OnApply(uint64_t index, * 3.internal error */ if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "read snapshot failed: " + LOG(FATAL) << "read snapshot failed: " << " data store return: " << ret << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; } /** * 4.其他错误 @@ -668,8 +595,7 @@ void ReadSnapshotRequest::OnApply(uint64_t index, void ReadSnapshotRequest::OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)datastore; (void)request; (void)data; @@ -678,22 +604,8 @@ void ReadSnapshotRequest::OnApplyFromLog(std::shared_ptr datastore, } void DeleteSnapshotRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "delete snapshot failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - CSErrorCode ret = datastore_->DeleteSnapshotChunkOrCorrectSn( request_->chunkid(), request_->correctedsn()); if (CSErrorCode::Success == ret) { @@ -706,14 +618,9 @@ void DeleteSnapshotRequest::OnApply(uint64_t index, response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "delete snapshot or correct sn failed: " + LOG(FATAL) << "delete snapshot or correct sn failed: " << " data store return: " << ret << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; } else { LOG(ERROR) << "delete snapshot or correct sn failed: " << " data store return: " << ret @@ -727,18 +634,8 @@ void DeleteSnapshotRequest::OnApply(uint64_t index, void DeleteSnapshotRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)data; - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "delete snapshot failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } // NOTE: 处理过程中优先使用参数传入的datastore/request auto ret = datastore->DeleteSnapshotChunkOrCorrectSn( request.chunkid(), request.correctedsn()); @@ -749,11 +646,9 @@ void DeleteSnapshotRequest::OnApplyFromLog(std::shared_ptr datastor << " data store return: " << ret << ", request: " << request.ShortDebugString(); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "delete snapshot or correct sn failed: " + LOG(FATAL) << "delete snapshot or correct sn failed: " << " data store return: " << ret << ", request: " << request.ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; } else { LOG(ERROR) << "delete snapshot or correct sn failed: " << " data store return: " << ret @@ -762,22 +657,9 @@ void DeleteSnapshotRequest::OnApplyFromLog(std::shared_ptr datastor } void CreateCloneChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "create clone chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - auto ret = datastore_->CreateCloneChunk(request_->chunkid(), request_->sn(), request_->correctedsn(), @@ -791,17 +673,13 @@ void CreateCloneChunkRequest::OnApply(uint64_t index, CSErrorCode::CrcCheckError == ret || CSErrorCode::FileFormatError == ret) { /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ - LOG(ERROR) << "create clone failed: " + * TODO(yyk): 当前遇到createclonechunk错误直接fatal退出整个 + * ChunkServer后期考虑仅仅标坏这个copyset,保证较好的可用性 + */ + LOG(FATAL) << "create clone failed: " << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; + CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN); } else if (CSErrorCode::ChunkConflictError == ret) { LOG(WARNING) << "create clone chunk exist: " << ", request: " << request_->ShortDebugString(); @@ -819,19 +697,8 @@ void CreateCloneChunkRequest::OnApply(uint64_t index, void CreateCloneChunkRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)data; - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "create clone chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - // NOTE: 处理过程中优先使用参数传入的datastore/request auto ret = datastore->CreateCloneChunk(request.chunkid(), request.sn(), @@ -850,14 +717,8 @@ void CreateCloneChunkRequest::OnApplyFromLog(std::shared_ptr datast if (CSErrorCode::InternalError == ret || CSErrorCode::CrcCheckError == ret || CSErrorCode::FileFormatError == ret) { - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ - LOG(ERROR) << "create clone failed: " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); + LOG(FATAL) << "create clone failed:" + << ", request: " << request.ShortDebugString(); } else { LOG(ERROR) << "create clone failed: " << ", request: " << request.ShortDebugString(); @@ -889,22 +750,9 @@ void PasteChunkInternalRequest::Process() { } void PasteChunkInternalRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "paste chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; - } - auto ret = datastore_->PasteChunk(request_->chunkid(), data_.to_string().c_str(), //NOLINT request_->offset(), @@ -914,19 +762,12 @@ void PasteChunkInternalRequest::OnApply(uint64_t index, response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS); node_->UpdateAppliedIndex(index); } else if (CSErrorCode::InternalError == ret) { - - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ + LOG(FATAL) << "paste chunk failed: " + << ", request: " << request_->ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { LOG(ERROR) << "paste chunk failed: " << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - response_->set_appliedindex(node_->GetAppliedIndex()); - return; + response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE); } else { LOG(ERROR) << "paste chunk failed: " << ", request: " << request_->ShortDebugString(); @@ -938,56 +779,36 @@ void PasteChunkInternalRequest::OnApply(uint64_t index, void PasteChunkInternalRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "paste chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - // NOTE: 处理过程中优先使用参数传入的datastore/request - auto ret = datastore->PasteChunk(request.chunkid(), - data.to_string().c_str(), - request.offset(), - request.size()); - if (CSErrorCode::Success == ret) - return; + const butil::IOBuf &data) { + do { + // NOTE: 处理过程中优先使用参数传入的datastore/request + auto ret = datastore->PasteChunk(request.chunkid(), + data.to_string().c_str(), + request.offset(), + request.size()); + if (CSErrorCode::Success == ret) + return; - if (CSErrorCode::InternalError == ret) { - /** - * internalerror一般是磁盘错误,为了防止副本不一致, - * set_error_and_rollback回退日志,copyset node执行step down下线,选举新的leader - * 假如空间不足,heartbeat会上传磁盘错误标坏copyset - */ - LOG(ERROR) << "paste chunk failed: " + if (CSErrorCode::InternalError == ret) { + LOG(FATAL) << "paste chunk failed: " + << ", request: " << request.ShortDebugString(); + } else if (CSErrorCode::NoSpaceError == ret) { + LOG(ERROR) << "paste chunk failed: " << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - } else { - LOG(ERROR) << "paste chunk failed: " - << ", request: " << request.ShortDebugString(); - } + sleep(WAIT_FOR_DISK_FREED); + continue; + } else { + LOG(ERROR) << "paste chunk failed: " + << ", request: " << request.ShortDebugString(); + } + } while (false); + } void ScanChunkRequest::OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) { + ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "scan chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); - return; - } - // read and calculate crc, build scanmap uint32_t crc = 0; size_t size = request_->size(); @@ -1029,11 +850,8 @@ void ScanChunkRequest::OnApply(uint64_t index, response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_NOTEXIST); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "scan chunk failed, read chunk internal error" + LOG(FATAL) << "scan chunk failed, read chunk internal error" << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED); } else { response_->set_status( CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN); @@ -1042,19 +860,8 @@ void ScanChunkRequest::OnApply(uint64_t index, void ScanChunkRequest::OnApplyFromLog(std::shared_ptr datastore, //NOLINT const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) { + const butil::IOBuf &data) { (void)data; - - // 由于写操作ConcurrentApply异步执行,有可能上一个操作已经失败,如磁盘空间不足,需要检查 - if (iterPtr->has_error()) { - LOG(ERROR) << "scan chunk failed: " - << " last log apply fail " - << ", request: " << request_->ShortDebugString(); - iterPtr->set_error_and_rollback(); - return; - } - uint32_t crc = 0; size_t size = request.size(); std::unique_ptr readBuffer(new(std::nothrow)char[size]); @@ -1083,10 +890,9 @@ void ScanChunkRequest::OnApplyFromLog(std::shared_ptr datastore, / << " datastore return: " << ret << ", request: " << request.ShortDebugString(); } else if (CSErrorCode::InternalError == ret) { - LOG(ERROR) << "scan failed: " + LOG(FATAL) << "scan failed: " << " datastore return: " << ret << ", request: " << request.ShortDebugString(); - iterPtr->set_error_and_rollback(); } else { LOG(ERROR) << "scan failed: " << " datastore return: " << ret diff --git a/src/chunkserver/op_request.h b/src/chunkserver/op_request.h index b6d4241368..6aaefce5c5 100755 --- a/src/chunkserver/op_request.h +++ b/src/chunkserver/op_request.h @@ -23,6 +23,8 @@ #ifndef SRC_CHUNKSERVER_OP_REQUEST_H_ #define SRC_CHUNKSERVER_OP_REQUEST_H_ +#define WAIT_FOR_DISK_FREED 60 /* in seconds */ + #include #include #include @@ -80,11 +82,9 @@ class ChunkOpRequest : public std::enable_shared_from_this { * request正常情况从内存中获取上下文on apply逻辑 * @param index:此op log entry的index * @param done:对应的ChunkClosure - * @param iter:用于判断和更新StateMachine */ virtual void OnApply(uint64_t index, - ::google::protobuf::Closure *done, - std::shared_ptr iterPtr) = 0; + ::google::protobuf::Closure *done) = 0; /** * NOTE: 子类实现过程中优先使用参数传入的datastore/request @@ -95,12 +95,10 @@ class ChunkOpRequest : public std::enable_shared_from_this { * @param datastore:chunk数据持久化层 * @param request:反序列化后得到的request 细信息 * @param data:反序列化后得到的request要处理的数据 - * @param iter:用于判断和更新StateMachine */ virtual void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) = 0; + const butil::IOBuf &data) = 0; /** * 返回request的done成员 @@ -204,11 +202,10 @@ class DeleteChunkRequest : public ChunkOpRequest { done) {} virtual ~DeleteChunkRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class ReadChunkRequest : public ChunkOpRequest { @@ -228,10 +225,10 @@ class ReadChunkRequest : public ChunkOpRequest { virtual ~ReadChunkRequest() = default; void Process() override; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; const ChunkRequest* GetChunkRequest() { return request_; @@ -267,11 +264,10 @@ class WriteChunkRequest : public ChunkOpRequest { done) {} virtual ~WriteChunkRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr); + void OnApply(uint64_t index, ::google::protobuf::Closure *done); void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class ReadSnapshotRequest : public ChunkOpRequest { @@ -290,11 +286,10 @@ class ReadSnapshotRequest : public ChunkOpRequest { done) {} virtual ~ReadSnapshotRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class DeleteSnapshotRequest : public ChunkOpRequest { @@ -313,11 +308,10 @@ class DeleteSnapshotRequest : public ChunkOpRequest { done) {} virtual ~DeleteSnapshotRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class CreateCloneChunkRequest : public ChunkOpRequest { @@ -336,11 +330,10 @@ class CreateCloneChunkRequest : public ChunkOpRequest { done) {} virtual ~CreateCloneChunkRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; }; class PasteChunkInternalRequest : public ChunkOpRequest { @@ -364,11 +357,10 @@ class PasteChunkInternalRequest : public ChunkOpRequest { virtual ~PasteChunkInternalRequest() = default; void Process() override; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; private: butil::IOBuf data_; @@ -391,11 +383,10 @@ class ScanChunkRequest : public ChunkOpRequest { scanManager_(scanManager) {} virtual ~ScanChunkRequest() = default; - void OnApply(uint64_t index, ::google::protobuf::Closure *done, std::shared_ptr iterPtr) override; + void OnApply(uint64_t index, ::google::protobuf::Closure *done) override; void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, - const butil::IOBuf &data, - std::shared_ptr iterPtr) override; + const butil::IOBuf &data) override; private: void BuildAndSendScanMap(const ChunkRequest &request, uint64_t index, diff --git a/src/client/chunk_closure.cpp b/src/client/chunk_closure.cpp index ebed92ae88..97269dd8b0 100644 --- a/src/client/chunk_closure.cpp +++ b/src/client/chunk_closure.cpp @@ -101,6 +101,8 @@ void ClientClosure::PreProcessBeforeRetry(int rpcstatus, int cntlstatus) { if (rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_REDIRECTED) { nextSleepUS /= 10; } + } else if (rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY || rpcstatus == CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE) { + nextSleepUS = failReqOpt_.chunkserverWaitDiskFreeRetryIntervalMS; } LOG(WARNING) @@ -243,6 +245,11 @@ void ClientClosure::Run() { OnReadOnly(); break; + case CHUNK_OP_STATUS::CHUNK_OP_STATUS_ENOSPC: + needRetry = true; + OnNoSpace(); + break; + default: needRetry = true; LOG(WARNING) << OpTypeToString(reqCtx_->optype_) @@ -383,6 +390,18 @@ void ClientClosure::OnReadOnly() { << butil::endpoint2str(cntl_->remote_side()).c_str(); } +void ClientClosure::OnNoSpace() { + reqDone_->SetFailed(status_); + LOG(WARNING) << OpTypeToString(reqCtx_->optype_) << " copyset is no space, " + << *reqCtx_ + << ", status = " << status_ + << ", retried times = " << reqDone_->GetRetriedTimes() + << ", IO id = " << reqDone_->GetIOTracker()->GetID() + << ", request id = " << reqCtx_->id_ + << ", remote side = " + << butil::endpoint2str(cntl_->remote_side()).c_str(); +} + void ClientClosure::OnRedirected() { LOG(WARNING) << OpTypeToString(reqCtx_->optype_) << " redirected, " << *reqCtx_ diff --git a/src/client/chunk_closure.h b/src/client/chunk_closure.h index c39231f76d..2764d263ee 100644 --- a/src/client/chunk_closure.h +++ b/src/client/chunk_closure.h @@ -116,6 +116,9 @@ class ClientClosure : public Closure { // handle readonly void OnReadOnly(); + // handle nospace + void OnNoSpace(); + // 非法参数 void OnInvalidRequest(); diff --git a/src/client/config_info.h b/src/client/config_info.h index 620d464eae..0e8343d694 100644 --- a/src/client/config_info.h +++ b/src/client/config_info.h @@ -138,6 +138,8 @@ struct ChunkServerUnstableOption { * copyset 标记为unstable,促使其下次发送rpc前,先去getleader。 * @chunkserverMinRetryTimesForceTimeoutBackoff: * 当一个请求重试次数超过阈值时,还在重试 使其超时时间进行指数退避 + * @chunkserverWaitDiskFreeRetryIntervalMS: + * 当请求返回readonly或者nospace错误,hang住io等待一段时间后重试。 */ struct FailureRequestOption { uint32_t chunkserverOPMaxRetry = 3; @@ -146,6 +148,7 @@ struct FailureRequestOption { uint64_t chunkserverMaxRPCTimeoutMS = 64000; uint64_t chunkserverMaxRetrySleepIntervalUS = 64ull * 1000 * 1000; uint64_t chunkserverMinRetryTimesForceTimeoutBackoff = 5; + uint64_t chunkserverWaitDiskFreeRetryIntervalMS = 60 * 1000; // When a request remains outstanding beyond this threshold, it is marked as // a slow request. diff --git a/src/fs/ext4_filesystem_impl.cpp b/src/fs/ext4_filesystem_impl.cpp index f4cd6cfcdb..91bdb327d6 100644 --- a/src/fs/ext4_filesystem_impl.cpp +++ b/src/fs/ext4_filesystem_impl.cpp @@ -316,19 +316,27 @@ int Ext4FileSystemImpl::Write(int fd, buf + relativeOffset, remainLength, offset); - if (ret < 0) { - if (errno == EINTR && retryTimes < MAX_RETYR_TIME) { - ++retryTimes; - continue; - } + + if (ret > 0) { + remainLength -= ret; + offset += ret; + relativeOffset += ret; + } + + if (errno == EINTR && retryTimes < MAX_RETYR_TIME) { + ++retryTimes; + continue; + } else if (errno == ENOSPC) { + LOG(ERROR) << "Disk is full writing fd: " << fd + << ". Waiting for someone to free space..."; + return -errno; + } else if (errno > 0) { LOG(ERROR) << "pwrite failed, fd: " << fd - << ", size: " << remainLength << ", offset: " << offset - << ", error: " << strerror(errno); + << ", size: " << remainLength << ", offset: " << offset + << ", error: " << strerror(errno); return -errno; } - remainLength -= ret; - offset += ret; - relativeOffset += ret; + } return length; } @@ -349,19 +357,24 @@ int Ext4FileSystemImpl::Write(int fd, while (remainLength > 0) { ssize_t ret = buf.pcut_into_file_descriptor(fd, offset, remainLength); - if (ret < 0) { - if (errno == EINTR || retryTimes < MAX_RETYR_TIME) { - ++retryTimes; - continue; - } + if (ret > 0) { + remainLength -= ret; + offset += ret; + } + + if (errno == EINTR || retryTimes < MAX_RETYR_TIME) { + ++retryTimes; + continue; + } else if (errno == ENOSPC) { + LOG(ERROR) << "Disk is full writing fd: " << fd + << ". Waiting for someone to free space..."; + return -errno; + } else if (errno > 0) { LOG(ERROR) << "IOBuf::pcut_into_file_descriptor failed, fd: " << fd - << ", size: " << remainLength << ", offset: " << offset - << ", error: " << strerror(errno); + << ", size: " << remainLength << ", offset: " << offset + << ", error: " << strerror(errno); return -errno; } - - remainLength -= ret; - offset += ret; } return length; diff --git a/src/mds/topology/topology_service.cpp b/src/mds/topology/topology_service.cpp index 988938bbeb..95ad76fcbc 100644 --- a/src/mds/topology/topology_service.cpp +++ b/src/mds/topology/topology_service.cpp @@ -1167,19 +1167,6 @@ void TopologyServiceImpl::ListChunkFormatStatus( topology_->ListChunkFormatStatus(request, response); } -void TopologyServiceImpl::DeleteBrokenCopysetInChunkServer( - google::protobuf::RpcController* cntl_base, - const DeleteBrokenCopysetInChunkServerRequest* request, - DeleteBrokenCopysetInChunkServerResponse* response, - google::protobuf::Closure* done) { - brpc::ClosureGuard done_guard(done); - brpc::Controller* cntl = static_cast(cntl_base); - LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from " - << cntl->remote_side() << " to " << cntl->local_side() - << ". [DeleteBrokenCopysetInChunkServer] " << request->DebugString(); - topology_->DeleteBrokenCopysetInChunkServer(request, response); -} - } // namespace topology } // namespace mds } // namespace curve diff --git a/src/mds/topology/topology_service.h b/src/mds/topology/topology_service.h index 46b6a315e7..dc5a46cfaa 100644 --- a/src/mds/topology/topology_service.h +++ b/src/mds/topology/topology_service.h @@ -236,14 +236,7 @@ class TopologyServiceImpl : public TopologyService { google::protobuf::RpcController* cntl_base, const ListChunkFormatStatusRequest* request, ListChunkFormatStatusResponse* response, - google::protobuf::Closure* done); - - virtual void DeleteBrokenCopysetInChunkServer( - google::protobuf::RpcController* cntl_base, - const DeleteBrokenCopysetInChunkServerRequest* request, - DeleteBrokenCopysetInChunkServerResponse* response, - google::protobuf::Closure* done - ); + google::protobuf::Closure* done); private: std::shared_ptr topology_; diff --git a/src/mds/topology/topology_service_manager.cpp b/src/mds/topology/topology_service_manager.cpp index a16929f974..ce0b6822a4 100644 --- a/src/mds/topology/topology_service_manager.cpp +++ b/src/mds/topology/topology_service_manager.cpp @@ -1814,99 +1814,6 @@ void TopologyServiceManager::ListChunkFormatStatus( } } -void TopologyServiceManager::DeleteBrokenCopysetInChunkServer( - const DeleteBrokenCopysetInChunkServerRequest* request, - DeleteBrokenCopysetInChunkServerResponse* response) { - std::map> csMap; - std::vector copysets = - topology_->GetCopySetsInChunkServer(request->chunkserverid()); - for (const CopySetKey& copyset : copysets) { - CopySetInfo csInfo; - if (topology_->GetCopySet(copyset, &csInfo)) { - if (!csInfo.IsAvailable()) { - for (ChunkServerIdType id : csInfo.GetCopySetMembers()) { - ChunkServer chunkserverInfo; - if (true != topology_->GetChunkServer(id, &chunkserverInfo)) { - continue; - } - std::string address = chunkserverInfo.GetHostIp() + ":" + std::to_string(chunkserverInfo.GetPort()); - if (csMap.find(address) != csMap.end()) { - csMap[address].push_back(csInfo); - } else { - std::vector temp; - temp.push_back(csInfo); - csMap[address] = temp; - } - } - } - } - } - - // 删除braft状态error的copyset node - for(const auto &iter : csMap) { - brpc::Controller cntl; - cntl.set_timeout_ms(500); - brpc::Channel channel; - if (channel.Init(iter.first.c_str(), nullptr) !=0) { - LOG(WARNING) << "can not create channel to " - << iter.first.c_str(); - response->set_statuscode(kTopoErrCodeInternalError); - return; - } - - CopysetService_Stub stub(&channel); - - CopysetRequest2 copysetRequest; - CopysetResponse2 copysetResponse; - for (auto &cs : iter.second) { - ::curve::chunkserver::Copyset *copyset = copysetRequest.add_copysets(); - copyset->set_logicpoolid(cs.GetLogicalPoolId()); - copyset->set_copysetid(cs.GetId()); - } - - stub.DeleteBrokenCopysetNode(&cntl, - ©setRequest, - ©setResponse, - nullptr); - LOG(INFO) << "Send DeleteBrokenCopysetNode[log_id=" << cntl.log_id() - << "] from " << cntl.local_side() - << " to " << cntl.remote_side() - << ". [CopysetRequest] : " - << " copysetRequest.copysets_size() = " - << copysetRequest.copysets_size(); - - if (cntl.Failed()) { - LOG(ERROR) << "Send DeleteBrokenCopysetNode failed, " - << "cntl.errorText = " - << cntl.ErrorText() << std::endl; - response->set_statuscode(kTopoErrCodeInternalError); - return; - } - } - - for (const CopySetKey& copyset : copysets) { - CopySetInfo csInfo; - if (topology_->GetCopySet(copyset, &csInfo)) { - if (!csInfo.IsAvailable()) { - // 删除copyset node peer,更新topo,等待replicaScheduler重新创建peer - std::set peers = csInfo.GetCopySetMembers(); - peers.erase(request->chunkserverid()); - topology_->UpdateCopySetTopo(csInfo); - // 设置copyset node availflag true - int res = topology_->SetCopySetAvalFlag(copyset, true); - if (res != kTopoErrCodeSuccess) { - LOG(ERROR) << "Topology set copyset aval flag fail"; - response->set_statuscode(kTopoErrCodeInternalError); - return; - } - } - } - } - response->set_statuscode(kTopoErrCodeSuccess); -} - - - } // namespace topology } // namespace mds } // namespace curve diff --git a/src/mds/topology/topology_service_manager.h b/src/mds/topology/topology_service_manager.h index 799f9368db..bcbfc98f20 100644 --- a/src/mds/topology/topology_service_manager.h +++ b/src/mds/topology/topology_service_manager.h @@ -202,10 +202,6 @@ class TopologyServiceManager { const ListChunkFormatStatusRequest* request, ListChunkFormatStatusResponse* response); - virtual void DeleteBrokenCopysetInChunkServer( - const DeleteBrokenCopysetInChunkServerRequest* request, - DeleteBrokenCopysetInChunkServerResponse* response); - private: /** * @brief create copyset for logical pool diff --git a/test/chunkserver/clone/op_request_test.cpp b/test/chunkserver/clone/op_request_test.cpp index e1ece23380..6746594097 100644 --- a/test/chunkserver/clone/op_request_test.cpp +++ b/test/chunkserver/clone/op_request_test.cpp @@ -63,11 +63,9 @@ class OpRequestTest datastore_ = std::make_shared(); cloneMgr_ = std::make_shared(); concurrentApplyModule_ = std::make_shared(); - iter_ = std::make_shared(); ASSERT_TRUE(concurrentApplyModule_->Init(10, 10)); FakeCopysetNode(); FakeCloneManager(); - FakeIteratorWrapper(); } void TearDown() { } @@ -93,13 +91,6 @@ class OpRequestTest .WillRepeatedly(Return(true)); } - void FakeIteratorWrapper() { - EXPECT_CALL(*iter_, set_error_and_rollback(_,_)) - .WillRepeatedly(Return()); - EXPECT_CALL(*iter_, has_error()) - .WillRepeatedly(Return(false)); - } - protected: ChunkSizeType chunksize_; ChunkSizeType blocksize_; @@ -109,7 +100,6 @@ class OpRequestTest std::shared_ptr datastore_; std::shared_ptr cloneMgr_; std::shared_ptr concurrentApplyModule_; - std::shared_ptr iter_; }; TEST_P(OpRequestTest, CreateCloneTest) { @@ -225,7 +215,7 @@ TEST_P(OpRequestTest, CreateCloneTest) { EXPECT_CALL(*datastore_, CreateCloneChunk(_, _, _, _, _)) .WillOnce(Return(CSErrorCode::Success)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // check result ASSERT_TRUE(closure->isDone_); @@ -237,7 +227,7 @@ TEST_P(OpRequestTest, CreateCloneTest) { /** * test OnApply * case:CreateCloneChunk failed - * expect:不返回成功 + * expect:process exit */ { // reset closure @@ -246,9 +236,8 @@ TEST_P(OpRequestTest, CreateCloneTest) { // check result EXPECT_CALL(*datastore_, CreateCloneChunk(_, _, _, _, _)) .WillRepeatedly(Return(CSErrorCode::InternalError)); - opReq->OnApply(3, closure, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - closure->response_->status()); + + ASSERT_DEATH(opReq->OnApply(3, closure), ""); } /** * test OnApply @@ -265,7 +254,7 @@ TEST_P(OpRequestTest, CreateCloneTest) { EXPECT_CALL(*node_, UpdateAppliedIndex(_)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // check result ASSERT_TRUE(closure->isDone_); @@ -288,12 +277,12 @@ TEST_P(OpRequestTest, CreateCloneTest) { .WillOnce(Return(CSErrorCode::Success)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } /** * 测试 OnApplyFromLog * 用例:CreateCloneChunk失败,返回InternalError - * 预期:无返回 + * 预期:进程退出 */ { // 重置closure @@ -304,12 +293,12 @@ TEST_P(OpRequestTest, CreateCloneTest) { .WillRepeatedly(Return(CSErrorCode::InternalError)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + ASSERT_DEATH(opReq->OnApplyFromLog(datastore_, *request, data), ""); } /** * 测试 OnApplyFromLog * 用例:CreateCloneChunk失败,返回其他错误 - * 预期:无返回 + * 预期:进程退出 */ { // 重置closure @@ -320,7 +309,7 @@ TEST_P(OpRequestTest, CreateCloneTest) { .WillRepeatedly(Return(CSErrorCode::InvalidArgError)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } // 释放资源 closure->Release(); @@ -450,7 +439,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { EXPECT_CALL(*datastore_, PasteChunk(_, _, _, _)) .WillOnce(Return(CSErrorCode::Success)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -462,7 +451,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { /** * 测试OnApply * 用例:CreateCloneChunk失败,返回InternalError - * 预期:不返回成功 + * 预期:进程退出 */ { // 重置closure @@ -472,9 +461,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { EXPECT_CALL(*datastore_, PasteChunk(_, _, _, _)) .WillRepeatedly(Return(CSErrorCode::InternalError)); - opReq->OnApply(3, closure, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - closure->response_->status()); + ASSERT_DEATH(opReq->OnApply(3, closure), ""); } /** * 测试OnApply @@ -489,7 +476,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { EXPECT_CALL(*datastore_, PasteChunk(_, _, _, _)) .WillRepeatedly(Return(CSErrorCode::InvalidArgError)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -512,12 +499,12 @@ TEST_P(OpRequestTest, PasteChunkTest) { .WillOnce(Return(CSErrorCode::Success)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } /** * 测试 OnApplyFromLog * 用例:CreateCloneChunk失败,返回InternalError - * 预期:无返回 + * 预期:进程退出 */ { // 重置closure @@ -528,12 +515,12 @@ TEST_P(OpRequestTest, PasteChunkTest) { .WillRepeatedly(Return(CSErrorCode::InternalError)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + ASSERT_DEATH(opReq->OnApplyFromLog(datastore_, *request, data), ""); } /** * 测试 OnApplyFromLog * 用例:CreateCloneChunk失败,返回其他错误 - * 预期:无返回 + * 预期:进程退出 */ { // 重置closure @@ -544,7 +531,7 @@ TEST_P(OpRequestTest, PasteChunkTest) { .WillRepeatedly(Return(CSErrorCode::InvalidArgError)); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } // 释放资源 closure->Release(); @@ -808,7 +795,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { .WillOnce(DoAll(SetArrayArgument<2>(chunkData, chunkData + length), Return(CSErrorCode::Success))); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -845,7 +832,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { .WillOnce(DoAll(SetArrayArgument<2>(chunkData, chunkData + length), Return(CSErrorCode::Success))); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -885,7 +872,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(true)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_FALSE(closure->isDone_); @@ -910,7 +897,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { // 不会读chunk文件 EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -941,7 +928,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(true)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_FALSE(closure->isDone_); @@ -977,7 +964,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { chunkData + length), Return(CSErrorCode::Success))); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1006,7 +993,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1017,7 +1004,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { /** * 测试OnApply * 用例:读本地chunk的时候返回错误 - * 预期:不返回成功 + * 预期:请求失败,返回 CHUNK_OP_STATUS_FAILURE_UNKNOWN */ { // 重置closure @@ -1032,9 +1019,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .WillRepeatedly(Return(CSErrorCode::InternalError)); - opReq->OnApply(3, closure, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - closure->response_->status()); + ASSERT_DEATH(opReq->OnApply(3, closure), ""); } /** * 测试OnApply @@ -1060,7 +1045,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(false)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1077,7 +1062,7 @@ TEST_P(OpRequestTest, ReadChunkTest) { closure->Reset(); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } // 释放资源 closure->Release(); @@ -1225,7 +1210,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1253,7 +1238,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1285,7 +1270,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(true)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_FALSE(closure->isDone_); @@ -1311,7 +1296,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1335,7 +1320,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*datastore_, ReadChunk(_, _, _, offset, length)) .Times(0); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1367,7 +1352,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { EXPECT_CALL(*cloneMgr_, IssueCloneTask(_)) .WillOnce(Return(false)); - opReq->OnApply(3, closure, iter_); + opReq->OnApply(3, closure); // 验证结果 ASSERT_TRUE(closure->isDone_); @@ -1384,7 +1369,7 @@ TEST_P(OpRequestTest, RecoverChunkTest) { closure->Reset(); butil::IOBuf data; - opReq->OnApplyFromLog(datastore_, *request, data, iter_); + opReq->OnApplyFromLog(datastore_, *request, data); } // 释放资源 closure->Release(); diff --git a/test/chunkserver/copyset_node_test.cpp b/test/chunkserver/copyset_node_test.cpp index b894862882..46ed6a4fdb 100644 --- a/test/chunkserver/copyset_node_test.cpp +++ b/test/chunkserver/copyset_node_test.cpp @@ -492,14 +492,14 @@ TEST_F(CopysetNodeTest, error_test) { LOG(INFO) << "OK"; } /* on_error */ - // { - // LogicPoolID logicPoolID = 123; - // CopysetID copysetID = 1345; - // Configuration conf; - // CopysetNode copysetNode(logicPoolID, copysetID, conf); - // braft::Error error; - // ASSERT_DEATH(copysetNode.on_error(error), ".*raft error.*"); - // } + { + LogicPoolID logicPoolID = 123; + CopysetID copysetID = 1345; + Configuration conf; + CopysetNode copysetNode(logicPoolID, copysetID, conf); + braft::Error error; + ASSERT_DEATH(copysetNode.on_error(error), ".*raft error.*"); + } /* Fini, raftNode is null */ { LogicPoolID logicPoolID = 123; diff --git a/test/chunkserver/copyset_service_test.cpp b/test/chunkserver/copyset_service_test.cpp index 18c7f31b1a..973529366b 100644 --- a/test/chunkserver/copyset_service_test.cpp +++ b/test/chunkserver/copyset_service_test.cpp @@ -223,50 +223,6 @@ TEST_F(CopysetServiceTest, basic) { ASSERT_EQ(response.status(), COPYSET_OP_STATUS_SUCCESS); } - /* 测试创建一个新的 copyset */ - { - brpc::Controller cntl; - cntl.set_timeout_ms(3000); - - CopysetRequest request; - CopysetResponse response; - request.set_logicpoolid(logicPoolId); - request.set_copysetid(copysetId); - request.add_peerid("127.0.0.1:9040:0"); - request.add_peerid("127.0.0.1:9041:0"); - request.add_peerid("127.0.0.1:9042:0"); - stub.CreateCopysetNode(&cntl, &request, &response, nullptr); - if (cntl.Failed()) { - std::cout << cntl.ErrorText() << std::endl; - } - ASSERT_EQ(response.status(), - COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS); - } - // TEST CASES: DeleteBrokenCopysetNode - { - brpc::Controller cntl; - CopysetStatusRequest statusReq; - CopysetStatusResponse statusResp; - CopysetRequest2 request; - CopysetResponse2 response; - Copyset *copyset; - copyset = request.add_copysets(); - copyset->set_logicpoolid(logicPoolId); - copyset->set_copysetid(copysetId); - Peer *peer1 = copyset->add_peers(); - peer1->set_address("127.0.0.1:9040:0"); - Peer *peer2 = copyset->add_peers(); - peer2->set_address("127.0.0.1:9041:0"); - Peer *peer3 = copyset->add_peers(); - peer3->set_address("127.0.0.1:9042:0"); - stub.DeleteBrokenCopysetNode(&cntl, &request, &response, nullptr); - if (cntl.Failed()) { - std::cout << cntl.ErrorText() << std::endl; - } - ASSERT_TRUE(copysetNodeManager->DeleteCopysetNode(logicPoolId, copysetId)); - ASSERT_EQ(response.status(), COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS); - } - ASSERT_EQ(0, server.Stop(0)); ASSERT_EQ(0, server.Join()); } diff --git a/test/chunkserver/mock_copyset_node.h b/test/chunkserver/mock_copyset_node.h index 0886765387..30676eb4da 100644 --- a/test/chunkserver/mock_copyset_node.h +++ b/test/chunkserver/mock_copyset_node.h @@ -76,14 +76,6 @@ class MockCopysetNode : public CopysetNode { MOCK_METHOD1(on_start_following, void(const ::braft::LeaderChangeContext&)); }; -class MockIteratorWrapper : public IteratorWrapper { - public: - MockIteratorWrapper() = default; - ~MockIteratorWrapper() = default; - MOCK_METHOD2(set_error_and_rollback, void(size_t, const butil::Status*)); - MOCK_CONST_METHOD0(has_error, bool()); -}; - } // namespace chunkserver } // namespace curve diff --git a/test/chunkserver/op_request_test.cpp b/test/chunkserver/op_request_test.cpp index cf3ad8c65f..20a4181444 100644 --- a/test/chunkserver/op_request_test.cpp +++ b/test/chunkserver/op_request_test.cpp @@ -21,7 +21,6 @@ */ #include -#include #include #include #include @@ -34,14 +33,10 @@ #include "src/chunkserver/copyset_node_manager.h" #include "src/chunkserver/op_request.h" #include "test/chunkserver/fake_datastore.h" -#include "test/chunkserver/mock_copyset_node.h" namespace curve { namespace chunkserver { -using ::testing::Return; -using ::testing::_; - using ::google::protobuf::io::ZeroCopyOutputStream; class OpFakeClosure : public Closure { @@ -50,27 +45,7 @@ class OpFakeClosure : public Closure { ~OpFakeClosure() {} }; -class ChunkOpRequestTest: public testing::Test -{ -protected: - virtual void SetUp() - { - iter_ = std::make_shared(); - FakeIterator(); - } - virtual void TearDown() - { - - } - void FakeIterator() { - EXPECT_CALL(*iter_, set_error_and_rollback(_,_)) - .WillRepeatedly(Return()); - } -protected: - std::shared_ptr iter_; -}; - -TEST_F(ChunkOpRequestTest, encode) { +TEST(ChunkOpRequestTest, encode) { LogicPoolID logicPoolId = 1; CopysetID copysetId = 10001; uint64_t chunkId = 12345; @@ -393,7 +368,7 @@ TEST_F(ChunkOpRequestTest, encode) { } } -TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { +TEST(ChunkOpRequestTest, OnApplyErrorTest) { LogicPoolID logicPoolId = 1; CopysetID copysetId = 10001; uint64_t chunkId = 12345; @@ -440,7 +415,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN, @@ -469,7 +444,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::ChunkNotExistError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_NOTEXIST, @@ -496,9 +471,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; } @@ -521,7 +494,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::ChunkNotExistError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_NOTEXIST, @@ -546,9 +519,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; } @@ -569,9 +540,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; } @@ -592,7 +561,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::BackwardRequestError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD, @@ -619,9 +588,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; } @@ -644,7 +611,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::BackwardRequestError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_BACKWARD, @@ -671,7 +638,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::InvalidArgError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN, @@ -700,9 +667,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); - ASSERT_NE(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS, - response.status()); + ASSERT_DEATH(opReq->OnApply(appliedIndex, &done), ""); delete opReq; delete cntl; delete scanManager; @@ -728,7 +693,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::FileFormatError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN, @@ -758,7 +723,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { nullptr); dataStore->InjectError(CSErrorCode::ChunkNotExistError); OpFakeClosure done; - opReq->OnApply(appliedIndex, &done, iter_); + opReq->OnApply(appliedIndex, &done); ASSERT_FALSE(cntl->Failed()); ASSERT_EQ(0, cntl->ErrorCode()); ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_NOTEXIST, @@ -769,7 +734,7 @@ TEST_F(ChunkOpRequestTest, OnApplyErrorTest) { } } -TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { +TEST(ChunkOpRequestTest, OnApplyFromLogTest) { LogicPoolID logicPoolId = 1; CopysetID copysetId = 10001; uint64_t sn = 1; @@ -802,7 +767,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_READ); butil::IOBuf data; ReadChunkRequest req; - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } // read snapshot @@ -819,7 +784,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_READ_SNAP); butil::IOBuf data; ReadSnapshotRequest req; - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } // delete @@ -834,7 +799,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_DELETE); butil::IOBuf data; DeleteChunkRequest req; - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } // delete snapshot @@ -849,7 +814,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_optype(CHUNK_OP_TYPE::CHUNK_OP_DELETE_SNAP); butil::IOBuf data; DeleteSnapshotRequest req; - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } // scan @@ -864,7 +829,7 @@ TEST_F(ChunkOpRequestTest, OnApplyFromLogTest) { request.set_clonefileoffset(followScanRpcTimeoutMs); butil::IOBuf data; ScanChunkRequest req(1, PeerId("127.0.0.1:9010:0")); - req.OnApplyFromLog(dataStore, request, data, iter_); + req.OnApplyFromLog(dataStore, request, data); ASSERT_FALSE(dataStore->HasInjectError()); } } diff --git a/test/mds/topology/mock_topology.h b/test/mds/topology/mock_topology.h index dfff9c3d92..90afab62e1 100644 --- a/test/mds/topology/mock_topology.h +++ b/test/mds/topology/mock_topology.h @@ -322,10 +322,6 @@ class MockTopologyServiceManager : public TopologyServiceManager { MOCK_METHOD2(ListUnAvailCopySets, void(const ListUnAvailCopySetsRequest*, ListUnAvailCopySetsResponse*)); - - MOCK_METHOD2(DeleteBrokenCopysetInChunkServer, - void(const DeleteBrokenCopysetInChunkServerRequest*, - DeleteBrokenCopysetInChunkServerResponse*)); }; class MockTopologyServiceImpl : public TopologyService { @@ -368,11 +364,6 @@ class MockCopysetServiceImpl : public CopysetService { const ::curve::chunkserver::CopysetRequest2 *request, ::curve::chunkserver::CopysetResponse2 *response, google::protobuf::Closure *done)); - MOCK_METHOD4(DeleteBrokenCopysetNode, - void(::google::protobuf::RpcController *controller, - const ::curve::chunkserver::CopysetRequest2 *request, - ::curve::chunkserver::CopysetResponse2 *response, - google::protobuf::Closure *done)); }; } // namespace chunkserver diff --git a/test/mds/topology/test_topology_service_manager.cpp b/test/mds/topology/test_topology_service_manager.cpp index eab5570e9e..efc96de534 100644 --- a/test/mds/topology/test_topology_service_manager.cpp +++ b/test/mds/topology/test_topology_service_manager.cpp @@ -230,13 +230,12 @@ class TestTopologyServiceManager : public ::testing::Test { const std::set &members, bool scaning = false, LastScanSecType lastScanSec = 0, - bool lastScanConsistent = true, bool avail = true) { + bool lastScanConsistent = true) { CopySetInfo copysetInfo(logicalPoolId, copysetId); copysetInfo.SetCopySetMembers(members); copysetInfo.SetScaning(scaning); copysetInfo.SetLastScanSec(lastScanSec); copysetInfo.SetLastScanConsistent(lastScanConsistent); - copysetInfo.SetAvailableFlag(avail); EXPECT_CALL(*storage_, StorageCopySet(_)) .WillOnce(Return(true)); @@ -3674,55 +3673,6 @@ TEST_F(TestTopologyServiceManager, test_ListUnAvailCopySets) { } } -TEST_F(TestTopologyServiceManager, - test_DeleteBrokenCopysetInChunkServer_success) { - CopySetIdType copysetId = 0x51; - PrepareAddPoolset(); - PoolIdType logicalPoolId = 0x01; - PoolIdType physicalPoolId = 0x11; - PrepareAddPhysicalPool(physicalPoolId, "pPool1"); - PrepareAddZone(0x21, "zone1", physicalPoolId); - PrepareAddZone(0x22, "zone2", physicalPoolId); - PrepareAddZone(0x23, "zone3", physicalPoolId); - PrepareAddServer(0x31, "server1", "127.0.0.1", "127.0.0.1", 0x21, 0x11); - PrepareAddServer(0x32, "server2", "127.0.0.1", "127.0.0.1", 0x22, 0x11); - PrepareAddServer(0x33, "server3", "127.0.0.1", "127.0.0.1", 0x23, 0x11); - uint32_t port = listenAddr_.port; - PrepareAddChunkServer( - 0x41, "token1", "nvme", 0x31, "127.0.0.1", "127.0.0.1", port); - PrepareAddChunkServer( - 0x42, "token2", "nvme", 0x32, "127.0.0.1", "127.0.0.1", port); - PrepareAddChunkServer( - 0x43, "token3", "nvme", 0x33, "127.0.0.1", "127.0.0.1", port); - PrepareAddLogicalPool(logicalPoolId, "logicalPool1", physicalPoolId); - std::set replicas; - replicas.insert(0x41); - replicas.insert(0x42); - replicas.insert(0x43); - PrepareAddCopySet(copysetId, logicalPoolId, replicas, false, 0, true, false); - - CopysetResponse2 chunkserverResponse; - chunkserverResponse.set_status( - COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS); - EXPECT_CALL(*mockCopySetService, DeleteBrokenCopysetNode(_, _, _, _)) - .WillRepeatedly(DoAll(SetArgPointee<2>(chunkserverResponse), - Invoke(CreateCopysetNodeFunc))); - EXPECT_CALL(*storage_, UpdateCopySet(_)) - .WillRepeatedly(Return(true)); - DeleteBrokenCopysetInChunkServerRequest request; - request.set_chunkserverid(0x41); - DeleteBrokenCopysetInChunkServerResponse response; - serviceManager_->DeleteBrokenCopysetInChunkServer(&request, &response); - - ASSERT_EQ(kTopoErrCodeSuccess, response.statuscode()); - - ListUnAvailCopySetsRequest request2; - ListUnAvailCopySetsResponse response2; - serviceManager_->ListUnAvailCopySets(&request2, &response2); - ASSERT_EQ(kTopoErrCodeSuccess, response2.statuscode()); - ASSERT_EQ(0, response2.copysets_size()); -} - } // namespace topology } // namespace mds } // namespace curve diff --git a/thirdparties/braft/add-iterator-has_error.patch b/thirdparties/braft/add-iterator-has_error.patch deleted file mode 100644 index 2b4cffd0f6..0000000000 --- a/thirdparties/braft/add-iterator-has_error.patch +++ /dev/null @@ -1,28 +0,0 @@ -diff --git a/src/braft/raft.cpp b/src/braft/raft.cpp -index 6069f70..11adf9a 100644 ---- a/src/braft/raft.cpp -+++ b/src/braft/raft.cpp -@@ -253,6 +253,10 @@ bool Iterator::valid() const { - return _impl->is_good() && _impl->entry()->type == ENTRY_TYPE_DATA; - } - -+bool Iterator::has_error() const { -+ return _impl->has_error(); -+} -+ - int64_t Iterator::index() const { return _impl->index(); } - - int64_t Iterator::term() const { return _impl->entry()->id.term; } -diff --git a/src/braft/raft.h b/src/braft/raft.h -index e4a0f19..83cd353 100644 ---- a/src/braft/raft.h -+++ b/src/braft/raft.h -@@ -178,6 +178,8 @@ public: - // If |st| is not NULL, it should describe the detail of the error. - void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL); - -+ bool has_error() const; -+ - private: - friend class FSMCaller; - Iterator(IteratorImpl* impl) : _impl(impl) {} \ No newline at end of file diff --git a/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go b/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go deleted file mode 100644 index 70c06cc2b6..0000000000 --- a/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go +++ /dev/null @@ -1,102 +0,0 @@ -package copyset - -import ( - "context" - cmderror "github.com/opencurve/curve/tools-v2/internal/error" - basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" - "github.com/opencurve/curve/tools-v2/pkg/config" - "github.com/opencurve/curve/tools-v2/pkg/output" - "github.com/opencurve/curve/tools-v2/proto/proto/topology" - "github.com/spf13/cobra" - "google.golang.org/grpc" - "strconv" -) - -const ( - deleteBrokenCopySetExample = `$ curve bs delete broken-copyset --chunkserverid=1` -) - -type DeleteBrokenCopySetInChunkServerRpc struct { - Info *basecmd.Rpc - Request *topology.DeleteBrokenCopysetInChunkServerRequest - topologyClient topology.TopologyServiceClient -} - -var _ basecmd.RpcFunc = (*DeleteBrokenCopySetInChunkServerRpc)(nil) // check interface - -type DeleteBrokenCopySetCommand struct { - basecmd.FinalCurveCmd - Rpc *DeleteBrokenCopySetInChunkServerRpc - Servers []*topology.ServerInfo - chunkserverid uint32 -} - -func (d *DeleteBrokenCopySetInChunkServerRpc) Stub_Func(ctx context.Context) (interface{}, error) { - return d.topologyClient.DeleteBrokenCopysetInChunkServer(ctx, d.Request) -} - -func NewDeleteCommand() *cobra.Command { - return NewDeleteBrokenCopySetCommand().Cmd -} - -func NewDeleteBrokenCopySetCommand() *DeleteBrokenCopySetCommand { - cmd := &DeleteBrokenCopySetCommand{ - FinalCurveCmd: basecmd.FinalCurveCmd{ - Use: "broken-copyset", - Short: "delete broken copyset in chunkserver", - Example: deleteBrokenCopySetExample, - }, - } - - basecmd.NewFinalCurveCli(&cmd.FinalCurveCmd, cmd) - return cmd -} - -func (d *DeleteBrokenCopySetCommand) AddFlags() { - config.AddBsMdsFlagOption(d.Cmd) - config.AddRpcRetryTimesFlag(d.Cmd) - config.AddRpcTimeoutFlag(d.Cmd) - config.AddBsChunkServerIdFlag(d.Cmd) -} - -func (d *DeleteBrokenCopySetInChunkServerRpc) NewRpcClient(cc grpc.ClientConnInterface) { - d.topologyClient = topology.NewTopologyServiceClient(cc) -} - -func (d *DeleteBrokenCopySetCommand) Init(cmd *cobra.Command, args []string) error { - mdsAddrs, err := config.GetBsMdsAddrSlice(d.Cmd) - if err.TypeCode() != cmderror.CODE_SUCCESS { - return err.ToError() - } - timeout := config.GetFlagDuration(d.Cmd, config.RPCTIMEOUT) - retrytimes := config.GetFlagInt32(d.Cmd, config.RPCRETRYTIMES) - strid, e := strconv.Atoi(config.GetBsFlagString(d.Cmd, config.CURVEBS_CHUNKSERVER_ID)) - if e != nil { - return e - } - d.chunkserverid = uint32(strid) - d.Rpc = &DeleteBrokenCopySetInChunkServerRpc{ - Info: basecmd.NewRpc(mdsAddrs, timeout, retrytimes, "DeleteBrokenCopysetInChunkServer"), - Request: &topology.DeleteBrokenCopysetInChunkServerRequest{ - ChunkServerID: &d.chunkserverid, - }, - } - return nil -} - -func (d *DeleteBrokenCopySetCommand) RunCommand(cmd *cobra.Command, args []string) error { - result, errCmd := basecmd.GetRpcResponse(d.Rpc.Info, d.Rpc) - if errCmd.TypeCode() != cmderror.CODE_SUCCESS { - return errCmd.ToError() - } - d.Result = result - return nil -} - -func (d *DeleteBrokenCopySetCommand) Print(cmd *cobra.Command, args []string) error { - return output.FinalCmdOutput(&d.FinalCurveCmd, d) -} - -func (d *DeleteBrokenCopySetCommand) ResultPlainOutput() error { - return output.FinalCmdOutputPlain(&d.FinalCurveCmd) -} diff --git a/tools-v2/pkg/cli/command/curvebs/delete/delete.go b/tools-v2/pkg/cli/command/curvebs/delete/delete.go index 83c2b1e57a..fe57b58ef0 100644 --- a/tools-v2/pkg/cli/command/curvebs/delete/delete.go +++ b/tools-v2/pkg/cli/command/curvebs/delete/delete.go @@ -7,7 +7,6 @@ package delete import ( - "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/delete/copyset" "github.com/spf13/cobra" basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" @@ -27,7 +26,6 @@ func (dCmd *DeleteCommand) AddSubCommands() { file.NewFileCommand(), peer.NewCommand(), volume.NewVolumeCommand(), - copyset.NewDeleteCommand(), ) }