Skip to content

Commit

Permalink
[feat]curvefs/client:support warmup to disk/mem
Browse files Browse the repository at this point in the history
Signed-off-by: Cyber-SiKu <[email protected]>
  • Loading branch information
Cyber-SiKu committed Apr 17, 2023
1 parent 8c42aea commit 6f63fdf
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 91 deletions.
13 changes: 13 additions & 0 deletions curvefs/src/client/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ WarmupType GetWarmupType(const std::string& type) {
return ret;
}

const char kCurveFsWarmupStorageDisk[] = "disk";
const char kCurveFsWarmupStorageKvclient[] = "kvclient";

WarmupStorageType GetWarmupStorageType(const std::string &type) {
auto ret = WarmupStorageType::kWarmupStorageTypeUnknown;
if (type == kCurveFsWarmupStorageDisk) {
ret = WarmupStorageType::kWarmupStorageTypeDisk;
} else if (type == kCurveFsWarmupStorageKvclient) {
ret = WarmupStorageType::kWarmupStorageTypeKvClient;
}
return ret;
}

using ::curve::common::StringToUll;

// if direction is true means '+', false means '-'
Expand Down
13 changes: 12 additions & 1 deletion curvefs/src/client/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ const uint32_t MAX_XATTR_VALUE_LENGTH = 64 * 1024;

const char kCurveFsWarmupXAttr[] = "curvefs.warmup.op";


constexpr int kWarmupOpNum = 4;

enum class WarmupOpType {
kWarmupOpUnknown = 0,
kWarmupOpAdd = 1,
Expand All @@ -83,7 +86,15 @@ enum class WarmupType {
kWarmupTypeSingle = 2,
};

WarmupType GetWarmupType(const std::string& type);
WarmupType GetWarmupType(const std::string &type);

enum class WarmupStorageType {
kWarmupStorageTypeUnknown = 0,
kWarmupStorageTypeDisk = 1,
kWarmupStorageTypeKvClient = 2,
};

WarmupStorageType GetWarmupStorageType(const std::string &type);

enum class FileHandle : uint64_t {
kDefaultValue = 0,
Expand Down
18 changes: 13 additions & 5 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,16 @@ void FuseOpGetAttr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) {
}

int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
const std::string &path) {
const std::string &path,
curvefs::client::common::WarmupStorageType storageType) {
int ret = 0;
bool result = true;
switch (type) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
result = g_ClientInstance->PutWarmFilelistTask(key);
result = g_ClientInstance->PutWarmFilelistTask(key, storageType);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
result = g_ClientInstance->PutWarmFileTask(key, path);
result = g_ClientInstance->PutWarmFileTask(key, path, storageType);
break;
default:
// not support add warmup type (warmup single file/dir or filelist)
Expand Down Expand Up @@ -328,16 +329,23 @@ int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) {

std::vector<std::string> opTypePath;
curve::common::SplitString(value, "\n", &opTypePath);
if (opTypePath.size() != 3) {
if (opTypePath.size() != curvefs::client::common::kWarmupOpNum) {
LOG(ERROR) << name << " has invalid xattr value " << value;
return ERANGE;
}
auto storageType =
curvefs::client::common::GetWarmupStorageType(opTypePath[3]);
if (storageType ==
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeUnknown) {
LOG(ERROR) << name << " not support storage type: " << value;
return ERANGE;
}
int ret = 0;
switch (curvefs::client::common::GetWarmupOpType(opTypePath[0])) {
case curvefs::client::common::WarmupOpType::kWarmupOpAdd:
ret =
AddWarmupTask(curvefs::client::common::GetWarmupType(opTypePath[1]),
key, opTypePath[2]);
key, opTypePath[2], storageType);
if (ret != 0) {
LOG(ERROR) << name << " has invalid xattr value " << value;
}
Expand Down
9 changes: 5 additions & 4 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,17 @@ class FuseClient {
enableSumInDir_ = enable;
}

bool PutWarmFilelistTask(fuse_ino_t key) {
bool PutWarmFilelistTask(fuse_ino_t key, common::WarmupStorageType type) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFilelist(key);
return warmupManager_->AddWarmupFilelist(key, type);
} // only support s3
return true;
}

bool PutWarmFileTask(fuse_ino_t key, const std::string& path) {
bool PutWarmFileTask(fuse_ino_t key, const std::string &path,
common::WarmupStorageType type) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFile(key, path);
return warmupManager_->AddWarmupFile(key, path, type);
} // only support s3
return true;
}
Expand Down
94 changes: 56 additions & 38 deletions curvefs/src/client/warmup/warmup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <memory>
#include <utility>

#include "curvefs/src/client/common/common.h"
#include "curvefs/src/client/inode_wrapper.h"
#include "curvefs/src/client/kvclient/kvclient_manager.h"
#include "curvefs/src/client/s3/client_s3_cache_manager.h"
Expand All @@ -48,16 +49,17 @@ using curve::common::WriteLockGuard;

#define WARMUP_CHECKINTERVAL_US (1000 * 1000)

bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key) {
bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key,
WarmupStorageType type) {
if (!mounted_.load(std::memory_order_acquire)) {
LOG(ERROR) << "not mounted";
return false;
}
// add warmup Progress
if (AddWarmupProcess(key)) {
if (AddWarmupProcess(key, type)) {
VLOG(9) << "add warmup list task:" << key;
WriteLockGuard lock(warmupFilelistDequeMutex_);
auto iter = FindKeyWarmupFilelistLocked(key);
auto iter = FindWarmupFilelistByKeyLocked(key);
if (iter == warmupFilelistDeque_.end()) {
std::shared_ptr<InodeWrapper> inodeWrapper;
CURVEFS_ERROR ret = inodeManager_->GetInode(key, inodeWrapper);
Expand All @@ -73,14 +75,14 @@ bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key) {
return true;
}

bool WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key,
const std::string &path) {
bool WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key, const std::string &path,
WarmupStorageType type) {
if (!mounted_.load(std::memory_order_acquire)) {
LOG(ERROR) << "not mounted";
return false;
}
// add warmup Progress
if (AddWarmupProcess(key)) {
if (AddWarmupProcess(key, type)) {
VLOG(9) << "add warmup single task:" << key;
FetchDentryEnqueue(key, path);
}
Expand Down Expand Up @@ -246,7 +248,7 @@ void WarmupManagerS3Impl::FetchDentry(fuse_ino_t key, fuse_ino_t ino,
}
if (FsFileType::TYPE_S3 == dentry.type()) {
WriteLockGuard lock(warmupInodesDequeMutex_);
auto iterDeque = FindKeyWarmupInodesLocked(key);
auto iterDeque = FindWarmupInodesByKeyLocked(key);
if (iterDeque == warmupInodesDeque_.end()) {
warmupInodesDeque_.emplace_back(
key, std::set<fuse_ino_t>{dentry.inodeid()});
Expand Down Expand Up @@ -286,7 +288,7 @@ void WarmupManagerS3Impl::FetchChildDentry(fuse_ino_t key, fuse_ino_t ino) {
<< " dentry: " << dentry.name();
if (FsFileType::TYPE_S3 == dentry.type()) {
WriteLockGuard lock(warmupInodesDequeMutex_);
auto iterDeque = FindKeyWarmupInodesLocked(key);
auto iterDeque = FindWarmupInodesByKeyLocked(key);
if (iterDeque == warmupInodesDeque_.end()) {
warmupInodesDeque_.emplace_back(
key, std::set<fuse_ino_t>{dentry.inodeid()});
Expand Down Expand Up @@ -342,7 +344,7 @@ void WarmupManagerS3Impl::TravelChunks(
TravelChunk(ino, infoIter.second, &prefetchObjs);
{
ReadLockGuard lock(inode2ProgressMutex_);
auto iter = FindKeyWarmupProgressLocked(key);
auto iter = FindWarmupProgressByKeyLocked(key);
if (iter != inode2Progress_.end()) {
iter->second.AddTotal(prefetchObjs.size());
} else {
Expand Down Expand Up @@ -470,17 +472,7 @@ void WarmupManagerS3Impl::WarmUpAllObjs(
}
if (context->retCode == 0) {
VLOG(9) << "Get Object success: " << context->key;
{
// update progress
ReadLockGuard lock(inode2ProgressMutex_);
auto iter = FindKeyWarmupProgressLocked(key);
if (iter != inode2Progress_.end()) {
iter->second.FinishedPlusOne();
} else {
VLOG(9) << "no such warmup progress: " << key;
}
}
PutObjectToCache(context->key, context->buf, context->len);
PutObjectToCache(key, context->key, context->buf, context->len);
CollectMetrics(&warmupS3Metric_.warmupS3Cached, context->len,
start);
warmupS3Metric_.warmupS3CacheSize << context->len;
Expand Down Expand Up @@ -515,9 +507,17 @@ void WarmupManagerS3Impl::WarmUpAllObjs(
VLOG(9) << "download start: " << iter.first;
std::string name = iter.first;
uint64_t readLen = iter.second;
if (s3Adaptor_->GetDiskCacheManager()->IsCached(name)) {
pendingReq.fetch_sub(1);
continue;
{
ReadLockGuard lock(inode2ProgressMutex_);
auto iterProgress = FindWarmupProgressByKeyLocked(key);
if (iterProgress->second.GetStorageType() ==
curvefs::client::common::WarmupStorageType::
kWarmupStorageTypeDisk &&
s3Adaptor_->GetDiskCacheManager()->IsCached(name)) {
// storage in disk and has cached
pendingReq.fetch_sub(1);
continue;
}
}
char *cacheS3 = new char[readLen];
memset(cacheS3, 0, readLen);
Expand All @@ -539,25 +539,26 @@ bool WarmupManagerS3Impl::ProgressDone(fuse_ino_t key) {
bool ret;
{
ReadLockGuard lockList(warmupFilelistDequeMutex_);
ret = FindKeyWarmupFilelistLocked(key) == warmupFilelistDeque_.end();
ret =
FindWarmupFilelistByKeyLocked(key) == warmupFilelistDeque_.end();
}

{
ReadLockGuard lockDentry(inode2FetchDentryPoolMutex_);
ret = ret && (FindKeyFetchDentryPoolLocked(key) ==
ret = ret && (FindFetchDentryPoolByKeyLocked(key) ==
inode2FetchDentryPool_.end());
}

{
ReadLockGuard lockInodes(warmupInodesDequeMutex_);
ret =
ret && (FindKeyWarmupInodesLocked(key) == warmupInodesDeque_.end());
ret = ret &&
(FindWarmupInodesByKeyLocked(key) == warmupInodesDeque_.end());
}


{
ReadLockGuard lockS3Objects(inode2FetchS3ObjectsPoolMutex_);
ret = ret && (FindKeyFetchS3ObjectsPoolLocked(key) ==
ret = ret && (FindFetchS3ObjectsPoolByKeyLocked(key) ==
inode2FetchS3ObjectsPool_.end());
}
return ret;
Expand Down Expand Up @@ -676,18 +677,35 @@ void WarmupManagerS3Impl::AddFetchS3objectsTask(fuse_ino_t key,
}
}

void WarmupManagerS3Impl::PutObjectToCache(const std::string &filename,
void WarmupManagerS3Impl::PutObjectToCache(fuse_ino_t key,
const std::string &filename,
const char *data, uint64_t len) {
int ret =
s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(filename, data, len);
if (ret < 0) {
LOG_EVERY_SECOND(INFO)
<< "write read directly failed, key: " << filename;
ReadLockGuard lock(inode2ProgressMutex_);
auto iter = FindWarmupProgressByKeyLocked(key);
if (iter == inode2Progress_.end()) {
VLOG(9) << "no this warmup task progress: " << key;
return;
}

if (kvClientManager_ != nullptr) {
kvClientManager_->Set(
std::make_shared<SetKVCacheTask>(filename, data, len));
int ret;
// update progress
iter->second.FinishedPlusOne();
switch (iter->second.GetStorageType()) {
case curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk:
ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(filename, data,
len);
if (ret < 0) {
LOG_EVERY_SECOND(INFO)
<< "write read directly failed, key: " << filename;
}
break;
case curvefs::client::common::WarmupStorageType::kWarmupStorageTypeKvClient:
if (kvClientManager_ != nullptr) {
kvClientManager_->Set(
std::make_shared<SetKVCacheTask>(filename, data, len));
}
break;
default:
LOG_EVERY_N(ERROR, 1000) << "unsupported warmup storage type";
}
}

Expand Down
Loading

0 comments on commit 6f63fdf

Please sign in to comment.