Skip to content

Commit

Permalink
curve-fuse: update attr and extent in single rpc (#1779)
Browse files Browse the repository at this point in the history
Signed-off-by: Hanqing Wu <[email protected]>
  • Loading branch information
wu-hanqing committed Aug 4, 2022
1 parent d90d4b2 commit 25081fc
Show file tree
Hide file tree
Showing 15 changed files with 381 additions and 117 deletions.
1 change: 1 addition & 0 deletions curvefs/proto/metaserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ message UpdateInodeRequest {
map<string, string> xattr = 20;
repeated uint64 parent = 21;
map<uint64, S3ChunkInfoList> s3ChunkInfoAdd = 22;
optional VolumeExtentList extents = 23;
}

message UpdateInodeResponse {
Expand Down
34 changes: 33 additions & 1 deletion curvefs/src/client/async_request_closure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <memory>
#include <mutex>

#include "curvefs/proto/metaserver.pb.h"
#include "curvefs/src/client/error_code.h"
#include "curvefs/src/client/inode_wrapper.h"

Expand All @@ -43,6 +44,12 @@ AsyncRequestClosureBase::~AsyncRequestClosureBase() = default;

} // namespace internal

namespace {
bool IsOK(MetaStatusCode code) {
return code == MetaStatusCode::OK || code == MetaStatusCode::NOT_FOUND;
}
} // namespace

UpdateVolumeExtentClosure::UpdateVolumeExtentClosure(
const std::shared_ptr<InodeWrapper>& inode,
bool sync)
Expand All @@ -61,7 +68,7 @@ CURVEFS_ERROR UpdateVolumeExtentClosure::Wait() {

void UpdateVolumeExtentClosure::Run() {
auto st = GetStatusCode();
if (st != MetaStatusCode::OK && st != MetaStatusCode::NOT_FOUND) {
if (!IsOK(st)) {
LOG(ERROR) << "UpdateVolumeExtent failed, error: "
<< MetaStatusCode_Name(st)
<< ", inodeid: " << inode_->GetInodeId();
Expand All @@ -79,5 +86,30 @@ void UpdateVolumeExtentClosure::Run() {
}
}

UpdateInodeAttrAndExtentClosure::UpdateInodeAttrAndExtentClosure(
const std::shared_ptr<InodeWrapper>& inode,
MetaServerClientDone* parent)
: Base(inode), parent_(parent) {}

void UpdateInodeAttrAndExtentClosure::Run() {
std::unique_ptr<UpdateInodeAttrAndExtentClosure> guard(this);

auto st = GetStatusCode();
if (!IsOK(st)) {
LOG(ERROR) << "UpdateInodeAttrAndExtent failed, error: "
<< MetaStatusCode_Name(st)
<< ", inode: " << inode_->GetInodeId();
inode_->MarkInodeError();
}

inode_->syncingVolumeExtentsMtx_.unlock();
inode_->ReleaseSyncingInode();

if (parent_ != nullptr) {
parent_->SetMetaStatusCode(st);
parent_->Run();
}
}

} // namespace client
} // namespace curvefs
14 changes: 14 additions & 0 deletions curvefs/src/client/async_request_closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ class UpdateVolumeExtentClosure : public internal::AsyncRequestClosureBase {
bthread::ConditionVariable cond_;
};

class UpdateInodeAttrAndExtentClosure
: public internal::AsyncRequestClosureBase {
public:
using Base = AsyncRequestClosureBase;

UpdateInodeAttrAndExtentClosure(const std::shared_ptr<InodeWrapper>& inode,
MetaServerClientDone* parent);

void Run() override;

private:
MetaServerClientDone* parent_;
};

} // namespace client
} // namespace curvefs

Expand Down
66 changes: 25 additions & 41 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <utility>
#include "curvefs/proto/metaserver.pb.h"
#include "curvefs/src/client/error_code.h"
#include "curvefs/src/client/inode_wrapper.h"

using ::curvefs::metaserver::Inode;
using ::curvefs::metaserver::MetaStatusCode_Name;
Expand All @@ -47,43 +48,12 @@ namespace client {

using NameLockGuard = ::curve::common::GenericNameLockGuard<Mutex>;

bool IsNotDirtyInode(const std::shared_ptr<InodeWrapper> &inode) {
return !inode->IsDirty() && inode->S3ChunkInfoEmpty();
}

class UpdataInodeAsyncS3Done : public MetaServerClientDone {
public:
explicit UpdataInodeAsyncS3Done(
const std::shared_ptr<InodeWrapper> &inodeWrapper)
: inodeWrapper_(inodeWrapper) {}
~UpdataInodeAsyncS3Done() {}

void Run() override {
std::unique_ptr<UpdataInodeAsyncS3Done> self_guard(this);
MetaStatusCode ret = GetStatusCode();
if (ret != MetaStatusCode::OK && ret != MetaStatusCode::NOT_FOUND) {
LOG(ERROR) << "metaClient_ UpdateInode failed, "
<< "MetaStatusCode: " << ret
<< ", MetaStatusCode_Name: " << MetaStatusCode_Name(ret)
<< ", inodeid: " << inodeWrapper_->GetInodeId();
inodeWrapper_->MarkInodeError();
}
VLOG(9) << "inode " << inodeWrapper_->GetInodeId() << " async success.";
inodeWrapper_->ReleaseSyncingInode();
inodeWrapper_->ReleaseSyncingS3ChunkInfo();
};

private:
std::shared_ptr<InodeWrapper> inodeWrapper_;
};

class TrimICacheAsyncDone : public MetaServerClientDone {
public:
explicit TrimICacheAsyncDone(
const std::shared_ptr<InodeWrapper> &inodeWrapper,
const std::shared_ptr<InodeCacheManagerImpl> &inodeCacheManager)
: inodeWrapper_(inodeWrapper), inodeCacheManager_(inodeCacheManager) {}
~TrimICacheAsyncDone() {}

void Run() override {
std::unique_ptr<TrimICacheAsyncDone> self_guard(this);
Expand All @@ -95,10 +65,8 @@ class TrimICacheAsyncDone : public MetaServerClientDone {
<< ", inodeid: " << inodeWrapper_->GetInodeId();
inodeWrapper_->MarkInodeError();
}
VLOG(9) << "trime inode " << inodeWrapper_->GetInodeId()
VLOG(9) << "Trim inode " << inodeWrapper_->GetInodeId()
<< " async success.";
inodeWrapper_->ReleaseSyncingInode();
inodeWrapper_->ReleaseSyncingS3ChunkInfo();
inodeCacheManager_->RemoveICache(inodeWrapper_);
};

Expand Down Expand Up @@ -417,8 +385,7 @@ void InodeCacheManagerImpl::FlushInodeOnce() {
}
for (auto it = temp_.begin(); it != temp_.end(); it++) {
curve::common::UniqueLock ulk = it->second->GetUniqueLock();
auto *done = new UpdataInodeAsyncS3Done(it->second);
it->second->Async(done, true);
it->second->Async(nullptr, true);
}
}

Expand All @@ -439,28 +406,45 @@ void InodeCacheManagerImpl::FlushInodeBackground() {
LOG(INFO) << "flush thread is stop.";
}

namespace {
// Wether an inode is dirty or not.
// if |needLock| is true, we acquire the lock firstly and do the test,
// otherwise, we assume the lock is already held.
bool IsDirtyInode(InodeWrapper *ino, bool needLock) {
auto check = [ino]() {
return ino->IsDirty() || !ino->S3ChunkInfoEmptyNolock() ||
ino->GetMutableExtentCacheLocked()->HasDirtyExtents();
};

if (needLock) {
auto lk = ino->GetUniqueLock();
return check();
}

return check();
}
} // namespace

void InodeCacheManagerImpl::RemoveICache(
const std::shared_ptr<InodeWrapper> &inode) {
if (!inode->IsDirty() && inode->S3ChunkInfoEmpty()) {
if (!IsDirtyInode(inode.get(), true)) {
uint64_t inodeId = inode->GetInodeId();
NameLockGuard lock(nameLock_, std::to_string(inodeId));
::curve::common::UniqueLock lgGuard = inode->GetUniqueLock();
VLOG(9) << "RemoveICache remove inode " << inodeId << " from iCache";
iCache_->Remove(inodeId);
}
}

void InodeCacheManagerImpl::TrimIcache(uint64_t trimSize) {
std::shared_ptr<InodeWrapper> inodeWrapper;
uint64_t inodeId;
VLOG(9) << "TrimIcache trimSize " << trimSize;
VLOG(3) << "TrimIcache trimSize " << trimSize;
while (trimSize > 0) {
bool ok = iCache_->GetLast(&inodeId, &inodeWrapper);
if (ok) {
NameLockGuard lock(nameLock_, std::to_string(inodeId));
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
if (inodeWrapper->IsDirty() ||
!inodeWrapper->S3ChunkInfoEmptyNolock()) {
if (IsDirtyInode(inodeWrapper.get(), false)) {
VLOG(9) << "TrimIcache sync dirty inode " << inodeId;
dirtyMapMutex_.lock();
dirtyMap_.erase(inodeId);
Expand Down
Loading

0 comments on commit 25081fc

Please sign in to comment.