diff --git a/curvefs/src/client/s3/disk_cache_manager.cpp b/curvefs/src/client/s3/disk_cache_manager.cpp index 61a6adf2f0..2b86177491 100644 --- a/curvefs/src/client/s3/disk_cache_manager.cpp +++ b/curvefs/src/client/s3/disk_cache_manager.cpp @@ -64,6 +64,9 @@ DiskCacheManager::DiskCacheManager(std::shared_ptr posixWrapper, fullRatio_ = 0; safeRatio_ = 0; maxUsableSpaceBytes_ = 0; + cachedObjName_ = std::make_shared< + SglLRUCache>(0, + std::make_shared("diskcache")); } int DiskCacheManager::Init(S3Client *client, @@ -93,12 +96,14 @@ int DiskCacheManager::Init(S3Client *client, std::thread uploadThread = std::thread(&DiskCacheManager::UploadAllCacheWriteFile, this); uploadThread.detach(); + // load all cache read file - ret = cacheRead_->LoadAllCacheReadFile(&cachedObjName_); + ret = cacheRead_->LoadAllCacheReadFile(cachedObjName_); if (ret < 0) { LOG(ERROR) << "load all cache read file error. ret = " << ret; return ret; } + // start trim thread TrimRun(); @@ -145,15 +150,12 @@ int DiskCacheManager::UploadAllCacheWriteFile() { } void DiskCacheManager::AddCache(const std::string name) { - std::lock_guard lk(mtx_); - cachedObjName_.emplace(name); + cachedObjName_->Put(name); + VLOG(9) << "cache size is: " << cachedObjName_->Size(); } bool DiskCacheManager::IsCached(const std::string name) { - std::lock_guard lk(mtx_); - std::set::iterator cacheKeyIter; - cacheKeyIter = cachedObjName_.find(name); - if (cacheKeyIter == cachedObjName_.end()) { + if (!cachedObjName_->IsCached(name)) { VLOG(9) << "not cached, name = " << name; return false; } @@ -331,23 +333,21 @@ void DiskCacheManager::TrimCache() { std::string cacheWriteFullDir; cacheReadFullDir = GetCacheReadFullDir(); cacheWriteFullDir = GetCacheWriteFullDir(); - std::set cachedObjNameTmp; - { - std::lock_guard lk(mtx_); - cachedObjNameTmp = cachedObjName_; - } + while (!IsDiskCacheSafe()) { std::string cacheReadFile, cacheWriteFile; - std::set::iterator cacheKeyIter; - cacheKeyIter = cachedObjNameTmp.begin(); - if (cacheKeyIter == cachedObjNameTmp.end()) { + if (cachedObjName_->Size() == 0) { VLOG(3) << "remove disk file error" - << ", cachedObjName is empty."; + << ", cachedObjName is empty."; break; } - - cacheReadFile = cacheReadFullDir + "/" + *cacheKeyIter; - cacheWriteFile = cacheWriteFullDir + "/" + *cacheKeyIter; + std::string cacheKey; + cachedObjName_->GetBack(&cacheKey); + if (cacheKey.empty()) { + break; + } + cacheReadFile = cacheReadFullDir + "/" + cacheKey; + cacheWriteFile = cacheWriteFullDir + "/" + cacheKey; struct stat statFile; int ret; ret = posixWrapper_->stat(cacheWriteFile.c_str(), &statFile); @@ -359,28 +359,15 @@ void DiskCacheManager::TrimCache() { if (ret == 0) { VLOG(3) << "do not remove this disk file" << ", file has not been uploaded to S3." - << ", file is: " << *cacheKeyIter; - cachedObjNameTmp.erase(cacheKeyIter); - continue; - } - { - std::lock_guard lk(mtx_); - std::set::iterator iter; - iter = cachedObjName_.find(*cacheKeyIter); - if (iter == cachedObjName_.end()) { - VLOG(6) << "remove disk file error" - << ", file is not exist in cachedObjName" - << ", file is: " << *cacheKeyIter; - cachedObjNameTmp.erase(cacheKeyIter); - continue; - } - cachedObjName_.erase(iter); + << ", file is: " << cacheKey; + break; } + cachedObjName_->Remove(cacheKey); struct stat statReadFile; ret = posixWrapper_->stat(cacheReadFile.c_str(), &statReadFile); if (ret != 0) { - VLOG(3) << "remove disk file error" - << ", file is: " << *cacheKeyIter; + VLOG(3) << "stat disk file error" + << ", file is: " << cacheKey; continue; } // if remove disk file before delete cache, @@ -390,14 +377,12 @@ void DiskCacheManager::TrimCache() { ret = posixWrapper_->remove(toDelFile); if (ret < 0) { LOG(ERROR) - << "remove disk file error, file is: " << *cacheKeyIter; - cachedObjNameTmp.erase(cacheKeyIter); + << "remove disk file error, file is: " << cacheKey; continue; } DecDiskUsedBytes(statReadFile.st_size); VLOG(3) << "remove disk file success, file is: " - << *cacheKeyIter; - cachedObjNameTmp.erase(cacheKeyIter); + << cacheKey; } VLOG(3) << "trim over."; } diff --git a/curvefs/src/client/s3/disk_cache_manager.h b/curvefs/src/client/s3/disk_cache_manager.h index 81416ddff6..da0605eb70 100644 --- a/curvefs/src/client/s3/disk_cache_manager.h +++ b/curvefs/src/client/s3/disk_cache_manager.h @@ -24,12 +24,14 @@ #include +#include #include #include #include #include "src/common/concurrent/concurrent.h" #include "src/common/interruptible_sleeper.h" +#include "src/common/lru_cache.h" #include "src/common/throttle.h" #include "curvefs/src/common/wrap_posix.h" #include "curvefs/src/common/utils.h" @@ -40,8 +42,10 @@ namespace curvefs { namespace client { +using ::curve::common::LRUCache; using curvefs::common::PosixWrapper; using curve::common::ReadWriteThrottleParams; +using ::curve::common::SglLRUCache; using curvefs::common::SysUtils; using curve::common::ThrottleParams; using curve::common::Throttle; @@ -148,9 +152,7 @@ class DiskCacheManager { std::string cacheDir_; std::shared_ptr cacheWrite_; std::shared_ptr cacheRead_; - std::set cachedObjName_; - - bthread::Mutex mtx_; + std::shared_ptr> cachedObjName_; S3Client *client_; std::shared_ptr posixWrapper_; diff --git a/curvefs/src/client/s3/disk_cache_read.cpp b/curvefs/src/client/s3/disk_cache_read.cpp index 980def0534..28819af6ed 100644 --- a/curvefs/src/client/s3/disk_cache_read.cpp +++ b/curvefs/src/client/s3/disk_cache_read.cpp @@ -102,7 +102,8 @@ int DiskCacheRead::LinkWriteToRead(const std::string fileName, return 0; } -int DiskCacheRead::LoadAllCacheReadFile(std::set *cachedObj) { +int DiskCacheRead::LoadAllCacheReadFile( + std::shared_ptr> cachedObj) { LOG(INFO) << "LoadAllCacheReadFile start. "; std::string cacheReadPath; bool ret; @@ -124,7 +125,7 @@ int DiskCacheRead::LoadAllCacheReadFile(std::set *cachedObj) { (!strncmp(cacheReadDirent->d_name, "..", 2))) continue; std::string fileName = cacheReadDirent->d_name; - cachedObj->emplace(fileName); + cachedObj->Put(fileName); VLOG(3) << "LoadAllCacheReadFile obj, name = " << fileName; } VLOG(6) << "close start."; @@ -133,6 +134,7 @@ int DiskCacheRead::LoadAllCacheReadFile(std::set *cachedObj) { LOG(ERROR) << "opendir error, errno = " << errno; return rc; } + LOG(INFO) << "LoadAllCacheReadFile success."; return 0; } diff --git a/curvefs/src/client/s3/disk_cache_read.h b/curvefs/src/client/s3/disk_cache_read.h index 048450ef03..c55e2aaaac 100644 --- a/curvefs/src/client/s3/disk_cache_read.h +++ b/curvefs/src/client/s3/disk_cache_read.h @@ -22,18 +22,21 @@ #ifndef CURVEFS_SRC_CLIENT_S3_DISK_CACHE_READ_H_ #define CURVEFS_SRC_CLIENT_S3_DISK_CACHE_READ_H_ +#include #include -#include #include +#include #include "src/common/concurrent/concurrent.h" #include "src/common/interruptible_sleeper.h" +#include "src/common/lru_cache.h" #include "curvefs/src/common/wrap_posix.h" #include "curvefs/src/client/s3/disk_cache_base.h" namespace curvefs { namespace client { +using ::curve::common::SglLRUCache; using curvefs::common::PosixWrapper; class DiskCacheRead : public DiskCacheBase { @@ -53,7 +56,8 @@ class DiskCacheRead : public DiskCacheBase { /** * @brief after reboot,load all files that store in read cache. */ - virtual int LoadAllCacheReadFile(std::set* cachedObj); + virtual int LoadAllCacheReadFile(std::shared_ptr< + SglLRUCache> cachedObj); virtual void InitMetrics(std::shared_ptr metric) { metric_ = metric; } diff --git a/curvefs/test/client/test_disk_cache_manager.cpp b/curvefs/test/client/test_disk_cache_manager.cpp index 4c75f3b21b..8e3458ef70 100644 --- a/curvefs/test/client/test_disk_cache_manager.cpp +++ b/curvefs/test/client/test_disk_cache_manager.cpp @@ -261,6 +261,23 @@ TEST_F(TestDiskCacheManager, WriteDiskFile) { ASSERT_EQ(0, ret); } +TEST_F(TestDiskCacheManager, IsCached) { + std::string fileName = "test"; + std::string fileName2 = "test2"; + bool ret = diskCacheManager_->IsCached(fileName); + ASSERT_EQ(false, ret); + + diskCacheManager_->AddCache(fileName); + diskCacheManager_->AddCache(fileName2); + ret = diskCacheManager_->IsCached(fileName2); + ASSERT_EQ(true, ret); + + diskCacheManager_->AddCache(fileName); + diskCacheManager_->AddCache(fileName2); + ret = diskCacheManager_->IsCached(fileName); + ASSERT_EQ(true, ret); +} + TEST_F(TestDiskCacheManager, SetDiskFsUsedRatio) { EXPECT_CALL(*wrapper, statfs(NotNull(), NotNull())) .WillOnce(Return(-1)); diff --git a/curvefs/test/client/test_disk_cache_read.cpp b/curvefs/test/client/test_disk_cache_read.cpp index efb7f4d634..d771d978a4 100644 --- a/curvefs/test/client/test_disk_cache_read.cpp +++ b/curvefs/test/client/test_disk_cache_read.cpp @@ -23,6 +23,7 @@ #include #include +#include "src/common/lru_cache.h" #include "curvefs/test/client/mock_test_posix_wapper.h" #include "curvefs/test/client/mock_disk_cache_base.h" #include "curvefs/src/client/s3/disk_cache_read.h" @@ -46,6 +47,9 @@ using ::testing::ElementsAre; using ::testing::SetArgPointee; using ::testing::ReturnArg; +using ::curve::common::CacheMetrics; +using ::curve::common::SglLRUCache; + class TestDiskCacheRead : public ::testing::Test { protected: TestDiskCacheRead() {} @@ -67,7 +71,6 @@ class TestDiskCacheRead : public ::testing::Test { std::shared_ptr wrapper_; }; - TEST_F(TestDiskCacheRead, ReadDiskFile) { EXPECT_CALL(*wrapper_, open(_, _, _)) .WillOnce(Return(-1)); @@ -154,15 +157,18 @@ TEST_F(TestDiskCacheRead, LinkWriteToRead) { TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) { EXPECT_CALL(*wrapper_, stat(NotNull(), NotNull())) .WillOnce(Return(-1)); - std::set cachedObj; - int ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj); + std::shared_ptr> cachedObj; + cachedObj = std::make_shared< + SglLRUCache>(0, + std::make_shared("diskcache")); + int ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj); ASSERT_EQ(-1, ret); EXPECT_CALL(*wrapper_, stat(NotNull(), NotNull())) .WillOnce(Return(0)); EXPECT_CALL(*wrapper_, opendir(NotNull())) .WillOnce(ReturnNull()); - ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj); + ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj); ASSERT_EQ(-1, ret); DIR* dir = opendir("."); @@ -174,7 +180,7 @@ TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) { .WillOnce(Return(0)); EXPECT_CALL(*wrapper_, readdir(NotNull())) .WillOnce(ReturnNull()); - ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj); + ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj); ASSERT_EQ(0, ret); struct dirent* dirent; @@ -190,7 +196,7 @@ TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) { .WillOnce(ReturnNull()); EXPECT_CALL(*wrapper_, closedir(NotNull())) .WillOnce(Return(0)); - ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj); + ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj); ASSERT_EQ(0, ret); } diff --git a/src/common/lru_cache.h b/src/common/lru_cache.h index e185b37f94..c44affaf8c 100644 --- a/src/common/lru_cache.h +++ b/src/common/lru_cache.h @@ -356,6 +356,197 @@ std::shared_ptr return cacheMetrics_; } +template +class SglLRUCacheInterface { + public: + /** + * @brief Store key to the cache + * + * @param[in] key + * + */ + virtual void Put(const K &key) = 0; + + virtual bool IsCached(const K &key) = 0; + + /* + * @brief Remove key from cache + * @param[in] key + */ + virtual void Remove(const K &key) = 0; + /* + * @brief Get back key from cache + * @param[out] the back key + */ + virtual void GetBack(K *value) = 0; + virtual uint64_t Size() = 0; +}; + +template > +class SglLRUCache : public SglLRUCacheInterface { + public: + explicit SglLRUCache(uint64_t maxCount, + std::shared_ptr cacheMetrics = nullptr) + : maxCount_(maxCount), + cacheMetrics_(cacheMetrics) {} + + explicit SglLRUCache(std::shared_ptr cacheMetrics = nullptr) + : maxCount_(0), + cacheMetrics_(cacheMetrics) {} + + void Put(const K &key) override; + + bool IsCached(const K &key) override; + + void Remove(const K &key) override; + + void GetBack(K *value); + + uint64_t Size(); + + std::shared_ptr GetCacheMetrics() const; + + private: + bool PutLocked(const K &key); + + void RemoveLocked(const K &key); + + void MoveToFront(const typename std::list::iterator &elem); + + bool RemoveOldest(); + + + void RemoveElement(const typename std::list::iterator &elem); + + private: + ::curve::common::RWLock lock_; + + // the maximum length of the queue. 0 indicates unlimited length + uint64_t maxCount_; + // dequeue for storing items + std::list ll_; + // record the position of the item corresponding to the key in the dequeue + std::unordered_map::iterator> cache_; + + // cache related metric data + std::shared_ptr cacheMetrics_; +}; + +template +std::shared_ptr + SglLRUCache::GetCacheMetrics() const { + return cacheMetrics_; +} + +template +uint64_t SglLRUCache::Size() { + ::curve::common::WriteLockGuard guard(lock_); + return ll_.size(); +} + +template +void SglLRUCache::Put(const K &key) { + ::curve::common::WriteLockGuard guard(lock_); + PutLocked(key); +} + +template +void SglLRUCache::GetBack(K *value) { + ::curve::common::WriteLockGuard guard(lock_); + if (ll_.empty()) { + LOG(INFO) << "cache is empty."; + return; + } + *value = ll_.back(); + return; +} + +template +bool SglLRUCache::IsCached(const K &key) { + ::curve::common::WriteLockGuard guard(lock_); + auto iter = cache_.find(key); + if (iter == cache_.end()) { + if (cacheMetrics_ != nullptr) { + cacheMetrics_->OnCacheMiss(); + } + return false; + } + + if (cacheMetrics_ != nullptr) { + cacheMetrics_->OnCacheHit(); + } + + // update the position of the target item in the list + MoveToFront(iter->second); + return true; +} + +template +void SglLRUCache::Remove(const K &key) { + ::curve::common::WriteLockGuard guard(lock_); + RemoveLocked(key); +} + +template +bool SglLRUCache::PutLocked(const K &key) { + auto iter = cache_.find(key); + + // delete the old value if already exist + if (iter != cache_.end()) { + RemoveElement(iter->second); + } + + // put new value + ll_.push_front(key); + cache_[key] = ll_.begin(); + if (cacheMetrics_ != nullptr) { + cacheMetrics_->UpdateAddToCacheCount(); + cacheMetrics_->UpdateAddToCacheBytes(KeyTraits::CountBytes(key)); + } + if (maxCount_ != 0 && ll_.size() > maxCount_) { + return RemoveOldest(); + } + return false; +} + +template +void SglLRUCache::RemoveLocked(const K &key) { + auto iter = cache_.find(key); + if (iter != cache_.end()) { + RemoveElement(iter->second); + } +} + +template +void SglLRUCache::MoveToFront( + const typename std::list::iterator &elem) { + ll_.erase(elem); + ll_.push_front(*elem); + cache_[*elem] = ll_.begin(); +} + +template +bool SglLRUCache::RemoveOldest() { + if (ll_.begin() != ll_.end()) { + RemoveElement(--ll_.end()); + } + return false; +} + +template +void SglLRUCache::RemoveElement( + const typename std::list::iterator &elem) { + if (cacheMetrics_ != nullptr) { + cacheMetrics_->UpdateRemoveFromCacheCount(); + cacheMetrics_->UpdateRemoveFromCacheBytes( + KeyTraits::CountBytes(*elem)); + } + + auto iter = cache_.find(*elem); + cache_.erase(iter); + ll_.erase(elem); +} + } // namespace common } // namespace curve diff --git a/test/common/lru_cache_test.cpp b/test/common/lru_cache_test.cpp index e29236a05f..e700510957 100644 --- a/test/common/lru_cache_test.cpp +++ b/test/common/lru_cache_test.cpp @@ -157,6 +157,82 @@ TEST(CaCheTest, TestCacheHitAndMissMetric) { ASSERT_EQ(10, cache->GetCacheMetrics()->cacheMiss.get_value()); } +TEST(SglCaCheTest, test_cache_with_capacity_limit) { + int maxCount = 5; + auto cache = std::make_shared>(maxCount, + std::make_shared("LruCache")); + + // 1. 测试 put/IsCached + uint64_t cacheSize = 0; + for (int i = 1; i <= maxCount; i++) { + cache->Put(std::to_string(i)); + cacheSize++; + ASSERT_EQ(i, cache->GetCacheMetrics()->cacheCount.get_value()); + ASSERT_TRUE(cache->IsCached(std::to_string(i))); + } + + // 2. 第一个元素被剔出 + cache->Put(std::to_string(11)); + ASSERT_FALSE(cache->IsCached(std::to_string(1))); + + // 3. 测试删除元素 + // 删除不存在的元素 + cache->Remove("1"); + // 删除list中存在的元素 + cache->Remove("2"); + ASSERT_FALSE(cache->IsCached("2")); + ASSERT_EQ(maxCount - 1, cache->GetCacheMetrics()->cacheCount.get_value()); + + // 4. 重复put + cache->Put("4"); + ASSERT_TRUE(cache->IsCached("4")); + ASSERT_EQ(maxCount - 1, cache->GetCacheMetrics()->cacheCount.get_value()); +} + +TEST(SglCaCheTest, test_cache_with_capacity_no_limit) { + auto cache = std::make_shared>( + std::make_shared("LruCache")); + + // 1. 测试 put/IsCached + std::string res; + for (int i = 1; i <= 10; i++) { + std::string eliminated; + cache->Put(std::to_string(i)); + ASSERT_TRUE(cache->IsCached(std::to_string(i))); + ASSERT_FALSE(cache->IsCached(std::to_string(100))); + } + + // 2. 测试元素删除 + cache->Remove("1"); + ASSERT_FALSE(cache->IsCached("1")); +} + +TEST(SglCaCheTest, TestCacheHitAndMissMetric) { + auto cache = std::make_shared>( + std::make_shared("LruCache")); + ASSERT_EQ(0, cache->GetCacheMetrics()->cacheHit.get_value()); + ASSERT_EQ(0, cache->GetCacheMetrics()->cacheMiss.get_value()); + + std::string existKey = "hello"; + std::string notExistKey = "world"; + cache->Put(existKey); + + for (int i = 0; i < 10; ++i) { + ASSERT_TRUE(cache->IsCached(existKey)); + ASSERT_FALSE(cache->IsCached(notExistKey)); + } + + ASSERT_EQ(10, cache->GetCacheMetrics()->cacheHit.get_value()); + ASSERT_EQ(10, cache->GetCacheMetrics()->cacheMiss.get_value()); + + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(cache->IsCached(existKey)); + } + + ASSERT_EQ(15, cache->GetCacheMetrics()->cacheHit.get_value()); + ASSERT_EQ(10, cache->GetCacheMetrics()->cacheMiss.get_value()); +} + } // namespace common } // namespace curve