Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs: fix refresh inode will overwrite data in cache when enabel cto #1993

Merged
merged 1 commit into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fuseClient.iCacheLruSize=65536
fuseClient.dCacheLruSize=1000000
fuseClient.enableICacheMetrics=true
fuseClient.enableDCacheMetrics=true
fuseClient.cto=false
fuseClient.cto=true
# you shoudle enable it when mount one filesystem to multi mountpoints,
# it gurantee the consistent of file after rename, otherwise you should
# disable it for performance.
Expand Down
2 changes: 0 additions & 2 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,6 @@ void FuseOpOpen(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) {
FuseReplyErrByErrCode(req, ret);
return;
}
// set fh to 1 to indicate needn't refresh inode when file is still opened
fi->fh = static_cast<uint64_t>(FileHandle::kKeepCache);
fuse_reply_open(req, fi);
}

Expand Down
19 changes: 6 additions & 13 deletions curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,9 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
return CURVEFS_ERROR::NOPERMISSION;
}
}

if (FLAGS_enableCto) {
inodeManager_->AddOpenedInode(ino);
}
return ret;
}

Expand Down Expand Up @@ -959,18 +961,6 @@ CURVEFS_ERROR FuseClient::FuseOpGetAttr(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi,
struct stat *attr) {
VLOG(1) << "FuseOpGetAttr ino = " << ino;
if (FLAGS_enableCto) {
if (fi == nullptr ||
fi->fh != static_cast<uint64_t>(FileHandle::kKeepCache)) {
CURVEFS_ERROR ret = inodeManager_->RefreshInode(ino);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager get inode fail, ret = " << ret
<< ", inodeid = " << ino;
return ret;
}
}
}

InodeAttr inodeAttr;
CURVEFS_ERROR ret =
inodeManager_->GetInodeAttr(ino, &inodeAttr);
Expand Down Expand Up @@ -1370,6 +1360,9 @@ CURVEFS_ERROR FuseClient::FuseOpReadLink(fuse_req_t req, fuse_ino_t ino,
CURVEFS_ERROR FuseClient::FuseOpRelease(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) {
VLOG(1) << "FuseOpRelease, ino: " << ino;
if (FLAGS_enableCto) {
inodeManager_->RemoveOpenedInode(ino);
}
return CURVEFS_ERROR::OK;
}

Expand Down
82 changes: 38 additions & 44 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace curvefs {
namespace client {

using NameLockGuard = ::curve::common::GenericNameLockGuard<Mutex>;
using curvefs::client::common::FLAGS_enableCto;

class TrimICacheAsyncDone : public MetaServerClientDone {
public:
Expand Down Expand Up @@ -107,7 +108,7 @@ InodeCacheManagerImpl::GetInode(uint64_t inodeId,
NameLockGuard lock(nameLock_, std::to_string(inodeId));
// get inode from cache
bool ok = iCache_->Get(inodeId, &out);
if (ok) {
if (ok && NeedUseCahce(inodeId, out->IsDirty())) {
curve::common::UniqueLock lgGuard = out->GetUniqueLock();
if (out->GetType() == FsFileType::TYPE_FILE) {
return CURVEFS_ERROR::OK;
Expand All @@ -134,48 +135,13 @@ InodeCacheManagerImpl::GetInode(uint64_t inodeId,
return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR
InodeCacheManagerImpl::RefreshInode(uint64_t inodeId) {
NameLockGuard lock(nameLock_, std::to_string(inodeId));

// get inode from metaserver
Inode inode;
bool streaming = false;
GET_INODE_REMOTE(fsId_, inodeId, &inode, &streaming);

// get inode from cache
std::shared_ptr<InodeWrapper> out;
bool ok = iCache_->Get(inodeId, &out);
curve::common::UniqueLock lgGuard;
if (!ok) {
out = std::make_shared<InodeWrapper>(
std::move(inode), metaClient_, s3ChunkInfoMetric_,
option_.maxDataSize, option_.refreshDataIntervalSec);
} else {
lgGuard = out->GetUniqueLock();
streaming = true;
}

// refresh data
REFRESH_DATA_REMOTE(out, streaming);

// put to cache or refresh length
if (!ok) {
PUT_INODE_CACHE(inodeId, out);
} else {
out->SetLengthLocked(inode.length());
}

return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR InodeCacheManagerImpl::GetInodeAttr(uint64_t inodeId,
InodeAttr *out) {
NameLockGuard lock(nameLock_, std::to_string(inodeId));
// 1. find in icache
std::shared_ptr<InodeWrapper> inodeWrapper;
bool ok = iCache_->Get(inodeId, &inodeWrapper);
if (ok) {
if (ok && NeedUseCahce(inodeId, inodeWrapper->IsDirty())) {
inodeWrapper->GetInodeAttr(out);
return CURVEFS_ERROR::OK;
}
Expand Down Expand Up @@ -212,14 +178,14 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttr(
std::shared_ptr<InodeWrapper> inodeWrapper;
NameLockGuard lock(nameLock_, std::to_string(*iter));
bool ok = iCache_->Get(*iter, &inodeWrapper);
if (ok) {
if (ok && NeedUseCahce(*iter, inodeWrapper->IsDirty())) {
InodeAttr tmpAttr;
inodeWrapper->GetInodeAttr(&tmpAttr);
attrs->emplace_back(tmpAttr);
attrs->emplace_back(std::move(tmpAttr));
iter = inodeIds->erase(iter);
} else {
++iter;
continue;
}
++iter;
}

if (inodeIds->empty()) {
Expand Down Expand Up @@ -249,10 +215,10 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttrAsync(
std::shared_ptr<InodeWrapper> inodeWrapper;
NameLockGuard lock(nameLock_, std::to_string(*iter));
bool ok = iCache_->Get(*iter, &inodeWrapper);
if (ok) {
if (ok && NeedUseCahce(*iter, inodeWrapper->IsDirty())) {
InodeAttr tmpAttr;
inodeWrapper->GetInodeAttr(&tmpAttr);
attrs->emplace(*iter, tmpAttr);
attrs->emplace(*iter, std::move(tmpAttr));
iter = inodeIds->erase(iter);
} else if (cache && cachedAttr.find(*iter) != cachedAttr.end()) {
attrs->emplace(*iter, cachedAttr[*iter]);
Expand Down Expand Up @@ -306,7 +272,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetXAttr(
std::shared_ptr<InodeWrapper> inodeWrapper;
NameLockGuard lock(nameLock_, std::to_string(*iter));
bool ok = iCache_->Get(*iter, &inodeWrapper);
if (ok) {
if (ok && NeedUseCahce(*iter, inodeWrapper->IsDirty())) {
xattr->emplace_back(inodeWrapper->GetXattr());
iter = inodeIds->erase(iter);
} else {
Expand Down Expand Up @@ -524,5 +490,33 @@ InodeCacheManagerImpl::RefreshData(std::shared_ptr<InodeWrapper> &inode,
return rc;
}

void InodeCacheManagerImpl::AddOpenedInode(uint64_t inodeId) {
VLOG(1) << "AddOpenedInode inodeId: " << inodeId;
curve::common::LockGuard lg(openInodesMutex_);
openedInodes_.emplace(inodeId);
}

void InodeCacheManagerImpl::RemoveOpenedInode(uint64_t inodeId) {
VLOG(1) << "RemoveOpenedInode inodeId: " << inodeId;
curve::common::LockGuard lg(openInodesMutex_);
auto iter = openedInodes_.find(inodeId);
if (iter != openedInodes_.end()) {
openedInodes_.erase(iter);
}
}

bool InodeCacheManagerImpl::OpenInodeCached(uint64_t inodeId) {
curve::common::LockGuard lg(openInodesMutex_);
auto iter = openedInodes_.find(inodeId);
return iter != openedInodes_.end();
}

bool InodeCacheManagerImpl::NeedUseCahce(uint64_t inodeId, bool IsDirty) {
if (!FLAGS_enableCto || OpenInodeCached(inodeId) || IsDirty) {
return true;
}
return false;
}

} // namespace client
} // namespace curvefs
19 changes: 15 additions & 4 deletions curvefs/src/client/inode_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ class InodeCacheManager {
GetInode(uint64_t inodeId,
std::shared_ptr<InodeWrapper> &out) = 0; // NOLINT

virtual CURVEFS_ERROR RefreshInode(uint64_t inodeId) = 0;

virtual CURVEFS_ERROR GetInodeAttr(uint64_t inodeId, InodeAttr *out) = 0;

virtual CURVEFS_ERROR BatchGetInodeAttr(
Expand Down Expand Up @@ -173,6 +171,10 @@ class InodeCacheManager {

virtual void ReleaseCache(uint64_t parentId) = 0;

virtual void AddOpenedInode(uint64_t inodeId) = 0;

virtual void RemoveOpenedInode(uint64_t inodeId) = 0;

protected:
uint32_t fsId_;
};
Expand Down Expand Up @@ -233,8 +235,6 @@ class InodeCacheManagerImpl : public InodeCacheManager,
CURVEFS_ERROR GetInode(uint64_t inodeId,
std::shared_ptr<InodeWrapper> &out) override;

CURVEFS_ERROR RefreshInode(uint64_t inodeId) override;

CURVEFS_ERROR GetInodeAttr(uint64_t inodeId, InodeAttr *out) override;

CURVEFS_ERROR BatchGetInodeAttr(std::set<uint64_t> *inodeIds,
Expand Down Expand Up @@ -268,11 +268,18 @@ class InodeCacheManagerImpl : public InodeCacheManager,

void RemoveICache(const std::shared_ptr<InodeWrapper> &inode);

void AddOpenedInode(uint64_t inodeId) override;

void RemoveOpenedInode(uint64_t inodeId) override;

bool NeedUseCahce(uint64_t inodeId, bool IsDirty);

private:
virtual void FlushInodeBackground();
void TrimIcache(uint64_t trimSize);
CURVEFS_ERROR RefreshData(std::shared_ptr<InodeWrapper> &inode, // NOLINT
bool streaming = true);
bool OpenInodeCached(uint64_t inodeId);

private:
std::shared_ptr<MetaServerClient> metaClient_;
Expand All @@ -285,6 +292,10 @@ class InodeCacheManagerImpl : public InodeCacheManager,
std::map<uint64_t, std::shared_ptr<InodeWrapper>> dirtyMap_;
curve::common::Mutex dirtyMapMutex_;

// record opened inode
std::multiset<uint64_t> openedInodes_;
curve::common::Mutex openInodesMutex_;

curve::common::GenericNameLock<Mutex> nameLock_;

curve::common::GenericNameLock<Mutex> asyncNameLock_;
Expand Down
6 changes: 4 additions & 2 deletions curvefs/test/client/mock_inode_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ class MockInodeCacheManager : public InodeCacheManager {
MOCK_METHOD2(GetInodeAttr, CURVEFS_ERROR(
uint64_t inodeId, InodeAttr *out));

MOCK_METHOD1(RefreshInode, CURVEFS_ERROR(uint64_t inodeId));

MOCK_METHOD2(BatchGetInodeAttr, CURVEFS_ERROR(
std::set<uint64_t> *inodeIds, std::list<InodeAttr> *attrs));

Expand Down Expand Up @@ -88,6 +86,10 @@ class MockInodeCacheManager : public InodeCacheManager {
MOCK_METHOD0(FlushInodeOnce, void());

MOCK_METHOD1(ReleaseCache, void(uint64_t parentId));

MOCK_METHOD1(AddOpenedInode, void(uint64_t inodeId));

MOCK_METHOD1(RemoveOpenedInode, void(uint64_t inodeId));
};

} // namespace client
Expand Down
10 changes: 0 additions & 10 deletions curvefs/test/client/test_fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1357,21 +1357,11 @@ TEST_F(TestFuseVolumeClient, FuseOpGetAttrEnableCto) {
inode.set_inodeid(ino);
inode.set_length(0);

// RefreshInode OK
EXPECT_CALL(*inodeManager_, RefreshInode(ino))
.WillOnce(Return(CURVEFS_ERROR::OK));
EXPECT_CALL(*inodeManager_, GetInodeAttr(ino, _))
.WillOnce(DoAll(SetArgPointee<1>(inode), Return(CURVEFS_ERROR::OK)));

ASSERT_EQ(CURVEFS_ERROR::OK, client_->FuseOpGetAttr(req, ino, &fi, &attr));

// RefreshInode Fail
EXPECT_CALL(*inodeManager_, RefreshInode(ino))
.WillOnce(Return(CURVEFS_ERROR::INTERNAL));

ASSERT_EQ(CURVEFS_ERROR::INTERNAL,
client_->FuseOpGetAttr(req, ino, &fi, &attr));

// need not refresh inode
fi.fh = static_cast<uint64_t>(FileHandle::kKeepCache);
EXPECT_CALL(*inodeManager_, GetInodeAttr(ino, _))
Expand Down
Loading