From 1f3dbd6c80184589cf79d6955b0aebfe01289e1d Mon Sep 17 00:00:00 2001 From: YLShi Date: Mon, 14 Nov 2022 18:54:26 +0800 Subject: [PATCH] add qos to fuse --- curvefs/src/client/fuse_s3_client.cpp | 462 +++++++++++++++++++--- curvefs/src/client/fuse_s3_client.h | 75 +++- curvefs/src/client/fuse_volume_client.cpp | 416 +++++++++---------- curvefs/src/client/fuse_volume_client.h | 68 ++-- 4 files changed, 711 insertions(+), 310 deletions(-) diff --git a/curvefs/src/client/fuse_s3_client.cpp b/curvefs/src/client/fuse_s3_client.cpp index bad7f5f549..a86afd1d40 100644 --- a/curvefs/src/client/fuse_s3_client.cpp +++ b/curvefs/src/client/fuse_s3_client.cpp @@ -23,26 +23,324 @@ #include "curvefs/src/client/fuse_s3_client.h" #include -#include +#include +namespace curvefs { +namespace client { +namespace common { +DECLARE_bool(enableCto); +} // namespace common +} // namespace client +} // namespace curvefs namespace curvefs { namespace client { CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) { - CURVEFS_ERROR ret = FuseClient::Init(option); + FuseClientOption opt(option); + CURVEFS_ERROR ret = FuseClient::Init(opt); if (ret != CURVEFS_ERROR::OK) { return ret; } - s3Client_ = std::make_shared(); - s3Client_->Init(option.s3Opt.s3AdaptrOpt); - ret = s3Adaptor_->Init(option.s3Opt.s3ClientAdaptorOpt, s3Client_.get(), - inodeManager_, mdsClient_); + + // set fsS3Option + const auto& s3Info = fsInfo_->detail().s3info(); + ::curve::common::S3InfoOption fsS3Option; + ::curvefs::client::common::S3Info2FsS3Option(s3Info, &fsS3Option); + SetFuseClientS3Option(&opt, fsS3Option); + + auto s3Client = std::make_shared(); + s3Client->Init(opt.s3Opt.s3AdaptrOpt); + auto fsCacheManager = std::make_shared( + dynamic_cast(s3Adaptor_.get()), + opt.s3Opt.s3ClientAdaptorOpt.readCacheMaxByte, + opt.s3Opt.s3ClientAdaptorOpt.writeCacheMaxByte); + if (opt.s3Opt.s3ClientAdaptorOpt.diskCacheOpt.diskCacheType != + DiskCacheType::Disable) { + auto s3DiskCacheClient = std::make_shared(); + s3DiskCacheClient->Init(opt.s3Opt.s3AdaptrOpt); + auto wrapper = std::make_shared(); + auto diskCacheRead = std::make_shared(); + auto diskCacheWrite = std::make_shared(); + auto diskCacheManager = std::make_shared( + wrapper, diskCacheWrite, diskCacheRead); + auto diskCacheManagerImpl = std::make_shared( + diskCacheManager, s3DiskCacheClient); + ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client, + inodeManager_, mdsClient_, fsCacheManager, + diskCacheManagerImpl, true); + } else { + ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client, + inodeManager_, mdsClient_, fsCacheManager, + nullptr, true); + } + + bgFetchStop_.store(false, std::memory_order_release); + bgFetchThread_ = Thread(&FuseS3Client::BackGroundFetch, this); + GetTaskFetchPool(); + InitQosParam(); return ret; } +void FuseS3Client::GetWarmUpFileList(const WarmUpFileContext_t&warmUpFile, + std::vector& warmUpFilelist) { + struct fuse_file_info fi{}; + fi.flags &= ~O_DIRECT; + size_t rSize = 0; + std::unique_ptr data(new char[warmUpFile.fileLen+1]); + std::memset(data.get(), 0, warmUpFile.fileLen); + data[warmUpFile.fileLen] = '\n'; + FuseOpRead(nullptr, warmUpFile.inode, + warmUpFile.fileLen, 0, &fi, data.get(), &rSize); + std::string file = data.get(); + VLOG(9) << "file is: " << file; + // remove enter, newline, blank + std::string blanks("\r\n "); + file.erase(0, file.find_first_not_of(blanks)); + file.erase(file.find_last_not_of(blanks) + 1); + VLOG(9) << "after del file is: " << file; + splitStr(file, "\n", &warmUpFilelist); +} + +void FuseS3Client::BackGroundFetch() { + while (!bgFetchStop_.load(std::memory_order_acquire)) { + LOG_EVERY_N(WARNING, 100) + << "fetch thread start."; + if (hasWarmUpTask()) { // new warmup task + WarmUpFileContext_t warmUpFile; + GetWarmUpFile(&warmUpFile); + VLOG(9) << " len is: " << warmUpFile.fileLen + << "ino is: " << warmUpFile.inode; + + std::vector warmUpFilelist; + GetWarmUpFileList(warmUpFile, warmUpFilelist); + for (auto filePath : warmUpFilelist) { + FetchDentryEnqueue(filePath); + } + } + { // file need warmup + std::list readAheadFiles; + readAheadFiles.swap(GetReadAheadFiles()); + for (auto iter : readAheadFiles) { + VLOG(9) << "BackGroundFetch: " << iter; + fetchDataEnqueue(iter); + } + } + LOG_EVERY_N(WARNING, 100) + << "fetch thread end."; + usleep(WARMUP_CHECKINTERVAL_US); + } + return; +} + +void FuseS3Client::InitQosParam() { + ReadWriteThrottleParams params; + params.iopsWrite = ThrottleParams(FLAGS_avgFlushIops, 0, 0); + params.bpsWrite = ThrottleParams(FLAGS_avgFlushBytes, FLAGS_burstFlushBytes, + FLAGS_burstSecs); + params.iopsRead = ThrottleParams(FLAGS_avgReadFileIops, 0, 0); + params.bpsRead = ThrottleParams(FLAGS_avgReadFileBytes, 0, 0); + + fuseS3Throttle_.UpdateThrottleParams(params); +} + +void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) { + VLOG(9) << "fetchDataEnqueue start: " << ino; + auto task = [this, ino]() { + std::shared_ptr inodeWrapper; + CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "inodeManager get inode fail, ret = " << ret + << ", inodeid = " << ino; + return; + } + google::protobuf::Map *s3ChunkInfoMap + = nullptr; + { + ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); + s3ChunkInfoMap = inodeWrapper->GetChunkInfoMap(); + } + if (nullptr == s3ChunkInfoMap || + s3ChunkInfoMap->empty()) { + return; + } + travelChunks(ino, s3ChunkInfoMap); + }; + GetTaskFetchPool().Enqueue(task); +} + +// travel and download all objs belong to the chunk +void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, + std::list>* prefetchObjs) { + uint64_t blockSize = s3Adaptor_->GetBlockSize(); + uint64_t chunkSize = s3Adaptor_->GetChunkSize(); + uint64_t offset, len, chunkid, compaction; + for (size_t i = 0; i < chunkInfo.s3chunks_size(); i++) { + auto chunkinfo = chunkInfo.mutable_s3chunks(i); + auto fsId = fsInfo_->fsid(); + chunkid = chunkinfo->chunkid(); + compaction = chunkinfo->compaction(); + offset = chunkinfo->offset(); + len = chunkinfo->len(); + // the offset in the chunk + uint64_t chunkPos = offset % chunkSize; + // the offset in the block + uint64_t blockPos = chunkPos % blockSize; + // the first blockIndex + uint64_t blockIndexBegin = chunkPos / blockSize; + + if (len < blockSize) { // just one block + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexBegin, compaction, fsId, ino); + prefetchObjs->push_back(std::make_pair(objectName, len)); + } else { + // the offset in the block + uint64_t blockPos = chunkPos % blockSize; + + // firstly, let's get the size in the first block + // then, subtract the length in the first block + // to obtain the remaining length + // lastly, We need to judge the last block is full or not + uint64_t firstBlockSize = (blockPos != 0) ? + blockSize - blockPos : blockSize; + uint64_t leftSize = len - firstBlockSize; + uint32_t blockCounts = (leftSize % blockSize == 0) ? + (leftSize / blockSize + 1) : (leftSize / blockSize + 1 + 1); + // so we can get the last blockIndex + // because the bolck Index is cumulative + uint64_t blockIndexEnd = blockIndexBegin + blockCounts - 1; + + // the size of the last block + uint64_t lastBlockSize = leftSize % blockSize; + // whether the first block or the last block is full or not + bool firstBlockFull = (blockPos == 0) ? true : false; + bool lastBlockFull = (lastBlockSize == 0) ? true : false; + // the start and end block Index that need travel + uint64_t travelStartIndex, travelEndIndex; + // if the block is full, the size is needed download + // of the obj is blockSize. Otherwise, the value is special. + if (!firstBlockFull) { + travelStartIndex = blockIndexBegin + 1; + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexBegin, compaction, fsId, ino); + prefetchObjs->push_back(std::make_pair( + objectName, firstBlockSize)); + } else { + travelStartIndex = blockIndexBegin; + } + if (!lastBlockFull) { + // block index is greater than or equal to 0 + travelEndIndex = (blockIndexEnd == blockIndexBegin) ? + blockIndexEnd : blockIndexEnd - 1; + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexEnd, compaction, fsId, ino); + // there is no need to care about the order + // in which objects are downloaded + prefetchObjs->push_back( + std::make_pair(objectName, lastBlockSize)); + } else { + travelEndIndex = blockIndexEnd; + } + VLOG(9) << "travel obj, ino: " << ino + << ", chunkid: " << chunkid + << ", blockCounts: " << blockCounts + << ", compaction: " << compaction + << ", blockSize: " << blockSize + << ", chunkSize: " << chunkSize + << ", offset: " << offset + << ", blockIndexBegin: " << blockIndexBegin + << ", blockIndexEnd: " << blockIndexEnd + << ", len: " << len + << ", firstBlockSize: " << firstBlockSize + << ", lastBlockSize: " << lastBlockSize + << ", blockPos: " << blockPos + << ", chunkPos: " << chunkPos; + for (auto blockIndex = travelStartIndex; + blockIndex <= travelEndIndex ; blockIndex++) { + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndex, compaction, fsId, ino); + prefetchObjs->push_back(std::make_pair(objectName, blockSize)); + } + } + } +} + +// TODO(hzwuhongsong): These logics are very similar to other place, +// try to merge it +void FuseS3Client::WarmUpAllObjs( + const std::list> &prefetchObjs) { + std::atomic pendingReq(0); + curve::common::CountDownEvent cond(1); + // callback function + GetObjectAsyncCallBack cb = + [&](const S3Adapter *adapter, + const std::shared_ptr &context) { + if (context->retCode == 0) { + VLOG(9) << "Get Object success: " << context->key; + int ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect( + context->key, context->buf, context->len); + if (ret < 0) { + LOG_EVERY_SECOND(INFO) << + "write read directly failed, key: " << context->key; + } + if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) { + VLOG(6) << "pendingReq is over"; + cond.Signal(); + } + delete []context->buf; + return; + } + // todo: retry + LOG(WARNING) << "Get Object failed, key: " << context->key + << ", offset: " << context->offset; + s3Adaptor_->GetS3Client()->DownloadAsync(context); + }; + + pendingReq.fetch_add(prefetchObjs.size(), std::memory_order_seq_cst); + if (pendingReq.load(std::memory_order_seq_cst)) { + VLOG(9) << "wait for pendingReq"; + for (auto iter : prefetchObjs) { + VLOG(9) << "download start: " << iter.first; + std::string name = iter.first; + uint64_t readLen = iter.second; + if (s3Adaptor_->GetDiskCacheManager()->IsCached(name)) { + pendingReq.fetch_sub(1); + continue; + } + char *cacheS3 = new char[readLen]; + memset(cacheS3, 0, readLen); + auto context = std::make_shared(); + context->key = name; + context->buf = cacheS3; + context->offset = 0; + context->len = readLen; + context->cb = cb; + s3Adaptor_->GetS3Client()->DownloadAsync(context); + } + if (pendingReq.load()) + cond.Wait(); + } +} + +void FuseS3Client::travelChunks(fuse_ino_t ino, google::protobuf::Map *s3ChunkInfoMap) { + VLOG(9) << "travel chunk start: " << ino + << ", size: " << s3ChunkInfoMap->size(); + std::list> prefetchObjs; + for (auto &iter : *s3ChunkInfoMap) { + VLOG(9) << "travel chunk: " << iter.first; + travelChunk(ino, iter.second, &prefetchObjs); + } + WarmUpAllObjs(prefetchObjs); + VLOG(9) << "travel chunks end"; + return; +} + void FuseS3Client::UnInit() { + bgFetchStop_.store(true, std::memory_order_release); + bgFetchThread_.join(); s3Adaptor_->Stop(); FuseClient::UnInit(); + curve::common::S3Adapter::Shutdown(); } CURVEFS_ERROR FuseS3Client::FuseOpInit(void *userdata, @@ -55,35 +353,18 @@ CURVEFS_ERROR FuseS3Client::FuseOpInit(void *userdata, return ret; } -CURVEFS_ERROR FuseS3Client::CreateFs(void *userdata, FsInfo *fsInfo) { - struct MountOption *mOpts = (struct MountOption *)userdata; - std::string fsName = (mOpts->fsName == nullptr) ? "" : mOpts->fsName; - ::curvefs::common::S3Info s3Info; - s3Info.set_ak(option_.s3Opt.s3AdaptrOpt.ak); - s3Info.set_sk(option_.s3Opt.s3AdaptrOpt.sk); - s3Info.set_endpoint(option_.s3Opt.s3AdaptrOpt.s3Address); - s3Info.set_bucketname(option_.s3Opt.s3AdaptrOpt.bucketName); - s3Info.set_blocksize(option_.s3Opt.s3ClientAdaptorOpt.blockSize); - s3Info.set_chunksize(option_.s3Opt.s3ClientAdaptorOpt.chunkSize); - // fsBlockSize means min allocsize, for s3, we do not need this. - FSStatusCode ret = mdsClient_->CreateFsS3(fsName, 1, s3Info); - if (ret != FSStatusCode::OK) { - return CURVEFS_ERROR::INTERNAL; - } - - return CURVEFS_ERROR::OK; -} - CURVEFS_ERROR FuseS3Client::FuseOpWrite(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi, size_t *wSize) { // check align if (fi->flags & O_DIRECT) { - if (!(is_aligned(off, DirectIOAlignemnt) && - is_aligned(size, DirectIOAlignemnt))) + if (!(is_aligned(off, DirectIOAlignment) && + is_aligned(size, DirectIOAlignment))) return CURVEFS_ERROR::INVALIDPARAM; } + + fuseS3Throttle_.Add(false, length); uint64_t start = butil::cpuwide_time_us(); int wRet = s3Adaptor_->Write(ino, off, size, buf); if (wRet < 0) { @@ -96,6 +377,7 @@ CURVEFS_ERROR FuseS3Client::FuseOpWrite(fuse_req_t req, fuse_ino_t ino, fsMetric_->userWrite.qps.count << 1; uint64_t duration = butil::cpuwide_time_us() - start; fsMetric_->userWrite.latency << duration; + fsMetric_->userWriteIoSize.set_value(wRet); } std::shared_ptr inodeWrapper; @@ -107,25 +389,37 @@ CURVEFS_ERROR FuseS3Client::FuseOpWrite(fuse_req_t req, fuse_ino_t ino, } ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); - Inode *inode = inodeWrapper->GetMutableInodeUnlocked(); *wSize = wRet; + size_t changeSize = 0; // update file len - if (inode->length() < off + *wSize) { - inode->set_length(off + *wSize); + if (inodeWrapper->GetLengthLocked() < off + *wSize) { + changeSize = off + *wSize - inodeWrapper->GetLengthLocked(); + inodeWrapper->SetLengthLocked(off + *wSize); } - struct timespec now; - clock_gettime(CLOCK_REALTIME, &now); - inode->set_mtime(now.tv_sec); - inode->set_mtime_ns(now.tv_nsec); - inode->set_ctime(now.tv_sec); - inode->set_ctime_ns(now.tv_nsec); + + inodeWrapper->UpdateTimestampLocked(kModifyTime | kChangeTime); inodeManager_->ShipToFlush(inodeWrapper); if (fi->flags & O_DIRECT || fi->flags & O_SYNC || fi->flags & O_DSYNC) { // Todo: do some cache flush later } + + if (enableSumInDir_ && changeSize != 0) { + const Inode* inode = inodeWrapper->GetInodeLocked(); + XAttr xattr; + xattr.mutable_xattrinfos()->insert({XATTRFBYTES, + std::to_string(changeSize)}); + for (const auto &it : inode->parent()) { + auto tret = xattrManager_->UpdateParentInodeXattr(it, xattr, true); + if (tret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "UpdateParentInodeXattr failed," + << " inodeId = " << it + << ", xattr = " << xattr.DebugString(); + } + } + } return ret; } @@ -135,11 +429,12 @@ CURVEFS_ERROR FuseS3Client::FuseOpRead(fuse_req_t req, fuse_ino_t ino, size_t *rSize) { // check align if (fi->flags & O_DIRECT) { - if (!(is_aligned(off, DirectIOAlignemnt) && - is_aligned(size, DirectIOAlignemnt))) + if (!(is_aligned(off, DirectIOAlignment) && + is_aligned(size, DirectIOAlignment))) return CURVEFS_ERROR::INVALIDPARAM; } + uint64_t start = butil::cpuwide_time_us(); std::shared_ptr inodeWrapper; CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper); if (ret != CURVEFS_ERROR::OK) { @@ -159,6 +454,7 @@ CURVEFS_ERROR FuseS3Client::FuseOpRead(fuse_req_t req, fuse_ino_t ino, len = size; } + fuseS3Throttle_.Add(true, length); // Read do not change inode. so we do not get lock here. int rRet = s3Adaptor_->Read(ino, off, len, buffer); if (rRet < 0) { @@ -169,21 +465,17 @@ CURVEFS_ERROR FuseS3Client::FuseOpRead(fuse_req_t req, fuse_ino_t ino, if (fsMetric_.get() != nullptr) { fsMetric_->userRead.bps.count << rRet; + fsMetric_->userRead.qps.count << 1; + uint64_t duration = butil::cpuwide_time_us() - start; + fsMetric_->userRead.latency << duration; + fsMetric_->userReadIoSize.set_value(rRet); } ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); - Inode *newInode = inodeWrapper->GetMutableInodeUnlocked(); - - struct timespec now; - clock_gettime(CLOCK_REALTIME, &now); - newInode->set_ctime(now.tv_sec); - newInode->set_ctime_ns(now.tv_nsec); - newInode->set_atime(now.tv_sec); - newInode->set_atime_ns(now.tv_nsec); - + inodeWrapper->UpdateTimestampLocked(kAccessTime); inodeManager_->ShipToFlush(inodeWrapper); - VLOG(6) << "read end, read size = " << *rSize; + VLOG(9) << "read end, read size = " << *rSize; return ret; } @@ -191,9 +483,8 @@ CURVEFS_ERROR FuseS3Client::FuseOpCreate(fuse_req_t req, fuse_ino_t parent, const char *name, mode_t mode, struct fuse_file_info *fi, fuse_entry_param *e) { - LOG(INFO) << "FuseOpCreate, parent: " << parent - << ", name: " << name - << ", mode: " << mode; + VLOG(1) << "FuseOpCreate, parent: " << parent << ", name: " << name + << ", mode: " << mode; CURVEFS_ERROR ret = MakeNode(req, parent, name, mode, FsFileType::TYPE_S3, 0, e); if (ret != CURVEFS_ERROR::OK) { @@ -205,17 +496,30 @@ CURVEFS_ERROR FuseS3Client::FuseOpCreate(fuse_req_t req, fuse_ino_t parent, CURVEFS_ERROR FuseS3Client::FuseOpMkNod(fuse_req_t req, fuse_ino_t parent, const char *name, mode_t mode, dev_t rdev, fuse_entry_param *e) { - LOG(INFO) << "FuseOpMkNod, parent: " << parent - << ", name: " << name - << ", mode: " << mode - << ", rdev: " << rdev; + VLOG(1) << "FuseOpMkNod, parent: " << parent << ", name: " << name + << ", mode: " << mode << ", rdev: " << rdev; return MakeNode(req, parent, name, mode, FsFileType::TYPE_S3, rdev, e); } +CURVEFS_ERROR FuseS3Client::FuseOpLink(fuse_req_t req, fuse_ino_t ino, + fuse_ino_t newparent, const char *newname, + fuse_entry_param *e) { + VLOG(1) << "FuseOpLink, ino: " << ino << ", newparent: " << newparent + << ", newname: " << newname; + return FuseClient::FuseOpLink( + req, ino, newparent, newname, FsFileType::TYPE_S3, e); +} + +CURVEFS_ERROR FuseS3Client::FuseOpUnlink(fuse_req_t req, fuse_ino_t parent, + const char *name) { + VLOG(1) << "FuseOpUnlink, parent: " << parent << ", name: " << name; + return RemoveNode(req, parent, name, FsFileType::TYPE_S3); +} + CURVEFS_ERROR FuseS3Client::FuseOpFsync(fuse_req_t req, fuse_ino_t ino, int datasync, struct fuse_file_info *fi) { - LOG(INFO) << "FuseOpFsync, ino: " << ino << ", datasync: " << datasync; + VLOG(1) << "FuseOpFsync, ino: " << ino << ", datasync: " << datasync; CURVEFS_ERROR ret = s3Adaptor_->Flush(ino); if (ret != CURVEFS_ERROR::OK) { @@ -237,10 +541,54 @@ CURVEFS_ERROR FuseS3Client::FuseOpFsync(fuse_req_t req, fuse_ino_t ino, return inodeWrapper->Sync(); } -CURVEFS_ERROR FuseS3Client::Truncate(Inode *inode, uint64_t length) { +CURVEFS_ERROR FuseS3Client::Truncate(InodeWrapper *inode, uint64_t length) { return s3Adaptor_->Truncate(inode, length); } +CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino, + struct fuse_file_info *fi) { + VLOG(1) << "FuseOpFlush, ino: " << ino; + CURVEFS_ERROR ret = CURVEFS_ERROR::OK; + + // if enableCto, flush all write cache both in memory cache and disk cache + if (curvefs::client::common::FLAGS_enableCto) { + ret = s3Adaptor_->FlushAllCache(ino); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "FuseOpFlush, flush all cache fail, ret = " << ret + << ", ino: " << ino; + return ret; + } + VLOG(3) << "FuseOpFlush, flush to s3 ok"; + + std::shared_ptr inodeWrapper; + ret = inodeManager_->GetInode(ino, inodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "FuseOpFlush, inodeManager get inode fail, ret = " + << ret << ", ino: " << ino; + return ret; + } + + ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); + ret = inodeWrapper->Sync(); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "FuseOpFlush, inode sync s3 chunk info fail, ret = " + << ret << ", ino: " << ino; + return ret; + } + // if disableCto, flush just flush data in memory + } else { + ret = s3Adaptor_->Flush(ino); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "FuseOpFlush, flush to diskcache failed, ret = " + << ret << ", ino: " << ino; + return ret; + } + } + + VLOG(1) << "FuseOpFlush, ino: " << ino << " flush ok"; + return CURVEFS_ERROR::OK; +} + void FuseS3Client::FlushData() { CURVEFS_ERROR ret = CURVEFS_ERROR::UNKNOWN; do { diff --git a/curvefs/src/client/fuse_s3_client.h b/curvefs/src/client/fuse_s3_client.h index 8e8fcb548b..2b0ee47fe2 100644 --- a/curvefs/src/client/fuse_s3_client.h +++ b/curvefs/src/client/fuse_s3_client.h @@ -25,18 +25,30 @@ #define CURVEFS_SRC_CLIENT_FUSE_S3_CLIENT_H_ #include +#include +#include +#include +#include #include "curvefs/src/client/fuse_client.h" +#include "curvefs/src/client/s3/client_s3_cache_manager.h" +#include "src/common/s3_adapter.h" +#include "src/common/throttle.h" namespace curvefs { namespace client { +using curve::common::GetObjectAsyncContext; +using curve::common::GetObjectAsyncCallBack; +using curve::common::ReadWriteThrottleParams; +using curve::common::Throttle; +using curve::common::ThrottleParams; + class FuseS3Client : public FuseClient { public: FuseS3Client() : FuseClient(), - s3Adaptor_(std::make_shared()), - s3Client_(nullptr) {} + s3Adaptor_(std::make_shared()) {} FuseS3Client(const std::shared_ptr &mdsClient, const std::shared_ptr &metaClient, @@ -45,49 +57,76 @@ class FuseS3Client : public FuseClient { const std::shared_ptr &s3Adaptor) : FuseClient(mdsClient, metaClient, inodeManager, dentryManager), - s3Adaptor_(s3Adaptor), - s3Client_(nullptr) {} + s3Adaptor_(s3Adaptor) {} CURVEFS_ERROR Init(const FuseClientOption &option) override; void UnInit() override; + CURVEFS_ERROR FuseOpInit( void *userdata, struct fuse_conn_info *conn) override; + CURVEFS_ERROR FuseOpWrite(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi, size_t *wSize) override; CURVEFS_ERROR FuseOpRead(fuse_req_t req, - fuse_ino_t ino, size_t size, off_t off, - struct fuse_file_info *fi, - char *buffer, - size_t *rSize) override; + fuse_ino_t ino, size_t size, off_t off, + struct fuse_file_info *fi, + char *buffer, + size_t *rSize) override; CURVEFS_ERROR FuseOpCreate(fuse_req_t req, fuse_ino_t parent, const char *name, mode_t mode, struct fuse_file_info *fi, fuse_entry_param *e) override; CURVEFS_ERROR FuseOpMkNod(fuse_req_t req, fuse_ino_t parent, - const char *name, mode_t mode, dev_t rdev, - fuse_entry_param *e) override; + const char *name, mode_t mode, dev_t rdev, + fuse_entry_param *e) override; + + CURVEFS_ERROR FuseOpLink(fuse_req_t req, fuse_ino_t ino, + fuse_ino_t newparent, const char *newname, + fuse_entry_param *e) override; + + CURVEFS_ERROR FuseOpUnlink(fuse_req_t req, fuse_ino_t parent, + const char *name) override; CURVEFS_ERROR FuseOpFsync(fuse_req_t req, fuse_ino_t ino, int datasync, - struct fuse_file_info *fi) override; + struct fuse_file_info *fi) override; + + CURVEFS_ERROR FuseOpFlush(fuse_req_t req, fuse_ino_t ino, + struct fuse_file_info *fi) override; private: - CURVEFS_ERROR Truncate(Inode *inode, uint64_t length) override; + CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override; void FlushData() override; - - CURVEFS_ERROR CreateFs( - void *userdata, FsInfo *fsInfo) override; - + // get the warmUp filelist + void GetWarmUpFileList(const WarmUpFileContext_t&, + std::vector&); + void BackGroundFetch(); + // put the file needed warmup to queue, + // then can downlaod the objs belong to it + void fetchDataEnqueue(fuse_ino_t ino); + // travel all chunks + void travelChunks(fuse_ino_t ino, google::protobuf::Map *s3ChunkInfoMap); + // travel and download all objs belong to the chunk + void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, + std::list>* prefetchObjs); + // warmup all the prefetchObjs + void WarmUpAllObjs(const std::list< + std::pair> &prefetchObjs); + + void InitQosParam(); private: // s3 adaptor std::shared_ptr s3Adaptor_; - // s3 client - std::shared_ptr s3Client_; + Thread bgFetchThread_; + std::atomic bgFetchStop_; + std::mutex fetchMtx_; + Throttle fuseS3Throttle_; }; diff --git a/curvefs/src/client/fuse_volume_client.cpp b/curvefs/src/client/fuse_volume_client.cpp index 22f6025a37..d4f2acbddd 100644 --- a/curvefs/src/client/fuse_volume_client.cpp +++ b/curvefs/src/client/fuse_volume_client.cpp @@ -21,277 +21,234 @@ * Author: xuchaojie */ -#include -#include +#include "curvefs/src/client/fuse_volume_client.h" + +#include +#include #include +#include -#include "curvefs/src/client/fuse_volume_client.h" +#include "absl/cleanup/cleanup.h" +#include "absl/memory/memory.h" +#include "curvefs/proto/mds.pb.h" +#include "curvefs/src/client/error_code.h" +#include "curvefs/src/client/volume/default_volume_storage.h" +#include "curvefs/src/client/volume/extent_cache.h" +#include "curvefs/src/volume/common.h" +#include "curvefs/src/volume/option.h" namespace curvefs { namespace client { +using ::curvefs::volume::SpaceManagerImpl; +using ::curvefs::volume::SpaceManagerOption; +using ::curvefs::volume::BlockDeviceClientOptions; +using ::curvefs::volume::BlockDeviceClientImpl; + CURVEFS_ERROR FuseVolumeClient::Init(const FuseClientOption &option) { volOpts_ = option.volumeOpt; CURVEFS_ERROR ret = FuseClient::Init(option); - if (ret != CURVEFS_ERROR::OK) { - return ret; - } - spaceBase_ = std::make_shared(); - ret = spaceClient_->Init(option.spaceOpt, spaceBase_.get()); + if (ret != CURVEFS_ERROR::OK) { return ret; } - ret = extManager_->Init(option.extentManagerOpt); - if (ret != CURVEFS_ERROR::OK) { - return ret; + BlockDeviceClientOptions opts; + opts.configPath = option.bdevOpt.configPath; + + bool ret2 = blockDeviceClient_->Init(opts); + + if (!ret2) { + LOG(ERROR) << "Init block device client failed"; + return CURVEFS_ERROR::INTERNAL; } - ret = blockDeviceClient_->Init(option.bdevOpt); + + InitQosParam(); return ret; } void FuseVolumeClient::UnInit() { + storage_->Shutdown(); + spaceManager_->Shutdown(); blockDeviceClient_->UnInit(); + FuseClient::UnInit(); } -CURVEFS_ERROR FuseVolumeClient::CreateFs(void *userdata, FsInfo *fsInfo) { - struct MountOption *mOpts = (struct MountOption *)userdata; - std::string volName = (mOpts->volume == nullptr) ? "" : mOpts->volume; - std::string fsName = (mOpts->fsName == nullptr) ? "" : mOpts->fsName; - std::string user = (mOpts->user == nullptr) ? "" : mOpts->user; +void FuseVolumeClient::InitQosParam() { + ReadWriteThrottleParams params; + params.iopsWrite = ThrottleParams(FLAGS_avgFlushIops, 0, 0); + params.bpsWrite = ThrottleParams(FLAGS_avgFlushBytes, FLAGS_burstFlushBytes, + FLAGS_burstSecs); + params.iopsRead = ThrottleParams(FLAGS_avgReadFileIops, 0, 0); + params.bpsRead = ThrottleParams(FLAGS_avgReadFileBytes, 0, 0); - CURVEFS_ERROR ret = CURVEFS_ERROR::OK; - BlockDeviceStat stat; - ret = blockDeviceClient_->Stat(volName, user, &stat); + fuseVolumeThrottle_.UpdateThrottleParams(params); +} + +CURVEFS_ERROR FuseVolumeClient::FuseOpInit(void *userdata, + struct fuse_conn_info *conn) { + auto ret = FuseClient::FuseOpInit(userdata, conn); if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "Stat volume failed, ret = " << ret - << ", volName = " << volName << ", user = " << user; + LOG(ERROR) << "fuse op init failed, error: " << ret; return ret; } - Volume vol; - vol.set_volumesize(stat.length); - vol.set_blocksize(volOpts_.volBlockSize); - vol.set_volumename(volName); - vol.set_user(user); - - FSStatusCode ret2 = mdsClient_->CreateFs(fsName, volOpts_.fsBlockSize, vol); - if (ret2 != FSStatusCode::OK) { + const auto &vol = fsInfo_->detail().volume(); + const auto &volName = vol.volumename(); + const auto &user = vol.user(); + auto ret2 = blockDeviceClient_->Open(volName, user); + if (!ret2) { + LOG(ERROR) << "BlockDeviceClientImpl open failed, ret = " << ret + << ", volName = " << volName << ", user = " << user; return CURVEFS_ERROR::INTERNAL; } - return CURVEFS_ERROR::OK; -} -CURVEFS_ERROR FuseVolumeClient::FuseOpInit( - void *userdata, struct fuse_conn_info *conn) { - struct MountOption *mOpts = (struct MountOption *) userdata; - std::string volName = (mOpts->volume == nullptr) ? "" : mOpts->volume; - std::string user = (mOpts->user == nullptr) ? "" : mOpts->user; - CURVEFS_ERROR ret = blockDeviceClient_->Open(volName, user); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "BlockDeviceClientImpl open failed, ret = " << ret - << ", volName = " << volName - << ", user = " << user; - return ret; - } - return FuseClient::FuseOpInit(userdata, conn); -} + SpaceManagerOption option; + option.blockGroupManagerOption.fsId = fsInfo_->fsid(); + option.blockGroupManagerOption.owner = mountpoint_.hostname() + ":" + + std::to_string(mountpoint_.port()) + + ":" + mountpoint_.path(); + option.blockGroupManagerOption.blockGroupAllocateOnce = + volOpts_.allocatorOption.blockGroupOption.allocateOnce; + option.blockGroupManagerOption.blockGroupSize = + fsInfo_->detail().volume().blockgroupsize(); + option.blockGroupManagerOption.blockSize = + fsInfo_->detail().volume().blocksize(); -void FuseVolumeClient::FuseOpDestroy(void *userdata) { - FuseClient::FuseOpDestroy(userdata); - CURVEFS_ERROR ret = blockDeviceClient_->Close(); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "BlockDeviceClientImpl close failed, ret = " << ret; - return; - } - return; + option.allocatorOption.type = volOpts_.allocatorOption.type; + option.allocatorOption.bitmapAllocatorOption.sizePerBit = + volOpts_.allocatorOption.bitmapAllocatorOption.sizePerBit; + option.allocatorOption.bitmapAllocatorOption.smallAllocProportion = + volOpts_.allocatorOption.bitmapAllocatorOption.smallAllocProportion; + + spaceManager_ = absl::make_unique(option, mdsClient_, + blockDeviceClient_); + + storage_ = absl::make_unique( + spaceManager_.get(), blockDeviceClient_.get(), inodeManager_.get()); + + ExtentCacheOption extentOpt; + extentOpt.blockSize = vol.blocksize(); + extentOpt.sliceSize = vol.slicesize(); + + ExtentCache::SetOption(extentOpt); + + return CURVEFS_ERROR::OK; } -CURVEFS_ERROR FuseVolumeClient::FuseOpWrite(fuse_req_t req, fuse_ino_t ino, - const char *buf, size_t size, +CURVEFS_ERROR FuseVolumeClient::FuseOpWrite(fuse_req_t req, + fuse_ino_t ino, + const char *buf, + size_t size, off_t off, struct fuse_file_info *fi, size_t *wSize) { - // check align + VLOG(9) << "write start, ino: " << ino << ", offset: " << off + << ", length: " << size; + if (fi->flags & O_DIRECT) { - if (!(is_aligned(off, DirectIOAlignemnt) && - is_aligned(size, DirectIOAlignemnt))) + if (!(is_aligned(off, DirectIOAlignment) && + is_aligned(size, DirectIOAlignment))) { + fsMetric_->userWrite.eps.count << 1; return CURVEFS_ERROR::INVALIDPARAM; + } } - std::shared_ptr inodeWrapper; - CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "inodeManager get inode fail, ret = " << ret - << ", inodeid = " << ino; - return ret; - } - - ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); - Inode inode = inodeWrapper->GetInodeUnlocked(); + fuseVolumeThrottle_.Add(false, length); + butil::Timer timer; + timer.start(); - std::list toAllocExtents; - // get the extent need to be allocate - ret = extManager_->GetToAllocExtents(inode.volumeextentlist(), off, size, - &toAllocExtents); + CURVEFS_ERROR ret = storage_->Write(ino, off, size, buf); if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "GetToAllocExtents fail, ret = " << ret; - return ret; - } - if (toAllocExtents.size() != 0) { - AllocateType type = AllocateType::NONE; - if (inode.length() >= volOpts_.bigFileSize || - size >= volOpts_.bigFileSize) { - type = AllocateType::BIG; - } else { - type = AllocateType::SMALL; - } - std::list allocatedExtents; - // to alloc extents - ret = spaceClient_->AllocExtents(fsInfo_->fsid(), toAllocExtents, type, - &allocatedExtents); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "metaClient alloc extents fail, ret = " << ret; - return ret; + if (fsMetric_) { + fsMetric_->userWrite.eps.count << 1; } - // merge the allocated extent to inode - ret = extManager_->MergeAllocedExtents( - toAllocExtents, allocatedExtents, inode.mutable_volumeextentlist()); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "toAllocExtents and allocatedExtents not match, " - << "ret = " << ret; - CURVEFS_ERROR ret2 = - spaceClient_->DeAllocExtents(fsInfo_->fsid(), allocatedExtents); - if (ret2 != CURVEFS_ERROR::OK) { - LOG(ERROR) << "DeAllocExtents fail, ret = " << ret; - } - return ret; - } - } - - // divide the extents which is write or not - std::list pExtents; - ret = extManager_->DivideExtents(inode.volumeextentlist(), off, size, - &pExtents); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "DivideExtents fail, ret = " << ret; + LOG(ERROR) << "write error, ino: " << ino << ", offset: " << off + << ", len: " << size + << ", error: " << ret; return ret; } - // write the physical extents - uint64_t writeLen = 0; - for (const auto &ext : pExtents) { - ret = blockDeviceClient_->Write(buf + writeLen, ext.pOffset, ext.len); - writeLen += ext.len; - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "block device write fail, ret = " << ret; - return ret; - } - } - - // make the unwritten flag in the inode. - ret = extManager_->MarkExtentsWritten(off, size, - inode.mutable_volumeextentlist()); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "MarkExtentsWritten fail, ret = " << ret; - return ret; - } *wSize = size; - // update file len - if (inode.length() < off + *wSize) { - inode.set_length(off + *wSize); - } - - struct timespec now; - clock_gettime(CLOCK_REALTIME, &now); - inode.set_mtime(now.tv_sec); - inode.set_mtime_ns(now.tv_nsec); - inode.set_ctime(now.tv_sec); - inode.set_ctime_ns(now.tv_nsec); - - inodeWrapper->SwapInode(&inode); - inodeManager_->ShipToFlush(inodeWrapper); + // NOTE: O_DIRECT/O_SYNC/O_DSYNC have simillar semantic, but not exactly the + // same, see `man 2 open` for more details if (fi->flags & O_DIRECT || fi->flags & O_SYNC || fi->flags & O_DSYNC) { // Todo: do some cache flush later } - return ret; + + timer.stop(); + + if (fsMetric_) { + fsMetric_->userWrite.bps.count << size; + fsMetric_->userWrite.qps.count << 1; + fsMetric_->userWrite.latency << timer.u_elapsed(); + fsMetric_->userWriteIoSize.set_value(size); + } + + VLOG(9) << "write end, ino: " << ino << ", offset: " << off + << ", length: " << size << ", written: " << *wSize; + + return CURVEFS_ERROR::OK; } -CURVEFS_ERROR FuseVolumeClient::FuseOpRead(fuse_req_t req, fuse_ino_t ino, - size_t size, off_t off, +CURVEFS_ERROR FuseVolumeClient::FuseOpRead(fuse_req_t req, + fuse_ino_t ino, + size_t size, + off_t off, struct fuse_file_info *fi, - char *buffer, size_t *rSize) { + char *buffer, + size_t *rSize) { + VLOG(3) << "read start, ino: " << ino << ", offset: " << off + << ", length: " << size; + // check align if (fi->flags & O_DIRECT) { - if (!(is_aligned(off, DirectIOAlignemnt) && - is_aligned(size, DirectIOAlignemnt))) - return CURVEFS_ERROR::INVALIDPARAM; - } + if (!(is_aligned(off, DirectIOAlignment) && + is_aligned(size, DirectIOAlignment))) { + fsMetric_->userRead.eps.count << 1; - std::shared_ptr inodeWrapper; - CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "inodeManager get inode fail, ret = " << ret - << ", inodeid = " << ino; - return ret; + return CURVEFS_ERROR::INVALIDPARAM; + } } - ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); - Inode inode = inodeWrapper->GetInodeUnlocked(); + fuseVolumeThrottle_.Add(true, length); + butil::Timer timer; + timer.start(); - size_t len = 0; - if (inode.length() <= off) { - *rSize = 0; - return CURVEFS_ERROR::OK; - } else if (inode.length() < off + size) { - len = inode.length() - off; - } else { - len = size; - } - std::list pExtents; - ret = extManager_->DivideExtents(inode.volumeextentlist(), off, len, - &pExtents); + CURVEFS_ERROR ret = storage_->Read(ino, off, size, buffer); if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "DivideExtents fail, ret = " << ret; + if (fsMetric_) { + fsMetric_->userRead.eps.count << 1; + } + LOG(ERROR) << "read error, ino: " << ino << ", offset: " << off + << ", len: " << size << ", error: " << ret; return ret; } - uint64_t readOff = 0; - for (const auto &ext : pExtents) { - if (!ext.UnWritten) { - ret = blockDeviceClient_->Read(buffer + readOff, ext.pOffset, - ext.len); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "block device read fail, ret = " << ret; - return ret; - } - } - readOff += ext.len; + + if (fsMetric_) { + fsMetric_->userRead.bps.count << size; + fsMetric_->userRead.qps.count << 1; + fsMetric_->userRead.latency << timer.u_elapsed(); + fsMetric_->userReadIoSize.set_value(size); } - *rSize = len; - struct timespec now; - clock_gettime(CLOCK_REALTIME, &now); - inode.set_ctime(now.tv_sec); - inode.set_ctime_ns(now.tv_nsec); - inode.set_atime(now.tv_sec); - inode.set_atime_ns(now.tv_nsec); + *rSize = size; - inodeWrapper->SwapInode(&inode); - inodeManager_->ShipToFlush(inodeWrapper); + VLOG(3) << "read end, ino: " << ino << ", offset: " << off + << ", length: " << size << ", rsize: " << *rSize; - VLOG(6) << "read end, read size = " << *rSize; - return ret; + return CURVEFS_ERROR::OK; } CURVEFS_ERROR FuseVolumeClient::FuseOpCreate(fuse_req_t req, fuse_ino_t parent, const char *name, mode_t mode, struct fuse_file_info *fi, fuse_entry_param *e) { - LOG(INFO) << "FuseOpCreate, parent: " << parent + VLOG(3) << "FuseOpCreate, parent: " << parent << ", name: " << name << ", mode: " << mode; CURVEFS_ERROR ret = @@ -305,28 +262,81 @@ CURVEFS_ERROR FuseVolumeClient::FuseOpCreate(fuse_req_t req, fuse_ino_t parent, CURVEFS_ERROR FuseVolumeClient::FuseOpMkNod(fuse_req_t req, fuse_ino_t parent, const char *name, mode_t mode, dev_t rdev, fuse_entry_param *e) { - LOG(INFO) << "FuseOpMkNod, parent: " << parent - << ", name: " << name - << ", mode: " << mode - << ", rdev: " << rdev; + VLOG(3) << "FuseOpMkNod, parent: " << parent << ", name: " << name + << ", mode: " << mode << ", rdev: " << rdev; return MakeNode(req, parent, name, mode, FsFileType::TYPE_FILE, rdev, e); } +CURVEFS_ERROR FuseVolumeClient::FuseOpLink(fuse_req_t req, fuse_ino_t ino, + fuse_ino_t newparent, const char *newname, + fuse_entry_param *e) { + VLOG(1) << "FuseOpLink, ino: " << ino << ", newparent: " << newparent + << ", newname: " << newname; + return FuseClient::FuseOpLink( + req, ino, newparent, newname, FsFileType::TYPE_FILE, e); +} + +CURVEFS_ERROR FuseVolumeClient::FuseOpUnlink(fuse_req_t req, fuse_ino_t parent, + const char *name) { + VLOG(1) << "FuseOpUnlink, parent: " << parent << ", name: " << name; + return RemoveNode(req, parent, name, FsFileType::TYPE_FILE); +} + CURVEFS_ERROR FuseVolumeClient::FuseOpFsync(fuse_req_t req, fuse_ino_t ino, int datasync, struct fuse_file_info *fi) { - LOG(INFO) << "FuseOpFsync, ino: " << ino << ", datasync: " << datasync; - return CURVEFS_ERROR::NOTSUPPORT; + VLOG(3) << "FuseOpFsync start, ino: " << ino << ", datasync: " << datasync; + + CURVEFS_ERROR ret = storage_->Flush(ino); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "Storage flush ino: " << ino << " failed, error: " << ret; + return ret; + } + + if (datasync) { + VLOG(3) << "FuseOpFsync end, ino: " << ino + << ", datasync: " << datasync; + return CURVEFS_ERROR::OK; + } + + std::shared_ptr inodeWrapper; + ret = inodeManager_->GetInode(ino, inodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "Get inode fail, ino: " << ino << ", ret: " << ret; + return ret; + } + + auto lk = inodeWrapper->GetUniqueLock(); + return inodeWrapper->Sync(); } -CURVEFS_ERROR FuseVolumeClient::Truncate(Inode *inode, uint64_t length) { +CURVEFS_ERROR FuseVolumeClient::Truncate(InodeWrapper *inode, uint64_t length) { // Todo: call volume truncate return CURVEFS_ERROR::OK; } +CURVEFS_ERROR FuseVolumeClient::FuseOpFlush(fuse_req_t req, fuse_ino_t ino, + struct fuse_file_info *fi) { + VLOG(9) << "FuseOpFlush, ino: " << ino; + + CURVEFS_ERROR ret = storage_->Flush(ino); + LOG_IF(ERROR, ret != CURVEFS_ERROR::OK) + << "Flush error, ino: " << ino << ", error: " << ret; + + return ret; +} + void FuseVolumeClient::FlushData() { // TODO(xuchaojie) : flush volume data } +void FuseVolumeClient::SetSpaceManagerForTesting(SpaceManager *manager) { + spaceManager_.reset(manager); +} + +void FuseVolumeClient::SetVolumeStorageForTesting(VolumeStorage *storage) { + storage_.reset(storage); +} + } // namespace client } // namespace curvefs diff --git a/curvefs/src/client/fuse_volume_client.h b/curvefs/src/client/fuse_volume_client.h index f6d0752f2a..27150b2df5 100644 --- a/curvefs/src/client/fuse_volume_client.h +++ b/curvefs/src/client/fuse_volume_client.h @@ -27,36 +27,37 @@ #include #include "curvefs/src/client/fuse_client.h" -#include "curvefs/src/client/space_client.h" -#include "curvefs/src/client/extent_manager.h" +#include "curvefs/src/client/volume/volume_storage.h" +#include "curvefs/src/volume/block_device_client.h" +#include "curvefs/src/volume/space_manager.h" +#include "src/common/throttle.h" namespace curvefs { namespace client { using common::VolumeOption; +using ::curvefs::volume::BlockDeviceClient; +using ::curvefs::volume::BlockDeviceClientImpl; +using ::curvefs::volume::SpaceManager; +using curve::common::ReadWriteThrottleParams; +using curve::common::Throttle; +using curve::common::ThrottleParams; class FuseVolumeClient : public FuseClient { public: FuseVolumeClient() - : FuseClient(), - spaceClient_(std::make_shared()), - extManager_(std::make_shared()), - blockDeviceClient_(std::make_shared()), - spaceBase_(nullptr) {} + : FuseClient(), + blockDeviceClient_(std::make_shared()) {} - FuseVolumeClient(const std::shared_ptr &mdsClient, + // for UNIT_TEST + FuseVolumeClient( + const std::shared_ptr &mdsClient, const std::shared_ptr &metaClient, const std::shared_ptr &inodeManager, const std::shared_ptr &dentryManager, - const std::shared_ptr &spaceClient, - const std::shared_ptr &extManager, const std::shared_ptr &blockDeviceClient) - : FuseClient(mdsClient, metaClient, - inodeManager, dentryManager), - spaceClient_(spaceClient), - extManager_(extManager), - blockDeviceClient_(blockDeviceClient), - spaceBase_(nullptr) {} + : FuseClient(mdsClient, metaClient, inodeManager, dentryManager), + blockDeviceClient_(blockDeviceClient) {} CURVEFS_ERROR Init(const FuseClientOption &option) override; @@ -65,8 +66,6 @@ class FuseVolumeClient : public FuseClient { CURVEFS_ERROR FuseOpInit( void *userdata, struct fuse_conn_info *conn) override; - void FuseOpDestroy(void *userdata) override; - CURVEFS_ERROR FuseOpWrite(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi, size_t *wSize) override; @@ -76,7 +75,6 @@ class FuseVolumeClient : public FuseClient { struct fuse_file_info *fi, char *buffer, size_t *rSize) override; - CURVEFS_ERROR FuseOpCreate(fuse_req_t req, fuse_ino_t parent, const char *name, mode_t mode, struct fuse_file_info *fi, fuse_entry_param *e) override; @@ -85,30 +83,36 @@ class FuseVolumeClient : public FuseClient { const char *name, mode_t mode, dev_t rdev, fuse_entry_param *e) override; + CURVEFS_ERROR FuseOpLink(fuse_req_t req, fuse_ino_t ino, + fuse_ino_t newparent, const char *newname, + fuse_entry_param *e) override; + + CURVEFS_ERROR FuseOpUnlink(fuse_req_t req, fuse_ino_t parent, + const char *name) override; + CURVEFS_ERROR FuseOpFsync(fuse_req_t req, fuse_ino_t ino, int datasync, - struct fuse_file_info *fi) override; + struct fuse_file_info *fi) override; - private: - CURVEFS_ERROR Truncate(Inode *inode, uint64_t length) override; + CURVEFS_ERROR FuseOpFlush(fuse_req_t req, fuse_ino_t ino, + struct fuse_file_info *fi) override; - void FlushData() override; + void SetSpaceManagerForTesting(SpaceManager *manager); - CURVEFS_ERROR CreateFs( - void *userdata, FsInfo *fsInfo) override; + void SetVolumeStorageForTesting(VolumeStorage *storage); private: - // space client - std::shared_ptr spaceClient_; + CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override; - // extent manager - std::shared_ptr extManager_; + void FlushData() override; + void InitQosParam(); - // curve client + private: std::shared_ptr blockDeviceClient_; - - std::shared_ptr spaceBase_; + std::unique_ptr spaceManager_; + std::unique_ptr storage_; VolumeOption volOpts_; + Throttle fuseVolumeThrottle_; }; } // namespace client