diff --git a/curvefs/src/client/common/common.cpp b/curvefs/src/client/common/common.cpp index 7cf6bfeb21..6567792c0c 100644 --- a/curvefs/src/client/common/common.cpp +++ b/curvefs/src/client/common/common.cpp @@ -105,6 +105,19 @@ WarmupType GetWarmupType(const std::string& type) { return ret; } +const char kCurveFsWarmupStorageDisk[] = "disk"; +const char kCurveFsWarmupStorageKvclient[] = "kvclient"; + +WarmupStorageType GetWarmupStorageType(const std::string &type) { + auto ret = WarmupStorageType::kWarmupStorageTypeUnknown; + if (type == kCurveFsWarmupStorageDisk) { + ret = WarmupStorageType::kWarmupStorageTypeDisk; + } else if (type == kCurveFsWarmupStorageKvclient) { + ret = WarmupStorageType::kWarmupStorageTypeKvClient; + } + return ret; +} + using ::curve::common::StringToUll; // if direction is true means '+', false means '-' diff --git a/curvefs/src/client/common/common.h b/curvefs/src/client/common/common.h index e89f50fd9a..a1a1d704c2 100644 --- a/curvefs/src/client/common/common.h +++ b/curvefs/src/client/common/common.h @@ -69,6 +69,9 @@ const uint32_t MAX_XATTR_VALUE_LENGTH = 64 * 1024; const char kCurveFsWarmupXAttr[] = "curvefs.warmup.op"; + +constexpr int kWarmupOpNum = 4; + enum class WarmupOpType { kWarmupOpUnknown = 0, kWarmupOpAdd = 1, @@ -83,7 +86,15 @@ enum class WarmupType { kWarmupTypeSingle = 2, }; -WarmupType GetWarmupType(const std::string& type); +WarmupType GetWarmupType(const std::string &type); + +enum class WarmupStorageType { + kWarmupStorageTypeUnknown = 0, + kWarmupStorageTypeDisk = 1, + kWarmupStorageTypeKvClient = 2, +}; + +WarmupStorageType GetWarmupStorageType(const std::string &type); enum class FileHandle : uint64_t { kDefaultValue = 0, diff --git a/curvefs/src/client/curve_fuse_op.cpp b/curvefs/src/client/curve_fuse_op.cpp index 3a4920a082..d907ce5866 100644 --- a/curvefs/src/client/curve_fuse_op.cpp +++ b/curvefs/src/client/curve_fuse_op.cpp @@ -286,15 +286,16 @@ void FuseOpGetAttr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) { } int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key, - const std::string &path) { + const std::string &path, + curvefs::client::common::WarmupStorageType storageType) { int ret = 0; bool result = true; switch (type) { case curvefs::client::common::WarmupType::kWarmupTypeList: - result = g_ClientInstance->PutWarmFilelistTask(key); + result = g_ClientInstance->PutWarmFilelistTask(key, storageType); break; case curvefs::client::common::WarmupType::kWarmupTypeSingle: - result = g_ClientInstance->PutWarmFileTask(key, path); + result = g_ClientInstance->PutWarmFileTask(key, path, storageType); break; default: // not support add warmup type (warmup single file/dir or filelist) @@ -328,16 +329,23 @@ int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) { std::vector opTypePath; curve::common::SplitString(value, "\n", &opTypePath); - if (opTypePath.size() != 3) { + if (opTypePath.size() != curvefs::client::common::kWarmupOpNum) { LOG(ERROR) << name << " has invalid xattr value " << value; return ERANGE; } + auto storageType = + curvefs::client::common::GetWarmupStorageType(opTypePath[3]); + if (storageType == + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeUnknown) { + LOG(ERROR) << name << " not support storage type: " << value; + return ERANGE; + } int ret = 0; switch (curvefs::client::common::GetWarmupOpType(opTypePath[0])) { case curvefs::client::common::WarmupOpType::kWarmupOpAdd: ret = AddWarmupTask(curvefs::client::common::GetWarmupType(opTypePath[1]), - key, opTypePath[2]); + key, opTypePath[2], storageType); if (ret != 0) { LOG(ERROR) << name << " has invalid xattr value " << value; } diff --git a/curvefs/src/client/fuse_client.h b/curvefs/src/client/fuse_client.h index 1c66150b97..c949183d0f 100644 --- a/curvefs/src/client/fuse_client.h +++ b/curvefs/src/client/fuse_client.h @@ -267,16 +267,17 @@ class FuseClient { enableSumInDir_ = enable; } - bool PutWarmFilelistTask(fuse_ino_t key) { + bool PutWarmFilelistTask(fuse_ino_t key, common::WarmupStorageType type) { if (fsInfo_->fstype() == FSType::TYPE_S3) { - return warmupManager_->AddWarmupFilelist(key); + return warmupManager_->AddWarmupFilelist(key, type); } // only support s3 return true; } - bool PutWarmFileTask(fuse_ino_t key, const std::string& path) { + bool PutWarmFileTask(fuse_ino_t key, const std::string &path, + common::WarmupStorageType type) { if (fsInfo_->fstype() == FSType::TYPE_S3) { - return warmupManager_->AddWarmupFile(key, path); + return warmupManager_->AddWarmupFile(key, path, type); } // only support s3 return true; } diff --git a/curvefs/src/client/warmup/warmup_manager.cpp b/curvefs/src/client/warmup/warmup_manager.cpp index e3faf21788..5aa802d4d5 100644 --- a/curvefs/src/client/warmup/warmup_manager.cpp +++ b/curvefs/src/client/warmup/warmup_manager.cpp @@ -32,6 +32,7 @@ #include #include +#include "curvefs/src/client/common/common.h" #include "curvefs/src/client/inode_wrapper.h" #include "curvefs/src/client/kvclient/kvclient_manager.h" #include "curvefs/src/client/s3/client_s3_cache_manager.h" @@ -48,16 +49,17 @@ using curve::common::WriteLockGuard; #define WARMUP_CHECKINTERVAL_US (1000 * 1000) -bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key) { +bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key, + WarmupStorageType type) { if (!mounted_.load(std::memory_order_acquire)) { LOG(ERROR) << "not mounted"; return false; } // add warmup Progress - if (AddWarmupProcess(key)) { + if (AddWarmupProcess(key, type)) { VLOG(9) << "add warmup list task:" << key; WriteLockGuard lock(warmupFilelistDequeMutex_); - auto iter = FindKeyWarmupFilelistLocked(key); + auto iter = FindWarmupFilelistByKeyLocked(key); if (iter == warmupFilelistDeque_.end()) { std::shared_ptr inodeWrapper; CURVEFS_ERROR ret = inodeManager_->GetInode(key, inodeWrapper); @@ -73,14 +75,14 @@ bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key) { return true; } -bool WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key, - const std::string &path) { +bool WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key, const std::string &path, + WarmupStorageType type) { if (!mounted_.load(std::memory_order_acquire)) { LOG(ERROR) << "not mounted"; return false; } // add warmup Progress - if (AddWarmupProcess(key)) { + if (AddWarmupProcess(key, type)) { VLOG(9) << "add warmup single task:" << key; FetchDentryEnqueue(key, path); } @@ -246,7 +248,7 @@ void WarmupManagerS3Impl::FetchDentry(fuse_ino_t key, fuse_ino_t ino, } if (FsFileType::TYPE_S3 == dentry.type()) { WriteLockGuard lock(warmupInodesDequeMutex_); - auto iterDeque = FindKeyWarmupInodesLocked(key); + auto iterDeque = FindWarmupInodesByKeyLocked(key); if (iterDeque == warmupInodesDeque_.end()) { warmupInodesDeque_.emplace_back( key, std::set{dentry.inodeid()}); @@ -286,7 +288,7 @@ void WarmupManagerS3Impl::FetchChildDentry(fuse_ino_t key, fuse_ino_t ino) { << " dentry: " << dentry.name(); if (FsFileType::TYPE_S3 == dentry.type()) { WriteLockGuard lock(warmupInodesDequeMutex_); - auto iterDeque = FindKeyWarmupInodesLocked(key); + auto iterDeque = FindWarmupInodesByKeyLocked(key); if (iterDeque == warmupInodesDeque_.end()) { warmupInodesDeque_.emplace_back( key, std::set{dentry.inodeid()}); @@ -342,7 +344,7 @@ void WarmupManagerS3Impl::TravelChunks( TravelChunk(ino, infoIter.second, &prefetchObjs); { ReadLockGuard lock(inode2ProgressMutex_); - auto iter = FindKeyWarmupProgressLocked(key); + auto iter = FindWarmupProgressByKeyLocked(key); if (iter != inode2Progress_.end()) { iter->second.AddTotal(prefetchObjs.size()); } else { @@ -470,17 +472,7 @@ void WarmupManagerS3Impl::WarmUpAllObjs( } if (context->retCode == 0) { VLOG(9) << "Get Object success: " << context->key; - { - // update progress - ReadLockGuard lock(inode2ProgressMutex_); - auto iter = FindKeyWarmupProgressLocked(key); - if (iter != inode2Progress_.end()) { - iter->second.FinishedPlusOne(); - } else { - VLOG(9) << "no such warmup progress: " << key; - } - } - PutObjectToCache(context->key, context->buf, context->len); + PutObjectToCache(key, context->key, context->buf, context->len); CollectMetrics(&warmupS3Metric_.warmupS3Cached, context->len, start); warmupS3Metric_.warmupS3CacheSize << context->len; @@ -515,9 +507,17 @@ void WarmupManagerS3Impl::WarmUpAllObjs( VLOG(9) << "download start: " << iter.first; std::string name = iter.first; uint64_t readLen = iter.second; - if (s3Adaptor_->GetDiskCacheManager()->IsCached(name)) { - pendingReq.fetch_sub(1); - continue; + { + ReadLockGuard lock(inode2ProgressMutex_); + auto iterProgress = FindWarmupProgressByKeyLocked(key); + if (iterProgress->second.GetStorageType() == + curvefs::client::common::WarmupStorageType:: + kWarmupStorageTypeDisk && + s3Adaptor_->GetDiskCacheManager()->IsCached(name)) { + // storage in disk and has cached + pendingReq.fetch_sub(1); + continue; + } } char *cacheS3 = new char[readLen]; memset(cacheS3, 0, readLen); @@ -539,25 +539,26 @@ bool WarmupManagerS3Impl::ProgressDone(fuse_ino_t key) { bool ret; { ReadLockGuard lockList(warmupFilelistDequeMutex_); - ret = FindKeyWarmupFilelistLocked(key) == warmupFilelistDeque_.end(); + ret = + FindWarmupFilelistByKeyLocked(key) == warmupFilelistDeque_.end(); } { ReadLockGuard lockDentry(inode2FetchDentryPoolMutex_); - ret = ret && (FindKeyFetchDentryPoolLocked(key) == + ret = ret && (FindFetchDentryPoolByKeyLocked(key) == inode2FetchDentryPool_.end()); } { ReadLockGuard lockInodes(warmupInodesDequeMutex_); - ret = - ret && (FindKeyWarmupInodesLocked(key) == warmupInodesDeque_.end()); + ret = ret && + (FindWarmupInodesByKeyLocked(key) == warmupInodesDeque_.end()); } { ReadLockGuard lockS3Objects(inode2FetchS3ObjectsPoolMutex_); - ret = ret && (FindKeyFetchS3ObjectsPoolLocked(key) == + ret = ret && (FindFetchS3ObjectsPoolByKeyLocked(key) == inode2FetchS3ObjectsPool_.end()); } return ret; @@ -676,18 +677,35 @@ void WarmupManagerS3Impl::AddFetchS3objectsTask(fuse_ino_t key, } } -void WarmupManagerS3Impl::PutObjectToCache(const std::string &filename, +void WarmupManagerS3Impl::PutObjectToCache(fuse_ino_t key, + const std::string &filename, const char *data, uint64_t len) { - int ret = - s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(filename, data, len); - if (ret < 0) { - LOG_EVERY_SECOND(INFO) - << "write read directly failed, key: " << filename; + ReadLockGuard lock(inode2ProgressMutex_); + auto iter = FindWarmupProgressByKeyLocked(key); + if (iter == inode2Progress_.end()) { + VLOG(9) << "no this warmup task progress: " << key; + return; } - - if (kvClientManager_ != nullptr) { - kvClientManager_->Set( - std::make_shared(filename, data, len)); + int ret; + // update progress + iter->second.FinishedPlusOne(); + switch (iter->second.GetStorageType()) { + case curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk: + ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(filename, data, + len); + if (ret < 0) { + LOG_EVERY_SECOND(INFO) + << "write read directly failed, key: " << filename; + } + break; + case curvefs::client::common::WarmupStorageType::kWarmupStorageTypeKvClient: + if (kvClientManager_ != nullptr) { + kvClientManager_->Set( + std::make_shared(filename, data, len)); + } + break; + default: + LOG_EVERY_N(ERROR, 1000) << "unsupported warmup storage type"; } } diff --git a/curvefs/src/client/warmup/warmup_manager.h b/curvefs/src/client/warmup/warmup_manager.h index 33739b59f4..bb1f80e474 100644 --- a/curvefs/src/client/warmup/warmup_manager.h +++ b/curvefs/src/client/warmup/warmup_manager.h @@ -39,6 +39,7 @@ #include #include +#include "curvefs/src/client/common/common.h" #include "curvefs/src/client/dentry_cache_manager.h" #include "curvefs/src/client/fuse_common.h" #include "curvefs/src/client/inode_cache_manager.h" @@ -61,6 +62,8 @@ using ThreadPool = curvefs::common::TaskThreadPool2; using curve::common::BthreadRWLock; +using curvefs::client::common::WarmupStorageType; + class WarmupFile { public: explicit WarmupFile(fuse_ino_t key = 0, uint64_t fileLen = 0) @@ -104,11 +107,13 @@ using FuseOpReadFunctionType = class WarmupProgress { public: - explicit WarmupProgress(uint64_t total = 0, uint64_t finished = 0) - : total_(total), finished_(finished) {} + explicit WarmupProgress(WarmupStorageType type = curvefs::client::common:: + WarmupStorageType::kWarmupStorageTypeUnknown) + : total_(0), finished_(0), storageType_(type) {} WarmupProgress(const WarmupProgress &wp) - : total_(wp.total_), finished_(wp.finished_) {} + : total_(wp.total_), finished_(wp.finished_), + storageType_(wp.storageType_) {} void AddTotal(uint64_t add) { std::lock_guard lock(totalMutex_); @@ -143,11 +148,16 @@ class WarmupProgress { ",finished:" + std::to_string(finished_); } + WarmupStorageType GetStorageType() { + return storageType_; + } + private: uint64_t total_; std::mutex totalMutex_; uint64_t finished_; std::mutex finishedMutex_; + WarmupStorageType storageType_; }; class WarmupManager { @@ -178,8 +188,9 @@ class WarmupManager { } virtual void UnInit() { ClearWarmupProcess(); } - virtual bool AddWarmupFilelist(fuse_ino_t key) = 0; - virtual bool AddWarmupFile(fuse_ino_t key, const std::string &path) = 0; + virtual bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type) = 0; + virtual bool AddWarmupFile(fuse_ino_t key, const std::string &path, + WarmupStorageType type) = 0; void SetMounted(bool mounted) { mounted_.store(mounted, std::memory_order_release); @@ -206,7 +217,7 @@ class WarmupManager { bool QueryWarmupProgress(fuse_ino_t key, WarmupProgress *progress) { bool ret = true; ReadLockGuard lock(inode2ProgressMutex_); - auto iter = FindKeyWarmupProgressLocked(key); + auto iter = FindWarmupProgressByKeyLocked(key); if (iter != inode2Progress_.end()) { *progress = iter->second; } else { @@ -224,9 +235,9 @@ class WarmupManager { * @return true * @return false warmupProcess has been added */ - virtual bool AddWarmupProcess(fuse_ino_t key) { + virtual bool AddWarmupProcess(fuse_ino_t key, WarmupStorageType type) { WriteLockGuard lock(inode2ProgressMutex_); - auto ret = inode2Progress_.emplace(key, WarmupProgress()); + auto ret = inode2Progress_.emplace(key, WarmupProgress(type)); return ret.second; } @@ -237,7 +248,7 @@ class WarmupManager { * @return std::unordered_map::iterator */ std::unordered_map::iterator - FindKeyWarmupProgressLocked(fuse_ino_t key) { + FindWarmupProgressByKeyLocked(fuse_ino_t key) { return inode2Progress_.find(key); } @@ -287,8 +298,9 @@ class WarmupManagerS3Impl : public WarmupManager { std::move(readFunc), std::move(kvClientManager)), s3Adaptor_(std::move(s3Adaptor)) {} - bool AddWarmupFilelist(fuse_ino_t key) override; - bool AddWarmupFile(fuse_ino_t key, const std::string &path) override; + bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type) override; + bool AddWarmupFile(fuse_ino_t key, const std::string &path, + WarmupStorageType type) override; void Init(const FuseClientOption &option) override; void UnInit() override; @@ -314,7 +326,7 @@ class WarmupManagerS3Impl : public WarmupManager { * @return std::deque::iterator */ std::deque::iterator - FindKeyWarmupInodesLocked(fuse_ino_t key) { + FindWarmupInodesByKeyLocked(fuse_ino_t key) { return std::find_if(warmupInodesDeque_.begin(), warmupInodesDeque_.end(), [key](const WarmupInodes &inodes) { @@ -329,7 +341,7 @@ class WarmupManagerS3Impl : public WarmupManager { * @return std::deque::iterator */ std::deque::iterator - FindKeyWarmupFilelistLocked(fuse_ino_t key) { + FindWarmupFilelistByKeyLocked(fuse_ino_t key) { return std::find_if(warmupFilelistDeque_.begin(), warmupFilelistDeque_.end(), [key](const WarmupFilelist &filelist_) { @@ -345,7 +357,7 @@ class WarmupManagerS3Impl : public WarmupManager { * std::unique_ptr>::iterator */ std::unordered_map>::iterator - FindKeyFetchDentryPoolLocked(fuse_ino_t key) { + FindFetchDentryPoolByKeyLocked(fuse_ino_t key) { return inode2FetchDentryPool_.find(key); } @@ -357,7 +369,7 @@ class WarmupManagerS3Impl : public WarmupManager { * std::unique_ptr>::iterator */ std::unordered_map>::iterator - FindKeyFetchS3ObjectsPoolLocked(fuse_ino_t key) { + FindFetchS3ObjectsPoolByKeyLocked(fuse_ino_t key) { return inode2FetchS3ObjectsPool_.find(key); } @@ -401,8 +413,8 @@ class WarmupManagerS3Impl : public WarmupManager { void AddFetchS3objectsTask(fuse_ino_t key, std::function task); - void PutObjectToCache(const std::string &filename, const char *data, - uint64_t len); + void PutObjectToCache(fuse_ino_t key, const std::string &filename, + const char *data, uint64_t len); protected: std::deque warmupFilelistDeque_; diff --git a/curvefs/test/client/test_fuse_s3_client.cpp b/curvefs/test/client/test_fuse_s3_client.cpp index 9b2816a5e8..3fa444bc9c 100644 --- a/curvefs/test/client/test_fuse_s3_client.cpp +++ b/curvefs/test/client/test_fuse_s3_client.cpp @@ -225,7 +225,9 @@ TEST_F(TestFuseS3Client, warmUp_inodeBadFd) { Return(CURVEFS_ERROR::BAD_FD))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); @@ -280,7 +282,9 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry01) { auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -336,7 +340,9 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry02) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -392,7 +398,9 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue__error_getinode) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -448,7 +456,9 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue_chunkempty) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -509,7 +519,9 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_TYPE_SYM_LINK) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -572,7 +584,9 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_error_TYPE_DIRECTORY) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -634,7 +648,9 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_multilevel) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -683,7 +699,9 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_unkown) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -738,7 +756,9 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_error_ListDentry) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); @@ -824,7 +844,9 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_suc_ListDentry) { DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); auto old = client_->GetFsInfo()->fstype(); client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); - client_->PutWarmFilelistTask(inodeid); + client_->PutWarmFilelistTask( + inodeid, + curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk); warmup::WarmupProgress progress; bool ret = client_->GetWarmupProgress(inodeid, &progress); diff --git a/tools-v2/pkg/cli/command/curvefs/warmup/add/add.go b/tools-v2/pkg/cli/command/curvefs/warmup/add/add.go index 60edfa097b..539d839a13 100644 --- a/tools-v2/pkg/cli/command/curvefs/warmup/add/add.go +++ b/tools-v2/pkg/cli/command/curvefs/warmup/add/add.go @@ -48,16 +48,22 @@ $ curve fs warmup add /mnt/warmup # warmup all files in /mnt/warmup` const ( CURVEFS_WARMUP_OP_XATTR = "curvefs.warmup.op" - CURVEFS_WARMUP_OP_ADD_SINGLE = "add\nsingle\n%s" - CURVEFS_WARMUP_OP_ADD_LIST = "add\nlist\n%s" + CURVEFS_WARMUP_OP_ADD_SINGLE = "add\nsingle\n%s\n%s" + CURVEFS_WARMUP_OP_ADD_LIST = "add\nlist\n%s\n%s" ) +var STORAGE_TYPE = map[string]string{ + "disk": "disk", + "mem": "kvclient", +} + type AddCommand struct { basecmd.FinalCurveCmd Mountpoint *mountinfo.MountInfo Path string // path in user system - CurvefsPath string // path in curvefs - Single bool // warmup a single file or directory + CurvefsPath string // path in curvefs + Single bool // warmup a single file or directory + StorageType string // warmup storage type ConvertFails []string } @@ -82,6 +88,7 @@ func NewAddCommand() *cobra.Command { func (aCmd *AddCommand) AddFlags() { config.AddFileListOptionFlag(aCmd.Cmd) config.AddDaemonOptionPFlag(aCmd.Cmd) + config.AddStorageOptionFlag(aCmd.Cmd) } func (aCmd *AddCommand) Init(cmd *cobra.Command, args []string) error { @@ -122,18 +129,18 @@ func (aCmd *AddCommand) Init(cmd *cobra.Command, args []string) error { aCmd.Mountpoint = nil for _, mountpoint := range mountpoints { absPath, _ := filepath.Abs(aCmd.Path) - rel , err := filepath.Rel(mountpoint.MountPoint, absPath) + rel, err := filepath.Rel(mountpoint.MountPoint, absPath) if err == nil && !strings.HasPrefix(rel, "..") { // found the mountpoint if aCmd.Mountpoint == nil || len(aCmd.Mountpoint.MountPoint) < len(mountpoint.MountPoint) { - // Prevent the curvefs directory from being mounted under the curvefs directory - // /a/b/c: - // test-1 mount in /a - // test-1 mount in /a/b - // warmup /a/b/c. - aCmd.Mountpoint = mountpoint - aCmd.CurvefsPath = cobrautil.Path2CurvefsPath(aCmd.Path, mountpoint) + // Prevent the curvefs directory from being mounted under the curvefs directory + // /a/b/c: + // test-1 mount in /a + // test-1 mount in /a/b + // warmup /a/b/c. + aCmd.Mountpoint = mountpoint + aCmd.CurvefsPath = cobrautil.Path2CurvefsPath(aCmd.Path, mountpoint) } } } @@ -141,6 +148,12 @@ func (aCmd *AddCommand) Init(cmd *cobra.Command, args []string) error { return fmt.Errorf("[%s] is not saved in curvefs", aCmd.Path) } + // check storage type + aCmd.StorageType = STORAGE_TYPE[config.GetStorageFlag(aCmd.Cmd)] + if aCmd.StorageType == "" { + return fmt.Errorf("[%s] is not support storage type", aCmd.StorageType) + } + return nil } @@ -185,7 +198,7 @@ func (aCmd *AddCommand) RunCommand(cmd *cobra.Command, args []string) error { } xattr = CURVEFS_WARMUP_OP_ADD_LIST } - value := fmt.Sprintf(xattr, aCmd.CurvefsPath) + value := fmt.Sprintf(xattr, aCmd.CurvefsPath, aCmd.StorageType) err := unix.Setxattr(aCmd.Path, CURVEFS_WARMUP_OP_XATTR, []byte(value), 0) if err == unix.ENOTSUP || err == unix.EOPNOTSUPP { return fmt.Errorf("filesystem does not support extended attributes") diff --git a/tools-v2/pkg/config/fs.go b/tools-v2/pkg/config/fs.go index 3e6404834c..232c743b95 100644 --- a/tools-v2/pkg/config/fs.go +++ b/tools-v2/pkg/config/fs.go @@ -93,6 +93,9 @@ const ( CURVEFS_DAEMON = "daemon" VIPER_CURVEFS_DAEMON = "curvefs.daemon" CURVEFS_DEFAULT_DAEMON = false + CURVEFS_STORAGE = "storage" + VIPER_CURVEFS_STORAGE = "curvefs.storage" + CURVEFS_DEFAULT_STORAGE = "disk" // S3 CURVEFS_S3_AK = "s3.ak" @@ -168,7 +171,8 @@ var ( CURVEFS_SERVERS: VIPER_CURVEFS_SERVERS, CURVEFS_FILELIST: VIPER_CURVEFS_FILELIST, CURVEFS_INTERVAL: VIPER_CURVEFS_INTERVAL, - CURVEFS_DAEMON: VIPER_CURVEFS_DAEMON, + CURVEFS_DAEMON: VIPER_CURVEFS_DAEMON, + CURVEFS_STORAGE: VIPER_CURVEFS_STORAGE, // S3 CURVEFS_S3_AK: VIPER_CURVEFS_S3_AK, @@ -197,7 +201,8 @@ var ( CURVEFS_MARGIN: CURVEFS_DEFAULT_MARGIN, CURVEFS_SERVERS: CURVEFS_DEFAULT_SERVERS, CURVEFS_INTERVAL: CURVEFS_DEFAULT_INTERVAL, - CURVEFS_DAEMON: CURVEFS_DEFAULT_DAEMON, + CURVEFS_DAEMON: CURVEFS_DEFAULT_DAEMON, + CURVEFS_STORAGE: CURVEFS_DEFAULT_STORAGE, // S3 CURVEFS_S3_AK: CURVEFS_DEFAULT_S3_AK, @@ -758,6 +763,15 @@ func GetDaemonFlag(cmd *cobra.Command) bool { return GetFlagBool(cmd, CURVEFS_DAEMON) } +// storage [option] +func AddStorageOptionFlag(cmd *cobra.Command) { + AddStringOptionFlag(cmd, CURVEFS_STORAGE, "warmup storage type, can be: disk/mem") +} + +func GetStorageFlag(cmd *cobra.Command) string { + return GetFlagString(cmd, CURVEFS_STORAGE) +} + /* required */ // copysetid [required]