Skip to content

Commit

Permalink
curvefs/client: adapter lru list for disk cache
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhongsong committed Mar 2, 2022
1 parent 6abf07b commit a2b0293
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 55 deletions.
71 changes: 30 additions & 41 deletions curvefs/src/client/s3/disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ DiskCacheManager::DiskCacheManager(std::shared_ptr<PosixWrapper> posixWrapper,
fullRatio_ = 0;
safeRatio_ = 0;
maxUsableSpaceBytes_ = 0;
// cannot limit the size,
// because cache is been delete must after upload to s3
cachedObjName_ = std::make_shared<
SglLRUCache<std::string>>(0,
std::make_shared<CacheMetrics>("diskcache"));
}

int DiskCacheManager::Init(S3Client *client,
Expand Down Expand Up @@ -93,12 +98,14 @@ int DiskCacheManager::Init(S3Client *client,
std::thread uploadThread =
std::thread(&DiskCacheManager::UploadAllCacheWriteFile, this);
uploadThread.detach();

// load all cache read file
ret = cacheRead_->LoadAllCacheReadFile(&cachedObjName_);
ret = cacheRead_->LoadAllCacheReadFile(cachedObjName_);
if (ret < 0) {
LOG(ERROR) << "load all cache read file error. ret = " << ret;
return ret;
}

// start trim thread
TrimRun();

Expand Down Expand Up @@ -145,15 +152,12 @@ int DiskCacheManager::UploadAllCacheWriteFile() {
}

void DiskCacheManager::AddCache(const std::string name) {
std::lock_guard<bthread::Mutex> lk(mtx_);
cachedObjName_.emplace(name);
cachedObjName_->Put(name);
VLOG(9) << "cache size is: " << cachedObjName_->Size();
}

bool DiskCacheManager::IsCached(const std::string name) {
std::lock_guard<bthread::Mutex> lk(mtx_);
std::set<std::string>::iterator cacheKeyIter;
cacheKeyIter = cachedObjName_.find(name);
if (cacheKeyIter == cachedObjName_.end()) {
if (!cachedObjName_->IsCached(name)) {
VLOG(9) << "not cached, name = " << name;
return false;
}
Expand Down Expand Up @@ -331,23 +335,23 @@ void DiskCacheManager::TrimCache() {
std::string cacheWriteFullDir;
cacheReadFullDir = GetCacheReadFullDir();
cacheWriteFullDir = GetCacheWriteFullDir();
std::set<std::string> cachedObjNameTmp;
{
std::lock_guard<bthread::Mutex> lk(mtx_);
cachedObjNameTmp = cachedObjName_;
}

while (!IsDiskCacheSafe()) {
std::string cacheReadFile, cacheWriteFile;
std::set<std::string>::iterator cacheKeyIter;
cacheKeyIter = cachedObjNameTmp.begin();
if (cacheKeyIter == cachedObjNameTmp.end()) {
if (cachedObjName_->Size() == 0) {
VLOG(3) << "remove disk file error"
<< ", cachedObjName is empty.";
<< ", cachedObjName is empty.";
break;
}

cacheReadFile = cacheReadFullDir + "/" + *cacheKeyIter;
cacheWriteFile = cacheWriteFullDir + "/" + *cacheKeyIter;
std::string cacheKey;
cachedObjName_->GetBack(&cacheKey);
if (cacheKey.empty()) {
VLOG(3) << "cachekey is empty";
break;
}
VLOG(9) << "obj will be removed: " << cacheKey;
cacheReadFile = cacheReadFullDir + "/" + cacheKey;
cacheWriteFile = cacheWriteFullDir + "/" + cacheKey;
struct stat statFile;
int ret;
ret = posixWrapper_->stat(cacheWriteFile.c_str(), &statFile);
Expand All @@ -359,28 +363,15 @@ void DiskCacheManager::TrimCache() {
if (ret == 0) {
VLOG(3) << "do not remove this disk file"
<< ", file has not been uploaded to S3."
<< ", file is: " << *cacheKeyIter;
cachedObjNameTmp.erase(cacheKeyIter);
continue;
}
{
std::lock_guard<bthread::Mutex> lk(mtx_);
std::set<std::string>::iterator iter;
iter = cachedObjName_.find(*cacheKeyIter);
if (iter == cachedObjName_.end()) {
VLOG(6) << "remove disk file error"
<< ", file is not exist in cachedObjName"
<< ", file is: " << *cacheKeyIter;
cachedObjNameTmp.erase(cacheKeyIter);
continue;
}
cachedObjName_.erase(iter);
<< ", file is: " << cacheKey;
break;
}
cachedObjName_->Remove(cacheKey);
struct stat statReadFile;
ret = posixWrapper_->stat(cacheReadFile.c_str(), &statReadFile);
if (ret != 0) {
VLOG(3) << "remove disk file error"
<< ", file is: " << *cacheKeyIter;
VLOG(3) << "stat disk file error"
<< ", file is: " << cacheKey;
continue;
}
// if remove disk file before delete cache,
Expand All @@ -390,14 +381,12 @@ void DiskCacheManager::TrimCache() {
ret = posixWrapper_->remove(toDelFile);
if (ret < 0) {
LOG(ERROR)
<< "remove disk file error, file is: " << *cacheKeyIter;
cachedObjNameTmp.erase(cacheKeyIter);
<< "remove disk file error, file is: " << cacheKey;
continue;
}
DecDiskUsedBytes(statReadFile.st_size);
VLOG(3) << "remove disk file success, file is: "
<< *cacheKeyIter;
cachedObjNameTmp.erase(cacheKeyIter);
<< cacheKey;
}
VLOG(3) << "trim over.";
}
Expand Down
8 changes: 5 additions & 3 deletions curvefs/src/client/s3/disk_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

#include <bthread/mutex.h>

#include <list>
#include <string>
#include <vector>
#include <set>

#include "src/common/concurrent/concurrent.h"
#include "src/common/interruptible_sleeper.h"
#include "src/common/lru_cache.h"
#include "src/common/throttle.h"
#include "curvefs/src/common/wrap_posix.h"
#include "curvefs/src/common/utils.h"
Expand All @@ -40,8 +42,10 @@
namespace curvefs {
namespace client {

using ::curve::common::LRUCache;
using curvefs::common::PosixWrapper;
using curve::common::ReadWriteThrottleParams;
using ::curve::common::SglLRUCache;
using curvefs::common::SysUtils;
using curve::common::ThrottleParams;
using curve::common::Throttle;
Expand Down Expand Up @@ -148,9 +152,7 @@ class DiskCacheManager {
std::string cacheDir_;
std::shared_ptr<DiskCacheWrite> cacheWrite_;
std::shared_ptr<DiskCacheRead> cacheRead_;
std::set<std::string> cachedObjName_;

bthread::Mutex mtx_;
std::shared_ptr<SglLRUCache<std::string>> cachedObjName_;

S3Client *client_;
std::shared_ptr<PosixWrapper> posixWrapper_;
Expand Down
6 changes: 4 additions & 2 deletions curvefs/src/client/s3/disk_cache_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ int DiskCacheRead::LinkWriteToRead(const std::string fileName,
return 0;
}

int DiskCacheRead::LoadAllCacheReadFile(std::set<std::string> *cachedObj) {
int DiskCacheRead::LoadAllCacheReadFile(
std::shared_ptr<SglLRUCache<std::string>> cachedObj) {
LOG(INFO) << "LoadAllCacheReadFile start. ";
std::string cacheReadPath;
bool ret;
Expand All @@ -124,7 +125,7 @@ int DiskCacheRead::LoadAllCacheReadFile(std::set<std::string> *cachedObj) {
(!strncmp(cacheReadDirent->d_name, "..", 2)))
continue;
std::string fileName = cacheReadDirent->d_name;
cachedObj->emplace(fileName);
cachedObj->Put(fileName);
VLOG(3) << "LoadAllCacheReadFile obj, name = " << fileName;
}
VLOG(6) << "close start.";
Expand All @@ -133,6 +134,7 @@ int DiskCacheRead::LoadAllCacheReadFile(std::set<std::string> *cachedObj) {
LOG(ERROR) << "opendir error, errno = " << errno;
return rc;
}

LOG(INFO) << "LoadAllCacheReadFile success.";
return 0;
}
Expand Down
8 changes: 6 additions & 2 deletions curvefs/src/client/s3/disk_cache_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@
#ifndef CURVEFS_SRC_CLIENT_S3_DISK_CACHE_READ_H_
#define CURVEFS_SRC_CLIENT_S3_DISK_CACHE_READ_H_

#include <list>
#include <string>
#include <vector>
#include <set>
#include <vector>

#include "src/common/concurrent/concurrent.h"
#include "src/common/interruptible_sleeper.h"
#include "src/common/lru_cache.h"
#include "curvefs/src/common/wrap_posix.h"
#include "curvefs/src/client/s3/disk_cache_base.h"

namespace curvefs {
namespace client {

using ::curve::common::SglLRUCache;
using curvefs::common::PosixWrapper;

class DiskCacheRead : public DiskCacheBase {
Expand All @@ -53,7 +56,8 @@ class DiskCacheRead : public DiskCacheBase {
/**
* @brief after reboot,load all files that store in read cache.
*/
virtual int LoadAllCacheReadFile(std::set<std::string>* cachedObj);
virtual int LoadAllCacheReadFile(std::shared_ptr<
SglLRUCache<std::string>> cachedObj);
virtual void InitMetrics(std::shared_ptr<DiskCacheMetric> metric) {
metric_ = metric;
}
Expand Down
17 changes: 17 additions & 0 deletions curvefs/test/client/test_disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,23 @@ TEST_F(TestDiskCacheManager, WriteDiskFile) {
ASSERT_EQ(0, ret);
}

TEST_F(TestDiskCacheManager, IsCached) {
std::string fileName = "test";
std::string fileName2 = "test2";
bool ret = diskCacheManager_->IsCached(fileName);
ASSERT_EQ(false, ret);

diskCacheManager_->AddCache(fileName);
diskCacheManager_->AddCache(fileName2);
ret = diskCacheManager_->IsCached(fileName2);
ASSERT_EQ(true, ret);

diskCacheManager_->AddCache(fileName);
diskCacheManager_->AddCache(fileName2);
ret = diskCacheManager_->IsCached(fileName);
ASSERT_EQ(true, ret);
}

TEST_F(TestDiskCacheManager, SetDiskFsUsedRatio) {
EXPECT_CALL(*wrapper, statfs(NotNull(), NotNull()))
.WillOnce(Return(-1));
Expand Down
18 changes: 12 additions & 6 deletions curvefs/test/client/test_disk_cache_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>

#include "src/common/lru_cache.h"
#include "curvefs/test/client/mock_test_posix_wapper.h"
#include "curvefs/test/client/mock_disk_cache_base.h"
#include "curvefs/src/client/s3/disk_cache_read.h"
Expand All @@ -46,6 +47,9 @@ using ::testing::ElementsAre;
using ::testing::SetArgPointee;
using ::testing::ReturnArg;

using ::curve::common::CacheMetrics;
using ::curve::common::SglLRUCache;

class TestDiskCacheRead : public ::testing::Test {
protected:
TestDiskCacheRead() {}
Expand All @@ -67,7 +71,6 @@ class TestDiskCacheRead : public ::testing::Test {
std::shared_ptr<MockPosixWrapper> wrapper_;
};


TEST_F(TestDiskCacheRead, ReadDiskFile) {
EXPECT_CALL(*wrapper_, open(_, _, _))
.WillOnce(Return(-1));
Expand Down Expand Up @@ -154,15 +157,18 @@ TEST_F(TestDiskCacheRead, LinkWriteToRead) {
TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) {
EXPECT_CALL(*wrapper_, stat(NotNull(), NotNull()))
.WillOnce(Return(-1));
std::set<std::string> cachedObj;
int ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj);
std::shared_ptr<SglLRUCache<std::string>> cachedObj;
cachedObj = std::make_shared<
SglLRUCache<std::string>>(0,
std::make_shared<CacheMetrics>("diskcache"));
int ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj);
ASSERT_EQ(-1, ret);

EXPECT_CALL(*wrapper_, stat(NotNull(), NotNull()))
.WillOnce(Return(0));
EXPECT_CALL(*wrapper_, opendir(NotNull()))
.WillOnce(ReturnNull());
ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj);
ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj);
ASSERT_EQ(-1, ret);

DIR* dir = opendir(".");
Expand All @@ -174,7 +180,7 @@ TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) {
.WillOnce(Return(0));
EXPECT_CALL(*wrapper_, readdir(NotNull()))
.WillOnce(ReturnNull());
ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj);
ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj);
ASSERT_EQ(0, ret);

struct dirent* dirent;
Expand All @@ -190,7 +196,7 @@ TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) {
.WillOnce(ReturnNull());
EXPECT_CALL(*wrapper_, closedir(NotNull()))
.WillOnce(Return(0));
ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj);
ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj);
ASSERT_EQ(0, ret);
}

Expand Down
2 changes: 1 addition & 1 deletion curvefs/test/client/test_disk_cache_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TestDiskCacheWrite : public ::testing::Test {
};

TEST_F(TestDiskCacheWrite, ReadFile) {
uint64_t length;
uint64_t length = 0;
char* buf;
EXPECT_CALL(*wrapper_, stat(NotNull(), NotNull()))
.WillOnce(Return(-1));
Expand Down
Loading

0 comments on commit a2b0293

Please sign in to comment.