Skip to content

Commit

Permalink
curvefs/client: add feature of warmup
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhongsong committed Aug 10, 2022
1 parent 207f26b commit 11ae6bc
Show file tree
Hide file tree
Showing 7 changed files with 1,211 additions and 17 deletions.
237 changes: 231 additions & 6 deletions curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <set>
#include <unordered_map>
#include <utility>
#include <boost/algorithm/string.hpp>

#include "curvefs/proto/mds.pb.h"
#include "curvefs/src/client/fuse_common.h"
Expand Down Expand Up @@ -75,6 +76,15 @@ using rpcclient::Cli2ClientImpl;
using rpcclient::MetaCache;
using common::FLAGS_enableCto;

static bool checkWarmupListPath(const char*, const std::string& target) {
// do something to check the path
LOG(INFO) << "warmupListPath: " << target;
return true;
}
DEFINE_string(warmupListPath, "",
"the path to the list of files (dirs) that need to warmup.");
DEFINE_validator(warmupListPath, checkWarmupListPath);

CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
option_ = option;

Expand Down Expand Up @@ -122,17 +132,235 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
if (ret3 != CURVEFS_ERROR::OK) {
return ret3;
}

ret3 =
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
if (ret3 != CURVEFS_ERROR::OK) {
return ret3;
}

warmUpFile_.exist = false;
bgCmdTaskStop_.store(false, std::memory_order_release);
bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this);
bgCmdStop_.store(false, std::memory_order_release);
bgCmdThread_ = Thread(&FuseClient::BackGroundCmd, this);
FLAGS_warmupListPath = "";
taskFetchMetaPool_.Start(WARMUP_THREADS);
return ret3;
}

void FuseClient::BackGroundCmd() {
while (!mounted_.load(std::memory_order_acquire)) {
usleep(WARMUP_CHECKINTERVAL_US);
VLOG(6) << "wait mount success.";
continue;
}
std::string preWarmUpPath = FLAGS_warmupListPath;
std::string warmUpPath;
while (!bgCmdStop_.load(std::memory_order_acquire)) {
warmUpPath = FLAGS_warmupListPath;
if (warmUpPath == preWarmUpPath) {
usleep(WARMUP_CHECKINTERVAL_US); // check interval
continue;
}
VLOG(6) << "has new warmUp task: " << warmUpPath;
preWarmUpPath = warmUpPath;
PutWarmTask(warmUpPath);
WarmUpRun();
}
return;
}

void FuseClient::BackGroundCmdTask() {
while (!mounted_.load(std::memory_order_acquire)) {
usleep(WARMUP_CHECKINTERVAL_US);
VLOG(6) << "wait mount success.";
continue;
}
while (!bgCmdStop_.load(std::memory_order_acquire)) {
std::list<std::string> readAheadPaths;
WaitWarmUp();
while (hasWarmTask()) {
std::string warmUpTask;
GetarmTask(&warmUpTask);
if (warmUpTask.empty()) {
continue;
}
VLOG(6) << "warmUp task is: " << warmUpTask;
std::vector<std::string> splitPath;
boost::split(splitPath, warmUpTask, boost::is_any_of("/"),
boost::token_compress_on);
Dentry dentry;
CURVEFS_ERROR ret = dentryManager_->GetDentry(
fsInfo_->rootinodeid(), splitPath[1], &dentry);
if (ret != CURVEFS_ERROR::OK) {
if (ret != CURVEFS_ERROR::NOTEXIST) {
LOG(WARNING) << "dentryManager_ get dentry fail: "
<< ret << ", name: " << warmUpTask;
}
VLOG(1) << "FetchDentry error: " << ret;
return;
}
if (FsFileType::TYPE_S3 != dentry.type()) {
VLOG(3) << "not a file: " << warmUpTask;
return;
}

fuse_ino_t ino = dentry.inodeid();
std::shared_ptr<InodeWrapper> inodeWrapper;
ret = inodeManager_->GetInode(ino, inodeWrapper);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager get inode fail, ret = "
<< ret << ", inodeid = " << ino;
return;
}
uint64_t len = inodeWrapper->GetLength();
VLOG(9) << "ino is: " << ino << ", len is: " << len;
WarmUpFileContext_t warmUpFile{ino, len, true};
SetWarmUpFile(warmUpFile);
}
}
}

void FuseClient::FetchDentryEnqueue(std::string file) {
VLOG(6) << "FetchDentryEnqueue start: " << file;
auto task = [this, file]() {
LookPath(file);
};
taskFetchMetaPool_.Enqueue(task);
}

void FuseClient::LookPath(std::string file) {
VLOG(6) << "LookPath start: " << file;
// remove the blank
boost::trim(file);
std::vector<std::string> splitPath;
boost::split(splitPath, file, boost::is_any_of("/"),
boost::token_compress_on);
if (splitPath.size() == 2
&& splitPath.back().empty()) {
VLOG(6) << "i am root";
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
return;
} else if (splitPath.size() == 2) {
VLOG(6) << "parent is root: " << fsInfo_->rootinodeid()
<< ", path is: " << splitPath[1];
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[1]);
return;
} else if (splitPath.size() > 2) { // travel path
VLOG(6) << "traverse path size: " << splitPath.size();
std::string lastName = splitPath.back();
splitPath.pop_back();
fuse_ino_t ino = fsInfo_->rootinodeid();
auto iter = splitPath.begin();
// the first member is always empty, so skip
iter++;
for (; iter != splitPath.end(); iter++) {
VLOG(9) << "traverse path: " << *iter
<< "ino is: " << ino;
Dentry dentry;
std::string pathName = *iter;
CURVEFS_ERROR ret = dentryManager_->GetDentry(
ino, pathName, &dentry);
if (ret != CURVEFS_ERROR::OK) {
if (ret != CURVEFS_ERROR::NOTEXIST) {
LOG(WARNING) << "dentryManager_ get dentry fail, ret: "
<< ret << ", parent inodeid: " << ino
<< ", name: " << file;
}
VLOG(1) << "FetchDentry error: " << ret;
return;
}
ino = dentry.inodeid();
}
this->FetchDentry(ino, lastName);
VLOG(9) << "ino is: " << ino
<< "lastname is: " << lastName;
return;
} else {
VLOG(0) << "unknown path";
}
return;
}

void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
auto task = [this, ino]() {
// resolve层层递进,获得inode
this->FetchChildDentry(ino);
};
taskFetchMetaPool_.Enqueue(task);
}

void FuseClient::FetchChildDentry(fuse_ino_t ino) {
VLOG(9) << "FetchChildDentry start: " << ino;
std::list<Dentry> dentryList;
auto limit = option_.listDentryLimit;
CURVEFS_ERROR ret = dentryManager_->ListDentry(
ino, &dentryList, limit);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
<< ", parent = " << ino;
return;
}
for (auto iter : dentryList) {
VLOG(9) << "FetchChildDentry: " << iter.name();
if (FsFileType::TYPE_S3 == iter.type()) {
std::unique_lock<std::mutex> lck(fetchMtx_);
readAheadFiles_.push_front(iter.inodeid());
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
FetchChildDentryEnqueue(iter.inodeid());
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
} else {
VLOG(0) << "unknown type";
}
}
return;
}

void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
VLOG(9) << "FetchDentry start: " << file
<< ", ino: " << ino;
Dentry dentry;
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
if (ret != CURVEFS_ERROR::OK) {
if (ret != CURVEFS_ERROR::NOTEXIST) {
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
<< ", parent inodeid = " << ino
<< ", name = " << file;
}
VLOG(1) << "FetchDentry error: " << ret;
return;
}
if (FsFileType::TYPE_S3 == dentry.type()) {
std::unique_lock<std::mutex> lck(fetchMtx_);
readAheadFiles_.push_front(dentry.inodeid());
return;
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
FetchChildDentryEnqueue(dentry.inodeid());
VLOG(9) << "FetchDentry: " << dentry.inodeid();
return;

} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
} else {
VLOG(0) << "unkown, file: " << file
<< ", ino: " << ino;
}
VLOG(9) << "FetchDentry end: " << file
<< ", ino: " << ino;
return;
}

void FuseClient::UnInit() {
bgCmdTaskStop_.store(true, std::memory_order_release);
bgCmdStop_.store(true, std::memory_order_release);
WarmUpRun();
if (bgCmdTaskThread_.joinable()) {
bgCmdTaskThread_.join();
}
if (bgCmdThread_.joinable()) {
bgCmdThread_.join();
}
taskFetchMetaPool_.Stop();
delete mdsBase_;
mdsBase_ = nullptr;
}
Expand Down Expand Up @@ -178,7 +406,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
<< ", mountPoint = " << mountpoint_.ShortDebugString();
return CURVEFS_ERROR::MOUNT_FAILED;
}

inodeManager_->SetFsId(fsInfo_->fsid());
dentryManager_->SetFsId(fsInfo_->fsid());
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
Expand All @@ -196,7 +423,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
}

init_ = true;

mounted_.store(true, std::memory_order_release);
return CURVEFS_ERROR::OK;
}

Expand Down Expand Up @@ -318,7 +545,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
<< ", inodeid = " << ino;
return ret;
}

::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
if (fi->flags & O_TRUNC) {
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {
Expand Down Expand Up @@ -796,7 +1022,6 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
<< ", inodeid = " << ino;
return ret;
}

::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
Inode *inode = inodeWrapper->GetMutableInodeUnlocked();

Expand Down
Loading

0 comments on commit 11ae6bc

Please sign in to comment.