Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs/client: adapter lru list for disk cache #1088

Merged
merged 1 commit into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 30 additions & 41 deletions curvefs/src/client/s3/disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ DiskCacheManager::DiskCacheManager(std::shared_ptr<PosixWrapper> posixWrapper,
fullRatio_ = 0;
safeRatio_ = 0;
maxUsableSpaceBytes_ = 0;
// cannot limit the size,
// because cache is been delete must after upload to s3
cachedObjName_ = std::make_shared<
SglLRUCache<std::string>>(0,
std::make_shared<CacheMetrics>("diskcache"));
}

int DiskCacheManager::Init(S3Client *client,
Expand Down Expand Up @@ -93,12 +98,14 @@ int DiskCacheManager::Init(S3Client *client,
std::thread uploadThread =
std::thread(&DiskCacheManager::UploadAllCacheWriteFile, this);
uploadThread.detach();

// load all cache read file
ret = cacheRead_->LoadAllCacheReadFile(&cachedObjName_);
ret = cacheRead_->LoadAllCacheReadFile(cachedObjName_);
if (ret < 0) {
LOG(ERROR) << "load all cache read file error. ret = " << ret;
return ret;
}

// start trim thread
TrimRun();

Expand Down Expand Up @@ -145,15 +152,12 @@ int DiskCacheManager::UploadAllCacheWriteFile() {
}

void DiskCacheManager::AddCache(const std::string name) {
std::lock_guard<bthread::Mutex> lk(mtx_);
cachedObjName_.emplace(name);
cachedObjName_->Put(name);
VLOG(9) << "cache size is: " << cachedObjName_->Size();
}

bool DiskCacheManager::IsCached(const std::string name) {
std::lock_guard<bthread::Mutex> lk(mtx_);
std::set<std::string>::iterator cacheKeyIter;
cacheKeyIter = cachedObjName_.find(name);
if (cacheKeyIter == cachedObjName_.end()) {
if (!cachedObjName_->IsCached(name)) {
VLOG(9) << "not cached, name = " << name;
return false;
}
Expand Down Expand Up @@ -331,23 +335,23 @@ void DiskCacheManager::TrimCache() {
std::string cacheWriteFullDir;
cacheReadFullDir = GetCacheReadFullDir();
cacheWriteFullDir = GetCacheWriteFullDir();
std::set<std::string> cachedObjNameTmp;
{
std::lock_guard<bthread::Mutex> lk(mtx_);
cachedObjNameTmp = cachedObjName_;
}

while (!IsDiskCacheSafe()) {
std::string cacheReadFile, cacheWriteFile;
std::set<std::string>::iterator cacheKeyIter;
cacheKeyIter = cachedObjNameTmp.begin();
if (cacheKeyIter == cachedObjNameTmp.end()) {
if (cachedObjName_->Size() == 0) {
VLOG(3) << "remove disk file error"
<< ", cachedObjName is empty.";
<< ", cachedObjName is empty.";
break;
}

cacheReadFile = cacheReadFullDir + "/" + *cacheKeyIter;
cacheWriteFile = cacheWriteFullDir + "/" + *cacheKeyIter;
std::string cacheKey;
cachedObjName_->GetBack(&cacheKey);
if (cacheKey.empty()) {
VLOG(3) << "cachekey is empty";
break;
}
VLOG(9) << "obj will be removed: " << cacheKey;
cacheReadFile = cacheReadFullDir + "/" + cacheKey;
cacheWriteFile = cacheWriteFullDir + "/" + cacheKey;
wuhongsong marked this conversation as resolved.
Show resolved Hide resolved
struct stat statFile;
int ret;
ret = posixWrapper_->stat(cacheWriteFile.c_str(), &statFile);
Expand All @@ -359,28 +363,15 @@ void DiskCacheManager::TrimCache() {
if (ret == 0) {
VLOG(3) << "do not remove this disk file"
<< ", file has not been uploaded to S3."
<< ", file is: " << *cacheKeyIter;
cachedObjNameTmp.erase(cacheKeyIter);
continue;
}
{
std::lock_guard<bthread::Mutex> lk(mtx_);
std::set<std::string>::iterator iter;
iter = cachedObjName_.find(*cacheKeyIter);
if (iter == cachedObjName_.end()) {
VLOG(6) << "remove disk file error"
<< ", file is not exist in cachedObjName"
<< ", file is: " << *cacheKeyIter;
cachedObjNameTmp.erase(cacheKeyIter);
continue;
}
cachedObjName_.erase(iter);
<< ", file is: " << cacheKey;
break;
}
cachedObjName_->Remove(cacheKey);
wuhongsong marked this conversation as resolved.
Show resolved Hide resolved
struct stat statReadFile;
ret = posixWrapper_->stat(cacheReadFile.c_str(), &statReadFile);
if (ret != 0) {
VLOG(3) << "remove disk file error"
<< ", file is: " << *cacheKeyIter;
VLOG(3) << "stat disk file error"
<< ", file is: " << cacheKey;
continue;
}
// if remove disk file before delete cache,
Expand All @@ -390,14 +381,12 @@ void DiskCacheManager::TrimCache() {
ret = posixWrapper_->remove(toDelFile);
if (ret < 0) {
LOG(ERROR)
<< "remove disk file error, file is: " << *cacheKeyIter;
cachedObjNameTmp.erase(cacheKeyIter);
<< "remove disk file error, file is: " << cacheKey;
continue;
}
DecDiskUsedBytes(statReadFile.st_size);
VLOG(3) << "remove disk file success, file is: "
<< *cacheKeyIter;
cachedObjNameTmp.erase(cacheKeyIter);
<< cacheKey;
}
VLOG(3) << "trim over.";
}
Expand Down
8 changes: 5 additions & 3 deletions curvefs/src/client/s3/disk_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

#include <bthread/mutex.h>

#include <list>
#include <string>
#include <vector>
#include <set>

#include "src/common/concurrent/concurrent.h"
#include "src/common/interruptible_sleeper.h"
#include "src/common/lru_cache.h"
#include "src/common/throttle.h"
#include "curvefs/src/common/wrap_posix.h"
#include "curvefs/src/common/utils.h"
Expand All @@ -40,8 +42,10 @@
namespace curvefs {
namespace client {

using ::curve::common::LRUCache;
using curvefs::common::PosixWrapper;
using curve::common::ReadWriteThrottleParams;
using ::curve::common::SglLRUCache;
using curvefs::common::SysUtils;
using curve::common::ThrottleParams;
using curve::common::Throttle;
Expand Down Expand Up @@ -148,9 +152,7 @@ class DiskCacheManager {
std::string cacheDir_;
std::shared_ptr<DiskCacheWrite> cacheWrite_;
std::shared_ptr<DiskCacheRead> cacheRead_;
std::set<std::string> cachedObjName_;

bthread::Mutex mtx_;
std::shared_ptr<SglLRUCache<std::string>> cachedObjName_;

S3Client *client_;
std::shared_ptr<PosixWrapper> posixWrapper_;
Expand Down
6 changes: 4 additions & 2 deletions curvefs/src/client/s3/disk_cache_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ int DiskCacheRead::LinkWriteToRead(const std::string fileName,
return 0;
}

int DiskCacheRead::LoadAllCacheReadFile(std::set<std::string> *cachedObj) {
int DiskCacheRead::LoadAllCacheReadFile(
wuhongsong marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<SglLRUCache<std::string>> cachedObj) {
LOG(INFO) << "LoadAllCacheReadFile start. ";
std::string cacheReadPath;
bool ret;
Expand All @@ -124,7 +125,7 @@ int DiskCacheRead::LoadAllCacheReadFile(std::set<std::string> *cachedObj) {
(!strncmp(cacheReadDirent->d_name, "..", 2)))
continue;
std::string fileName = cacheReadDirent->d_name;
cachedObj->emplace(fileName);
cachedObj->Put(fileName);
VLOG(3) << "LoadAllCacheReadFile obj, name = " << fileName;
}
VLOG(6) << "close start.";
Expand All @@ -133,6 +134,7 @@ int DiskCacheRead::LoadAllCacheReadFile(std::set<std::string> *cachedObj) {
LOG(ERROR) << "opendir error, errno = " << errno;
return rc;
}

LOG(INFO) << "LoadAllCacheReadFile success.";
return 0;
}
Expand Down
8 changes: 6 additions & 2 deletions curvefs/src/client/s3/disk_cache_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@
#ifndef CURVEFS_SRC_CLIENT_S3_DISK_CACHE_READ_H_
#define CURVEFS_SRC_CLIENT_S3_DISK_CACHE_READ_H_

#include <list>
#include <string>
#include <vector>
#include <set>
#include <vector>

#include "src/common/concurrent/concurrent.h"
#include "src/common/interruptible_sleeper.h"
#include "src/common/lru_cache.h"
#include "curvefs/src/common/wrap_posix.h"
#include "curvefs/src/client/s3/disk_cache_base.h"

namespace curvefs {
namespace client {

using ::curve::common::SglLRUCache;
using curvefs::common::PosixWrapper;

class DiskCacheRead : public DiskCacheBase {
Expand All @@ -53,7 +56,8 @@ class DiskCacheRead : public DiskCacheBase {
/**
* @brief after reboot,load all files that store in read cache.
*/
virtual int LoadAllCacheReadFile(std::set<std::string>* cachedObj);
virtual int LoadAllCacheReadFile(std::shared_ptr<
SglLRUCache<std::string>> cachedObj);
virtual void InitMetrics(std::shared_ptr<DiskCacheMetric> metric) {
metric_ = metric;
}
Expand Down
17 changes: 17 additions & 0 deletions curvefs/test/client/test_disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,23 @@ TEST_F(TestDiskCacheManager, WriteDiskFile) {
ASSERT_EQ(0, ret);
}

TEST_F(TestDiskCacheManager, IsCached) {
std::string fileName = "test";
std::string fileName2 = "test2";
bool ret = diskCacheManager_->IsCached(fileName);
ASSERT_EQ(false, ret);

diskCacheManager_->AddCache(fileName);
diskCacheManager_->AddCache(fileName2);
ret = diskCacheManager_->IsCached(fileName2);
ASSERT_EQ(true, ret);

diskCacheManager_->AddCache(fileName);
diskCacheManager_->AddCache(fileName2);
ret = diskCacheManager_->IsCached(fileName);
ASSERT_EQ(true, ret);
}

TEST_F(TestDiskCacheManager, SetDiskFsUsedRatio) {
EXPECT_CALL(*wrapper, statfs(NotNull(), NotNull()))
.WillOnce(Return(-1));
Expand Down
18 changes: 12 additions & 6 deletions curvefs/test/client/test_disk_cache_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>

#include "src/common/lru_cache.h"
#include "curvefs/test/client/mock_test_posix_wapper.h"
#include "curvefs/test/client/mock_disk_cache_base.h"
#include "curvefs/src/client/s3/disk_cache_read.h"
Expand All @@ -46,6 +47,9 @@ using ::testing::ElementsAre;
using ::testing::SetArgPointee;
using ::testing::ReturnArg;

using ::curve::common::CacheMetrics;
using ::curve::common::SglLRUCache;

class TestDiskCacheRead : public ::testing::Test {
protected:
TestDiskCacheRead() {}
Expand All @@ -67,7 +71,6 @@ class TestDiskCacheRead : public ::testing::Test {
std::shared_ptr<MockPosixWrapper> wrapper_;
};


TEST_F(TestDiskCacheRead, ReadDiskFile) {
EXPECT_CALL(*wrapper_, open(_, _, _))
.WillOnce(Return(-1));
Expand Down Expand Up @@ -154,15 +157,18 @@ TEST_F(TestDiskCacheRead, LinkWriteToRead) {
TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) {
EXPECT_CALL(*wrapper_, stat(NotNull(), NotNull()))
.WillOnce(Return(-1));
std::set<std::string> cachedObj;
int ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj);
std::shared_ptr<SglLRUCache<std::string>> cachedObj;
cachedObj = std::make_shared<
SglLRUCache<std::string>>(0,
std::make_shared<CacheMetrics>("diskcache"));
int ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj);
ASSERT_EQ(-1, ret);

EXPECT_CALL(*wrapper_, stat(NotNull(), NotNull()))
.WillOnce(Return(0));
EXPECT_CALL(*wrapper_, opendir(NotNull()))
.WillOnce(ReturnNull());
ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj);
ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj);
ASSERT_EQ(-1, ret);

DIR* dir = opendir(".");
Expand All @@ -174,7 +180,7 @@ TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) {
.WillOnce(Return(0));
EXPECT_CALL(*wrapper_, readdir(NotNull()))
.WillOnce(ReturnNull());
ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj);
ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj);
ASSERT_EQ(0, ret);

struct dirent* dirent;
Expand All @@ -190,7 +196,7 @@ TEST_F(TestDiskCacheRead, LoadAllCacheReadFile) {
.WillOnce(ReturnNull());
EXPECT_CALL(*wrapper_, closedir(NotNull()))
.WillOnce(Return(0));
ret = diskCacheRead_->LoadAllCacheReadFile(&cachedObj);
ret = diskCacheRead_->LoadAllCacheReadFile(cachedObj);
ASSERT_EQ(0, ret);
}

Expand Down
2 changes: 1 addition & 1 deletion curvefs/test/client/test_disk_cache_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TestDiskCacheWrite : public ::testing::Test {
};

TEST_F(TestDiskCacheWrite, ReadFile) {
uint64_t length;
uint64_t length = 0;
char* buf;
EXPECT_CALL(*wrapper_, stat(NotNull(), NotNull()))
.WillOnce(Return(-1));
Expand Down
Loading