Skip to content

Commit

Permalink
curvefs/client: fix RefreshInode do not refresh when inode exist
Browse files Browse the repository at this point in the history
Signed-off-by: ilixiaocui <[email protected]>
  • Loading branch information
ilixiaocui committed Aug 11, 2022
1 parent adeb908 commit 22d39c0
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 80 deletions.
30 changes: 22 additions & 8 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@ class TrimICacheAsyncDone : public MetaServerClientDone {
std::shared_ptr<InodeCacheManagerImpl> inodeCacheManager_;
};

class AsyncS3ChunkInfoDone : public MetaServerClientDone {
public:
explicit AsyncS3ChunkInfoDone(
const std::shared_ptr<InodeWrapper> &inodeWrapper)
: inodeWrapper_(inodeWrapper) {}

void Run() override {
std::unique_ptr<AsyncDonAsyncDone> self_guard(this);
inodeWrapper_->NotifyInflightAsyncS3ChunkInfoBack();
};

private:
std::shared_ptr<InodeWrapper> inodeWrapper_;
};

#define GET_INODE_REMOTE(FSID, INODEID, OUT, STREAMING) \
MetaStatusCode ret = metaClient_->GetInode(FSID, INODEID, OUT, STREAMING); \
if (ret != MetaStatusCode::OK) { \
Expand Down Expand Up @@ -153,6 +168,7 @@ InodeCacheManagerImpl::RefreshInode(uint64_t inodeId) {
option_.maxDataSize, option_.refreshDataIntervalSec);
} else {
lgGuard = out->GetUniqueLock();
streaming = true;
}

// refresh data
Expand Down Expand Up @@ -258,10 +274,9 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttrAsync(
std::make_shared<CountDownEvent>(inodeGroups.size());
for (const auto& it : inodeGroups) {
VLOG(3) << "BatchGetInodeAttrAsync Send " << it.size();
auto* done = new BatchGetInodeAttrAsyncDone(shared_from_this(),
cond, parentId);
MetaStatusCode ret = metaClient_->BatchGetInodeAttrAsync(fsId_, it,
done);
MetaStatusCode ret = metaClient_->BatchGetInodeAttrAsync(
fsId_, it,
new BatchGetInodeAttrAsyncDone(shared_from_this(), cond, parentId));
if (MetaStatusCode::OK != ret) {
LOG(ERROR) << "metaClient BatchGetInodeAsync failed,"
<< " MetaStatusCode = " << ret
Expand Down Expand Up @@ -389,7 +404,7 @@ void InodeCacheManagerImpl::FlushInodeOnce() {
}
for (auto it = temp_.begin(); it != temp_.end(); it++) {
curve::common::UniqueLock ulk = it->second->GetUniqueLock();
it->second->Async(nullptr, true);
it->second->Async(new AsyncS3ChunkInfoDone(it->second), true);
}
}

Expand Down Expand Up @@ -453,9 +468,8 @@ void InodeCacheManagerImpl::TrimIcache(uint64_t trimSize) {
dirtyMapMutex_.lock();
dirtyMap_.erase(inodeId);
dirtyMapMutex_.unlock();
auto *done =
new TrimICacheAsyncDone(inodeWrapper, shared_from_this());
inodeWrapper->Async(done);
inodeWrapper->Async(
TrimICacheAsyncDone(inodeWrapper, shared_from_this()));
} else {
VLOG(9) << "TrimIcache remove inode " << inodeId
<< " from iCache";
Expand Down
35 changes: 5 additions & 30 deletions curvefs/src/client/inode_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,6 @@ CURVEFS_ERROR InodeWrapper::SyncAttr(bool internal) {
return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR InodeWrapper::SyncS3ChunkInfo(bool internal) {
curve::common::UniqueLock lock = GetSyncingS3ChunkInfoUniqueLock();
if (!s3ChunkInfoAdd_.empty()) {
MetaStatusCode ret = metaClient_->GetOrModifyS3ChunkInfo(
inode_.fsid(), inode_.inodeid(), s3ChunkInfoAdd_, false, nullptr,
internal);
if (ret != MetaStatusCode::OK) {
LOG(ERROR) << "metaClient_ GetOrModifyS3ChunkInfo failed, "
<< "MetaStatusCode: " << ret
<< ", MetaStatusCode_Name: " << MetaStatusCode_Name(ret)
<< ", inodeid: " << inode_.inodeid();
return MetaStatusCodeToCurvefsErrCode(ret);
}
ClearS3ChunkInfoAdd();
}
return CURVEFS_ERROR::OK;
}

void InodeWrapper::AsyncFlushAttr(MetaServerClientDone* done,
bool /*internal*/) {
if (dirty_) {
Expand All @@ -188,17 +170,6 @@ void InodeWrapper::AsyncFlushAttr(MetaServerClientDone* done,
}
}

void InodeWrapper::FlushS3ChunkInfoAsync() {
if (!s3ChunkInfoAdd_.empty()) {
LockSyncingS3ChunkInfo();
auto *done = new GetOrModifyS3ChunkInfoAsyncDone(shared_from_this());
metaClient_->GetOrModifyS3ChunkInfoAsync(
inode_.fsid(), inode_.inodeid(), s3ChunkInfoAdd_,
done);
ClearS3ChunkInfoAdd();
}
}

CURVEFS_ERROR InodeWrapper::FlushVolumeExtent() {
std::lock_guard<::curve::common::Mutex> guard(syncingVolumeExtentsMtx_);
if (!extentCache_.HasDirtyExtents()) {
Expand Down Expand Up @@ -481,6 +452,9 @@ CURVEFS_ERROR InodeWrapper::SyncS3(bool internal) {
}
ClearS3ChunkInfoAdd();
}

std::unique_lock<std::mutex> lk(inflightAsyncS3ChunkInfoMtx_);
cond_.wait(lk, [] { return inflightAsyncS3ChunkInfo_ == 0; });
return CURVEFS_ERROR::OK;
}

Expand Down Expand Up @@ -524,13 +498,14 @@ void InodeWrapper::AsyncS3(MetaServerClientDone *done, bool internal) {
LockSyncingS3ChunkInfo();
DataIndices indices;
if (!s3ChunkInfoAdd_.empty()) {
indices.s3ChunkInfoMap = std::move(s3ChunkInfoAdd_);
indices.s3ChunkInfoMap = s3ChunkInfoAdd_;
}
metaClient_->UpdateInodeWithOutNlinkAsync(
inode_, new UpdateInodeAsyncS3Done{shared_from_this(), done},
InodeOpenStatusChange::NOCHANGE, std::move(indices));
dirty_ = false;
ClearS3ChunkInfoAdd();
inflightAsyncS3ChunkInfo_.fetch_add(1);
return;
}

Expand Down
79 changes: 37 additions & 42 deletions curvefs/src/client/inode_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,47 +81,36 @@ extern bvar::Adder<int64_t> g_alive_inode_count;

class InodeWrapper : public std::enable_shared_from_this<InodeWrapper> {
public:
InodeWrapper(const Inode &inode,
const std::shared_ptr<MetaServerClient> &metaClient,
const std::shared_ptr<S3ChunkInfoMetric>
&s3ChunkInfoMetric = nullptr,
uint64_t maxDataSize = ULONG_MAX,
uint32_t refreshDataInterval = UINT_MAX)
: inode_(inode),
status_(InodeStatus::Normal),
isNlinkValid_(true),
metaClient_(metaClient),
s3ChunkInfoMetric_(s3ChunkInfoMetric),
dirty_(false),
baseMaxDataSize_(maxDataSize),
maxDataSize_(maxDataSize),
refreshDataInterval_(refreshDataInterval),
InodeWrapper(
const Inode &inode, const std::shared_ptr<MetaServerClient> &metaClient,
const std::shared_ptr<S3ChunkInfoMetric> &s3ChunkInfoMetric = nullptr,
uint64_t maxDataSize = ULONG_MAX,
uint32_t refreshDataInterval = UINT_MAX)
: inode_(inode), status_(InodeStatus::Normal), isNlinkValid_(true),
metaClient_(metaClient), s3ChunkInfoMetric_(s3ChunkInfoMetric),
dirty_(false), baseMaxDataSize_(maxDataSize),
maxDataSize_(maxDataSize), refreshDataInterval_(refreshDataInterval),
lastRefreshTime_(::curve::common::TimeUtility::GetTimeofDaySec()),
s3ChunkInfoAddSize_(0) {
UpdateS3ChunkInfoMetric(CalS3ChunkInfoSize());
g_alive_inode_count << 1;
}

InodeWrapper(Inode &&inode,
const std::shared_ptr<MetaServerClient> &metaClient,
const std::shared_ptr<S3ChunkInfoMetric>
&s3ChunkInfoMetric = nullptr,
uint64_t maxDataSize = ULONG_MAX,
uint32_t refreshDataInterval = UINT_MAX)
: inode_(std::move(inode)),
status_(InodeStatus::Normal),
isNlinkValid_(true),
metaClient_(metaClient),
s3ChunkInfoMetric_(s3ChunkInfoMetric),
dirty_(false),
baseMaxDataSize_(maxDataSize),
maxDataSize_(maxDataSize),
s3ChunkInfoAddSize_(0), inflightAsyncS3ChunkInfo_(0) {
UpdateS3ChunkInfoMetric(CalS3ChunkInfoSize());
g_alive_inode_count << 1;
}

InodeWrapper(
Inode &&inode, const std::shared_ptr<MetaServerClient> &metaClient,
const std::shared_ptr<S3ChunkInfoMetric> &s3ChunkInfoMetric = nullptr,
uint64_t maxDataSize = ULONG_MAX,
uint32_t refreshDataInterval = UINT_MAX)
: inode_(std::move(inode)), status_(InodeStatus::Normal),
isNlinkValid_(true), metaClient_(metaClient),
s3ChunkInfoMetric_(s3ChunkInfoMetric), dirty_(false),
baseMaxDataSize_(maxDataSize), maxDataSize_(maxDataSize),
refreshDataInterval_(refreshDataInterval),
lastRefreshTime_(::curve::common::TimeUtility::GetTimeofDaySec()),
s3ChunkInfoAddSize_(0) {
UpdateS3ChunkInfoMetric(CalS3ChunkInfoSize());
g_alive_inode_count << 1;
}
s3ChunkInfoAddSize_(0), inflightAsyncS3ChunkInfo_(0) {
UpdateS3ChunkInfoMetric(CalS3ChunkInfoSize());
g_alive_inode_count << 1;
}

~InodeWrapper() {
UpdateS3ChunkInfoMetric(-s3ChunkInfoSize_ - s3ChunkInfoAddSize_);
Expand Down Expand Up @@ -304,8 +293,6 @@ class InodeWrapper : public std::enable_shared_from_this<InodeWrapper> {

void AsyncFlushAttr(MetaServerClientDone *done, bool internal);

void FlushS3ChunkInfoAsync();

CURVEFS_ERROR RefreshS3ChunkInfo();

CURVEFS_ERROR Open();
Expand Down Expand Up @@ -397,11 +384,15 @@ class InodeWrapper : public std::enable_shared_from_this<InodeWrapper> {
return false;
}

void NotifyInflightAsyncS3ChunkInfoBack() {
inflightAsyncS3ChunkInfo_.fetch_sub(1);
std::unique_lock<std::mutex> lk(inflightAsyncS3ChunkInfoMtx_);
cond_.notify_one();
}

private:
CURVEFS_ERROR UpdateInodeStatus(InodeOpenStatusChange statusChange);

CURVEFS_ERROR SyncS3ChunkInfo(bool internal = false);

int64_t CalS3ChunkInfoSize() {
int64_t size = 0;
for (const auto &it : inode_.s3chunkinfomap()) {
Expand Down Expand Up @@ -469,6 +460,10 @@ class InodeWrapper : public std::enable_shared_from_this<InodeWrapper> {

mutable ::curve::common::Mutex syncingVolumeExtentsMtx_;
ExtentCache extentCache_;

curve::common::Mutex inflightAsyncS3ChunkInfoMtx_;
curve::common::ConditionVariable cond_;
std::atomic<uint64_t> inflightAsyncS3ChunkInfo_;
};

} // namespace client
Expand Down
9 changes: 9 additions & 0 deletions curvefs/test/client/test_inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ TEST_F(TestInodeCacheManager, RefreshInode) {
iCacheManager_->GetInode(inodeId, inodeWrapper));
ASSERT_EQ(inodenew.length(), inodeWrapper->GetLength());

// cache hit, refresh s3-inode from metaserver, need streaming
EXPECT_CALL(*metaClient_, GetInode(fsId_, inodeId, _, _))
.WillOnce(DoAll(SetArgPointee<2>(inodenew), SetArgPointee<3>(true),
Return(MetaStatusCode::OK)));
EXPECT_CALL(*metaClient_,
GetOrModifyS3ChunkInfo(fsId_, inodeId, _, true, _, _))
.WillOnce(Return(MetaStatusCode::OK));
ASSERT_EQ(CURVEFS_ERROR::OK, iCacheManager_->RefreshInode(inodeId));

// cache miss, get file-inode from metaserver
Inode inodefile;
uint64_t inodefileid = 300;
Expand Down

0 comments on commit 22d39c0

Please sign in to comment.