From c570cbd8a5a2cc3b48ae4cfb832d27e04f8f22c1 Mon Sep 17 00:00:00 2001 From: hzwuhongsong Date: Mon, 8 Aug 2022 16:33:06 +0800 Subject: [PATCH] curvefs/client: add feature of warmup --- curvefs/src/client/fuse_client.cpp | 225 ++++++++++++++++++++- curvefs/src/client/fuse_client.h | 95 ++++++++- curvefs/src/client/fuse_s3_client.cpp | 233 +++++++++++++++++++++- curvefs/src/client/fuse_s3_client.h | 20 ++ curvefs/src/client/inode_wrapper.h | 4 + curvefs/src/client/s3/client_s3_adaptor.h | 13 ++ 6 files changed, 582 insertions(+), 8 deletions(-) diff --git a/curvefs/src/client/fuse_client.cpp b/curvefs/src/client/fuse_client.cpp index 0ae4a26c4b..50ae231db1 100644 --- a/curvefs/src/client/fuse_client.cpp +++ b/curvefs/src/client/fuse_client.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include "curvefs/proto/mds.pb.h" #include "curvefs/src/client/fuse_common.h" @@ -75,6 +76,15 @@ using rpcclient::Cli2ClientImpl; using rpcclient::MetaCache; using common::FLAGS_enableCto; +static bool checkWarmupListPath(const char*, const std::string& target) { + // do something to check the path + LOG(INFO) << "warmupListPath: " << target; + return true; +} +DEFINE_string(warmupListPath, "", + "the path to the list of files (dirs) that need to warmup."); +DEFINE_validator(warmupListPath, checkWarmupListPath); + CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) { option_ = option; @@ -122,17 +132,224 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) { if (ret3 != CURVEFS_ERROR::OK) { return ret3; } - ret3 = dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics); if (ret3 != CURVEFS_ERROR::OK) { return ret3; } - + warmUpFile_.exist = false; + bgCmdTaskStop_.store(false, std::memory_order_release); + bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this); + bgCmdStop_.store(false, std::memory_order_release); + bgCmdThread_ = Thread(&FuseClient::BackGroundCmd, this); + FLAGS_warmupListPath = ""; + taskFetchMetaPool_.Start(WARMUP_THREADS); return ret3; } +void FuseClient::BackGroundCmd() { + std::string preWarmUpPath = FLAGS_warmupListPath; + std::string warmUpPath; + while (!bgCmdStop_.load(std::memory_order_acquire)) { + warmUpPath = FLAGS_warmupListPath; + if (warmUpPath == preWarmUpPath) { + usleep(WARMUP_CHECKINTERVAL_US); // check interval + continue; + } + VLOG(6) << "has new warmUp task: " << warmUpPath; + preWarmUpPath = warmUpPath; + PutWarmTask(warmUpPath); + WarmUpRun(); + } + return; +} + +void FuseClient::BackGroundCmdTask() { + while (!bgCmdStop_.load(std::memory_order_acquire)) { + std::list readAheadPaths; + WaitWarmUp(); + while (hasWarmTask()) { + std::string warmUpTask; + GetarmTask(warmUpTask); + if (warmUpTask.empty()) { + continue; + } + VLOG(6) << "warmUp task is: " << warmUpTask; + std::vector splitPath; + boost::split(splitPath, warmUpTask, boost::is_any_of("/"), boost::token_compress_on); + Dentry dentry; + CURVEFS_ERROR ret = dentryManager_->GetDentry( + fsInfo_->rootinodeid(), splitPath[1], &dentry); + if (ret != CURVEFS_ERROR::OK) { + if (ret != CURVEFS_ERROR::NOTEXIST) { + LOG(WARNING) << "dentryManager_ get dentry fail: " + << ret << ", name: " << warmUpTask; + } + VLOG(1) << "FetchDentry error: " << ret; + return; + } + if (FsFileType::TYPE_S3 != dentry.type()) { + VLOG(3) << "not a file: " << warmUpTask; + return; + } + + fuse_ino_t ino = dentry.inodeid(); + std::shared_ptr inodeWrapper; + ret = inodeManager_->GetInode(ino, inodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "inodeManager get inode fail, ret = " + << ret << ", inodeid = " << ino; + return; + } + uint64_t len = inodeWrapper->GetLength(); + VLOG(9) << "ino is: " << ino << ", len is: " << len; + WarmUpFileContext_t warmUpFile{ino, len, true}; + SetWarmUpFile(warmUpFile); + } + } +} + +void FuseClient::FetchDentryEnqueue(std::string file) { + VLOG(6) << "FetchDentryEnqueue start: " << file; + auto task = [this, file]() { + LookPath(file); + }; + taskFetchMetaPool_.Enqueue(task); +} + +void FuseClient::LookPath(std::string file) { + VLOG(6) << "LookPath start: " << file; + // remove the blank + boost::trim(file); + std::vector splitPath; + boost::split(splitPath, file, boost::is_any_of("/"), boost::token_compress_on); + if (splitPath.size() == 2 + && splitPath.back().empty()) { + VLOG(6) << "i am root"; + FetchChildDentryEnqueue(fsInfo_->rootinodeid()); + return; + } else if (splitPath.size() == 2) { + VLOG(6) << "parent is root: " << fsInfo_->rootinodeid() + << ", path is: " << splitPath[1]; + this->FetchDentry(fsInfo_->rootinodeid(), splitPath[1]); + return; + } else if (splitPath.size() > 2) { // travel path + VLOG(6) << "traverse path size: " << splitPath.size() ; + std::string lastName = splitPath.back(); + splitPath.pop_back(); + fuse_ino_t ino = fsInfo_->rootinodeid(); + auto iter = splitPath.begin(); + // the first member is always empty, so skip + iter++; + for (; iter != splitPath.end(); iter++) { + VLOG(9) << "traverse path: " << *iter + << "ino is: " << ino; + Dentry dentry; + std::string pathName = *iter; + CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, pathName, &dentry); + if (ret != CURVEFS_ERROR::OK) { + if (ret != CURVEFS_ERROR::NOTEXIST) { + LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret + << ", parent inodeid = " << ino + << ", name = " << file; + } + VLOG(1) << "FetchDentry error: " << ret; + return; + } + ino = dentry.inodeid(); + } + this->FetchDentry(ino, lastName); + VLOG(9) << "ino is: " << ino + << "lastname is: " << lastName; + return; + } else { + VLOG(0) << "unknown path"; + } + return; +} + +void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) { + auto task = [this, ino]() { + // resolve层层递进,获得inode + this->FetchChildDentry(ino); + }; + taskFetchMetaPool_.Enqueue(task); +} + +void FuseClient::FetchChildDentry(fuse_ino_t ino) { + VLOG(9) << "FetchChildDentry start: " << ino; + std::list dentryList; + auto limit = option_.listDentryLimit; + CURVEFS_ERROR ret = dentryManager_->ListDentry( + ino, &dentryList, limit); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret + << ", parent = " << ino; + return; + } + for (auto iter : dentryList) { + VLOG(9) << "FetchChildDentry: " << iter.name(); + if (FsFileType::TYPE_S3 == iter.type()) { + std::unique_lock lck(fetchMtx_); + readAheadFiles_.push_front(iter.inodeid()); + VLOG(9) << "FetchChildDentry: " << iter.inodeid();; + } else if (FsFileType::TYPE_DIRECTORY == iter.type()) { + FetchChildDentryEnqueue(iter.inodeid()); + VLOG(9) << "FetchChildDentry: " << iter.inodeid(); + } else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo + + } else { + VLOG(0) << "unknown type"; + } + } + return; +} + +void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) { + VLOG(9) << "FetchDentry start: " << file + << ", ino: " << ino; + Dentry dentry; + CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry); + if (ret != CURVEFS_ERROR::OK) { + if (ret != CURVEFS_ERROR::NOTEXIST) { + LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret + << ", parent inodeid = " << ino + << ", name = " << file; + } + VLOG(1) << "FetchDentry error: " << ret; + return; + } + if (FsFileType::TYPE_S3 == dentry.type()) { + std::unique_lock lck(fetchMtx_); + readAheadFiles_.push_front(dentry.inodeid()); + return; + } else if (FsFileType::TYPE_DIRECTORY == dentry.type()) { + FetchChildDentryEnqueue(dentry.inodeid()); + VLOG(9) << "FetchDentry: " << dentry.inodeid(); + return; + + } else if (FsFileType::TYPE_SYM_LINK == dentry.type()){ + + } else { + VLOG(0) << "unkown, file: " << file + << ", ino: " << ino; + } + VLOG(9) << "FetchDentry end: " << file + << ", ino: " << ino; + return; +} + void FuseClient::UnInit() { + bgCmdTaskStop_.store(true, std::memory_order_release); + bgCmdStop_.store(true, std::memory_order_release); + WarmUpRun(); + if (bgCmdTaskThread_.joinable()) { + bgCmdTaskThread_.join(); + } + if (bgCmdThread_.joinable()) { + bgCmdThread_.join(); + } + taskFetchMetaPool_.Stop(); delete mdsBase_; mdsBase_ = nullptr; } @@ -178,7 +395,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata, << ", mountPoint = " << mountpoint_.ShortDebugString(); return CURVEFS_ERROR::MOUNT_FAILED; } - inodeManager_->SetFsId(fsInfo_->fsid()); dentryManager_->SetFsId(fsInfo_->fsid()); enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto; @@ -196,7 +412,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata, } init_ = true; - + mounted_.store(true, std::memory_order_release); return CURVEFS_ERROR::OK; } @@ -318,7 +534,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino, << ", inodeid = " << ino; return ret; } - ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); if (fi->flags & O_TRUNC) { if (fi->flags & O_WRONLY || fi->flags & O_RDWR) { diff --git a/curvefs/src/client/fuse_client.h b/curvefs/src/client/fuse_client.h index 1312a5790f..4a6cb9d19a 100644 --- a/curvefs/src/client/fuse_client.h +++ b/curvefs/src/client/fuse_client.h @@ -45,12 +45,15 @@ #include "curvefs/src/client/metric/client_metric.h" #include "src/common/concurrent/concurrent.h" #include "curvefs/src/common/define.h" +#include "curvefs/src/common/s3util.h" #include "curvefs/src/client/common/common.h" #include "curvefs/src/client/client_operator.h" #include "curvefs/src/client/lease/lease_excutor.h" #include "curvefs/src/client/xattr_manager.h" #define DirectIOAlignment 512 +#define WARMUP_CHECKINTERVAL_US 1000*1000 +#define WARMUP_THREADS 10 using ::curve::common::Atomic; using ::curve::common::InterruptibleSleeper; @@ -74,7 +77,11 @@ using curvefs::common::is_aligned; const uint32_t kMaxHostNameLength = 255u; using mds::Mountpoint; - +typedef struct WarmUpFileContext { + fuse_ino_t inode; + uint64_t fileLen; + bool exist; +} WarmUpFileContext_t; class FuseClient { public: FuseClient() @@ -104,6 +111,7 @@ class FuseClient { mdsBase_(nullptr), isStop_(true), init_(false), + mounted_(false), enableSumInDir_(false) {} virtual CURVEFS_ERROR Init(const FuseClientOption &option); @@ -230,6 +238,11 @@ class FuseClient { init_ = true; } + TaskThreadPool & + GetTaskFetchPool() { + return taskFetchMetaPool_; + } + std::shared_ptr GetFsInfo() { return fsInfo_; } @@ -244,7 +257,27 @@ class FuseClient { void SetEnableSumInDir(bool enable) { enableSumInDir_ = enable; } + std::list& GetReadAheadFiles() { + std::unique_lock lck(fetchMtx_); + return readAheadFiles_; + } + void GetWarmUpFile(WarmUpFileContext_t& warmUpFile) { + std::unique_lock lck(warmUpFileMtx_); + warmUpFile = std::move(warmUpFile_); + warmUpFile_.exist = false; + return; + } + void SetWarmUpFile(WarmUpFileContext_t warmUpFile) { + std::unique_lock lck(warmUpFileMtx_); + warmUpFile_ = warmUpFile; + warmUpFile_.exist = true; + } + bool hasWarmUpTask() { + std::unique_lock lck(warmUpFileMtx_); + return warmUpFile_.exist; + } + void FetchDentryEnqueue(std::string file); protected: CURVEFS_ERROR MakeNode(fuse_req_t req, fuse_ino_t parent, const char* name, mode_t mode, FsFileType type, dev_t rdev, @@ -279,6 +312,24 @@ class FuseClient { CURVEFS_ERROR UpdateParentInodeMCTimeAndInvalidNlink( fuse_ino_t parent, FsFileType type); + void BackGroundCmd(); + void BackGroundCmdTask(); + + void WarmUpRun() { + std::lock_guard lk(mtx_); + runned_ = true; + cond_.notify_one(); + } + void WaitWarmUp() { + std::unique_lock lk(mtx_); + cond_.wait(lk, [this]() { return runned_; }); + runned_ = false; + } + std::mutex mtx_; + std::condition_variable cond_; + bool runned_ = false; + + protected: // mds client std::shared_ptr mdsClient_; @@ -308,6 +359,8 @@ class FuseClient { // init flags bool init_; + std::atomic mounted_; + // enable record summary info in dir inode xattr bool enableSumInDir_; @@ -321,6 +374,46 @@ class FuseClient { Atomic isStop_; curve::common::Mutex renameMutex_; + + Thread bgCmdThread_; + std::atomic bgCmdStop_; + + Thread bgCmdTaskThread_; + std::atomic bgCmdTaskStop_; + std::mutex cmdMtx_; + + void FetchChildDentryEnqueue(fuse_ino_t ino); + void FetchChildDentry(fuse_ino_t ino); + void FetchDentry(fuse_ino_t ino, std::string file); + void LookPath(std::string file); + TaskThreadPool + taskFetchMetaPool_; + + // need warmup files + std::list readAheadFiles_; + std::mutex fetchMtx_; + + //one warmup file provided by the user + WarmUpFileContext_t warmUpFile_; + std::mutex warmUpFileMtx_; + + std::list warmUpTasks_; // todo: need size control ? + std::mutex warmUpTaskMtx_; + void PutWarmTask(const std::string& warmUpTask) { + std::unique_lock lck(warmUpTaskMtx_); + warmUpTasks_.push_back(warmUpTask); + } + void GetarmTask(std::string &warmUpTask) { + std::unique_lock lck(warmUpTaskMtx_); + if (warmUpTasks_.empty()) + return; + warmUpTask = std::move(warmUpTasks_.front()); + warmUpTasks_.pop_front(); + } + bool hasWarmTask() { + std::unique_lock lck(warmUpTaskMtx_); + return !warmUpTasks_.empty(); + } }; } // namespace client diff --git a/curvefs/src/client/fuse_s3_client.cpp b/curvefs/src/client/fuse_s3_client.cpp index da34bcde42..89bf51a28e 100644 --- a/curvefs/src/client/fuse_s3_client.cpp +++ b/curvefs/src/client/fuse_s3_client.cpp @@ -75,11 +75,240 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) { inodeManager_, mdsClient_, fsCacheManager, nullptr, true); } - + if (s3Adaptor_->HasDiskCache()) { + bgFetchStop_.store(false, std::memory_order_release); + bgFetchThread_ = Thread(&FuseS3Client::BackGroundFetch, this); + GetTaskFetchPool(); + } return ret; } +void FuseS3Client::BackGroundFetch() { + while (!bgFetchStop_.load(std::memory_order_acquire)) { + VLOG(0) << "BackGroundFetch."; + if (hasWarmUpTask()) { // new warmup task + WarmUpFileContext_t warmUpFile; + GetWarmUpFile(warmUpFile); + VLOG(0) << " len is: " << warmUpFile.fileLen + << "ino is: " << warmUpFile.inode; + struct fuse_file_info fi; + fi.flags &= ~O_DIRECT; + size_t rSize = 0; + std::unique_ptr data(new char[warmUpFile.fileLen+1]); + std::memset(data.get(), 0, warmUpFile.fileLen); + data[warmUpFile.fileLen] = '\n'; + FuseOpRead(nullptr, warmUpFile.inode, + warmUpFile.fileLen, 0, &fi, data.get(), &rSize); + VLOG(0) << "file is: " << data.get(); + std::string file = data.get(); + boost::trim(file); + VLOG(0) << "file is: " << file; + std::vector splitPath; + boost::split(splitPath, file, boost::is_any_of("\n"), boost::token_compress_on); + for (auto filePath : splitPath) { + VLOG(0) << "filePath is: " << filePath; + boost::trim(filePath); + FetchDentryEnqueue(filePath); + } + } + { + std::list readAheadFiles; + readAheadFiles.swap(GetReadAheadFiles()); + for (auto iter: readAheadFiles) { + VLOG(0) << "BackGroundFetch: " << iter; + fetchDataEnqueue(iter); + } + } + VLOG(0) << "BackGroundFetch end"; + usleep(WARMUP_CHECKINTERVAL_US); + } + return; +} + +void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) { + VLOG(9) << "fetchDataEnqueue start: " << ino; + auto task = [this, ino]() { + std::shared_ptr inodeWrapper; + CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "inodeManager get inode fail, ret = " << ret + << ", inodeid = " << ino; + return; + } + google::protobuf::Map *s3ChunkInfoMap = nullptr; + { + ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); + s3ChunkInfoMap = inodeWrapper->GetChunkInfoMap(); + } + if (nullptr == s3ChunkInfoMap || + s3ChunkInfoMap->empty()) { + return; + } + travelChunks(ino, s3ChunkInfoMap); + }; + GetTaskFetchPool().Enqueue(task); +} + +void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, + std::list> &prefetchObjs) { + uint64_t blockSize = s3Adaptor_->GetBlockSize(); + uint64_t chunkSize = s3Adaptor_->GetChunkSize(); + uint64_t offset, len, chunkid, compaction; + for (size_t i = 0; i < chunkInfo.s3chunks_size(); i++) { + auto chunkinfo = chunkInfo.mutable_s3chunks(i); + auto fsId = fsInfo_->fsid(); + chunkid = chunkinfo->chunkid(); + compaction = chunkinfo->compaction(); + offset = chunkinfo->offset(); + len = chunkinfo->len(); + // the offset in the chunk + uint64_t chunkPos = offset % chunkSize; + // the offset in the block + uint64_t blockPos = chunkPos % blockSize; + // the first blockIndex + uint64_t blockIndexBegin = chunkPos / blockSize; + + if (len < blockSize) { // just one block + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexBegin, compaction, fsId, ino); + prefetchObjs.push_back(std::make_pair(objectName, len)); + } else { + // the offset in the block + uint64_t blockPos = chunkPos % blockSize; + + // firstly, let's get the size in the first block + // then, subtract the length in the first block to obtain the remaining length + // lastly, We need to judge the last block is full or not + uint64_t firstBlockSize = (blockPos != 0) ? blockSize - blockPos : blockSize; + uint64_t leftSize = len - firstBlockSize; + uint32_t blockCounts = (leftSize % blockSize == 0) ? + (leftSize / blockSize + 1) : (leftSize / blockSize + 1 + 1); + // so we can get the last blockIndex because the bolck Index is cumulative + uint64_t blockIndexEnd = blockIndexBegin + blockCounts - 1; + + // the size of the last block + uint64_t lastBlockSize = leftSize % blockSize; + // whether the first block or the last block is full or not + bool firstBlockFull = (blockPos == 0) ? true : false; + bool lastBlockFull = (lastBlockSize == 0) ? true : false; + // the start and end block Index that need travel + uint64_t travelStartIndex, travelEndIndex; + // if the block is full, the size is needed download of the obj is blockSize. + // Otherwise, the value is special. + if (!firstBlockFull) { + travelStartIndex = blockIndexBegin + 1; + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexBegin, compaction, fsId, ino); + prefetchObjs.push_back(std::make_pair(objectName, firstBlockSize)); + } else { + travelStartIndex = blockIndexBegin; + } + if (!lastBlockFull) { + // block index is greater than or equal to 0 + // 这里blockIndexEnd不可能为0 + travelEndIndex = (blockIndexEnd == blockIndexBegin) ? blockIndexEnd : blockIndexEnd - 1; + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexEnd, compaction, fsId, ino); + // there is no need to care about the order in which objects are downloaded + prefetchObjs.push_back(std::make_pair(objectName, lastBlockSize)); + } else { + travelEndIndex = blockIndexEnd; + } + VLOG(0) << "travel obj, ino: " << ino + << ", chunkid: " << chunkid + << ", blockCounts: " << blockCounts + << ", compaction: " << compaction + << ", blockSize: " << blockSize + << ", chunkSize: " << chunkSize + << ", offset: " << offset + << ", blockIndexBegin: " << blockIndexBegin + << ", blockIndexEnd: " << blockIndexEnd + << ", len: " << len + << ", firstBlockSize: " << firstBlockSize + << ", lastBlockSize: " << lastBlockSize + << ", blockPos: " << blockPos + << ", chunkPos: " << chunkPos; + for (auto blockIndex = travelStartIndex; blockIndex <= travelEndIndex ; blockIndex++) { + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndex, compaction, fsId, ino); + prefetchObjs.push_back(std::make_pair(objectName, blockSize)); + } + } + } +} + +void FuseS3Client::WarmUpAllObjs( + std::list> &prefetchObjs) { + std::atomic pendingReq(0); + curve::common::CountDownEvent cond(1); + // callback function + GetObjectAsyncCallBack cb = + [&](const S3Adapter *adapter, + const std::shared_ptr &context) { + if (context->retCode == 0) { + VLOG(9) << "Get Object success: " << context->key; + int ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(context->key, context->buf, context->len); + if (ret < 0) { + LOG_EVERY_SECOND(INFO) << + "write read directly failed, key: " << context->key; + } + if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) { + VLOG(6) << "pendingReq is over"; + cond.Signal(); + } + delete []context->buf; + return; + } + // todo: retry + LOG(WARNING) << "Get Object failed, key: " << context->key + << ", offset: " << context->offset; + s3Adaptor_->GetS3Client()->DownloadAsync(context); + }; + + pendingReq.fetch_add(prefetchObjs.size(), std::memory_order_seq_cst); + if (pendingReq.load(std::memory_order_seq_cst)) { + VLOG(9) << "wait for pendingReq"; + for (auto iter : prefetchObjs) { + VLOG(9) << "download start: " << iter.first; + std::string name = iter.first; + uint64_t readLen = iter.second; + if(s3Adaptor_->GetDiskCacheManager()->IsCached(name)) { + pendingReq.fetch_sub(1); + continue; + } + char *cacheS3 = new char[readLen]; // 要判断是否生成成功把? 还要析构吧? + memset(cacheS3, 0, readLen); + auto context = std::make_shared(); + context->key = name; + context->buf = cacheS3; + context->offset = 0; + context->len = readLen; + context->cb = cb; + s3Adaptor_->GetS3Client()->DownloadAsync(context); + } + cond.Wait(); + } +} + +void FuseS3Client::travelChunks(fuse_ino_t ino, google::protobuf::Map *s3ChunkInfoMap) { + VLOG(9) << "travel chunk start: " << ino + << ", size: " << s3ChunkInfoMap->size(); + std::list> prefetchObjs; + for (auto &iter : *s3ChunkInfoMap) { + VLOG(9) << "travel chunk: " << iter.first; + travelChunk(ino, iter.second, prefetchObjs); + } + WarmUpAllObjs(prefetchObjs); + VLOG(9) << "travel chunks end"; + return; +} + void FuseS3Client::UnInit() { + if (s3Adaptor_->HasDiskCache()) { + bgFetchStop_.store(false, std::memory_order_release); + bgFetchThread_.join(); + } s3Adaptor_->Stop(); FuseClient::UnInit(); curve::common::S3Adapter::Shutdown(); @@ -224,7 +453,7 @@ CURVEFS_ERROR FuseS3Client::FuseOpRead(fuse_req_t req, fuse_ino_t ino, inodeManager_->ShipToFlush(inodeWrapper); - VLOG(6) << "read end, read size = " << *rSize; + VLOG(0) << "read end, read size = " << *rSize; return ret; } diff --git a/curvefs/src/client/fuse_s3_client.h b/curvefs/src/client/fuse_s3_client.h index 1a0b2898c8..72df87b66c 100644 --- a/curvefs/src/client/fuse_s3_client.h +++ b/curvefs/src/client/fuse_s3_client.h @@ -24,13 +24,20 @@ #ifndef CURVEFS_SRC_CLIENT_FUSE_S3_CLIENT_H_ #define CURVEFS_SRC_CLIENT_FUSE_S3_CLIENT_H_ +#include #include #include "curvefs/src/client/fuse_client.h" #include "curvefs/src/client/s3/client_s3_cache_manager.h" +#include "src/common/s3_adapter.h" + + namespace curvefs { namespace client { +using curve::common::GetObjectAsyncContext; +using curve::common::GetObjectAsyncCallBack; + class FuseS3Client : public FuseClient { public: FuseS3Client() @@ -92,6 +99,19 @@ class FuseS3Client : public FuseClient { private: // s3 adaptor std::shared_ptr s3Adaptor_; + + + Thread bgFetchThread_; + std::atomic bgFetchStop_; + std::mutex fetchMtx_; + void BackGroundFetch(); + void fetchDataEnqueue(fuse_ino_t ino); + void travelChunks(fuse_ino_t ino, google::protobuf::Map *s3ChunkInfoMap); + void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, + std::list> &prefetchObjs); + void WarmUpAllObjs(std::list> &prefetchObjs); }; diff --git a/curvefs/src/client/inode_wrapper.h b/curvefs/src/client/inode_wrapper.h index 49aa50364b..814ae6b7eb 100644 --- a/curvefs/src/client/inode_wrapper.h +++ b/curvefs/src/client/inode_wrapper.h @@ -345,6 +345,10 @@ class InodeWrapper : public std::enable_shared_from_this { UpdateS3ChunkInfoMetric(2); } + google::protobuf::Map* GetChunkInfoMap() { + return inode_.mutable_s3chunkinfomap(); + } + void MarkInodeError() { // TODO(xuchaojie) : when inode is marked error, prevent futher write. status_ = InodeStatus::Error; diff --git a/curvefs/src/client/s3/client_s3_adaptor.h b/curvefs/src/client/s3/client_s3_adaptor.h index 7679566847..00d83e50a2 100644 --- a/curvefs/src/client/s3/client_s3_adaptor.h +++ b/curvefs/src/client/s3/client_s3_adaptor.h @@ -92,6 +92,19 @@ class S3ClientAdaptor { virtual void InitMetrics(const std::string &fsName) = 0; virtual void CollectMetrics(InterfaceMetric *interface, int count, uint64_t start) = 0; + virtual std::shared_ptr GetDiskCacheManager() { + return nullptr; + } + virtual std::shared_ptr GetS3Client() { + return nullptr; + } + virtual uint64_t GetBlockSize() { + return 0; + } + virtual uint64_t GetChunkSize() { + return 0; + } + virtual bool HasDiskCache() {return false;}; }; using FlushChunkCacheCallBack = std::function<