diff --git a/curvefs/proto/metaserver.proto b/curvefs/proto/metaserver.proto index b4e9154823..0680cc0ba2 100644 --- a/curvefs/proto/metaserver.proto +++ b/curvefs/proto/metaserver.proto @@ -294,6 +294,7 @@ message UpdateInodeRequest { map xattr = 20; repeated uint64 parent = 21; map s3ChunkInfoAdd = 22; + optional VolumeExtentList extents = 23; } message UpdateInodeResponse { diff --git a/curvefs/src/client/async_request_closure.cpp b/curvefs/src/client/async_request_closure.cpp index aa07df8622..8845e7872e 100644 --- a/curvefs/src/client/async_request_closure.cpp +++ b/curvefs/src/client/async_request_closure.cpp @@ -27,6 +27,7 @@ #include #include +#include "curvefs/proto/metaserver.pb.h" #include "curvefs/src/client/error_code.h" #include "curvefs/src/client/inode_wrapper.h" @@ -43,6 +44,12 @@ AsyncRequestClosureBase::~AsyncRequestClosureBase() = default; } // namespace internal +namespace { +bool IsOK(MetaStatusCode code) { + return code == MetaStatusCode::OK || code == MetaStatusCode::NOT_FOUND; +} +} // namespace + UpdateVolumeExtentClosure::UpdateVolumeExtentClosure( const std::shared_ptr& inode, bool sync) @@ -61,7 +68,7 @@ CURVEFS_ERROR UpdateVolumeExtentClosure::Wait() { void UpdateVolumeExtentClosure::Run() { auto st = GetStatusCode(); - if (st != MetaStatusCode::OK && st != MetaStatusCode::NOT_FOUND) { + if (!IsOK(st)) { LOG(ERROR) << "UpdateVolumeExtent failed, error: " << MetaStatusCode_Name(st) << ", inodeid: " << inode_->GetInodeId(); @@ -79,5 +86,30 @@ void UpdateVolumeExtentClosure::Run() { } } +UpdateInodeAttrAndExtentClosure::UpdateInodeAttrAndExtentClosure( + const std::shared_ptr& inode, + MetaServerClientDone* parent) + : Base(inode), parent_(parent) {} + +void UpdateInodeAttrAndExtentClosure::Run() { + std::unique_ptr guard(this); + + auto st = GetStatusCode(); + if (!IsOK(st)) { + LOG(ERROR) << "UpdateInodeAttrAndExtent failed, error: " + << MetaStatusCode_Name(st) + << ", inode: " << inode_->GetInodeId(); + inode_->MarkInodeError(); + } + + inode_->syncingVolumeExtentsMtx_.unlock(); + inode_->ReleaseSyncingInode(); + + if (parent_ != nullptr) { + parent_->SetMetaStatusCode(st); + parent_->Run(); + } +} + } // namespace client } // namespace curvefs diff --git a/curvefs/src/client/async_request_closure.h b/curvefs/src/client/async_request_closure.h index 6756f09973..0e75cc78e2 100644 --- a/curvefs/src/client/async_request_closure.h +++ b/curvefs/src/client/async_request_closure.h @@ -84,6 +84,20 @@ class UpdateVolumeExtentClosure : public internal::AsyncRequestClosureBase { bthread::ConditionVariable cond_; }; +class UpdateInodeAttrAndExtentClosure + : public internal::AsyncRequestClosureBase { + public: + using Base = AsyncRequestClosureBase; + + UpdateInodeAttrAndExtentClosure(const std::shared_ptr& inode, + MetaServerClientDone* parent); + + void Run() override; + + private: + MetaServerClientDone* parent_; +}; + } // namespace client } // namespace curvefs diff --git a/curvefs/src/client/inode_cache_manager.cpp b/curvefs/src/client/inode_cache_manager.cpp index 5dbe39df62..441ac0f4a3 100644 --- a/curvefs/src/client/inode_cache_manager.cpp +++ b/curvefs/src/client/inode_cache_manager.cpp @@ -30,6 +30,7 @@ #include #include "curvefs/proto/metaserver.pb.h" #include "curvefs/src/client/error_code.h" +#include "curvefs/src/client/inode_wrapper.h" using ::curvefs::metaserver::Inode; using ::curvefs::metaserver::MetaStatusCode_Name; @@ -47,43 +48,12 @@ namespace client { using NameLockGuard = ::curve::common::GenericNameLockGuard; -bool IsNotDirtyInode(const std::shared_ptr &inode) { - return !inode->IsDirty() && inode->S3ChunkInfoEmpty(); -} - -class UpdataInodeAsyncS3Done : public MetaServerClientDone { - public: - explicit UpdataInodeAsyncS3Done( - const std::shared_ptr &inodeWrapper) - : inodeWrapper_(inodeWrapper) {} - ~UpdataInodeAsyncS3Done() {} - - void Run() override { - std::unique_ptr self_guard(this); - MetaStatusCode ret = GetStatusCode(); - if (ret != MetaStatusCode::OK && ret != MetaStatusCode::NOT_FOUND) { - LOG(ERROR) << "metaClient_ UpdateInode failed, " - << "MetaStatusCode: " << ret - << ", MetaStatusCode_Name: " << MetaStatusCode_Name(ret) - << ", inodeid: " << inodeWrapper_->GetInodeId(); - inodeWrapper_->MarkInodeError(); - } - VLOG(9) << "inode " << inodeWrapper_->GetInodeId() << " async success."; - inodeWrapper_->ReleaseSyncingInode(); - inodeWrapper_->ReleaseSyncingS3ChunkInfo(); - }; - - private: - std::shared_ptr inodeWrapper_; -}; - class TrimICacheAsyncDone : public MetaServerClientDone { public: explicit TrimICacheAsyncDone( const std::shared_ptr &inodeWrapper, const std::shared_ptr &inodeCacheManager) : inodeWrapper_(inodeWrapper), inodeCacheManager_(inodeCacheManager) {} - ~TrimICacheAsyncDone() {} void Run() override { std::unique_ptr self_guard(this); @@ -95,10 +65,8 @@ class TrimICacheAsyncDone : public MetaServerClientDone { << ", inodeid: " << inodeWrapper_->GetInodeId(); inodeWrapper_->MarkInodeError(); } - VLOG(9) << "trime inode " << inodeWrapper_->GetInodeId() + VLOG(9) << "Trim inode " << inodeWrapper_->GetInodeId() << " async success."; - inodeWrapper_->ReleaseSyncingInode(); - inodeWrapper_->ReleaseSyncingS3ChunkInfo(); inodeCacheManager_->RemoveICache(inodeWrapper_); }; @@ -417,8 +385,7 @@ void InodeCacheManagerImpl::FlushInodeOnce() { } for (auto it = temp_.begin(); it != temp_.end(); it++) { curve::common::UniqueLock ulk = it->second->GetUniqueLock(); - auto *done = new UpdataInodeAsyncS3Done(it->second); - it->second->Async(done, true); + it->second->Async(nullptr, true); } } @@ -439,13 +406,31 @@ void InodeCacheManagerImpl::FlushInodeBackground() { LOG(INFO) << "flush thread is stop."; } +namespace { +// Wether an inode is dirty or not. +// if |needLock| is true, we acquire the lock firstly and do the test, +// otherwise, we assume the lock is already held. +bool IsDirtyInode(InodeWrapper *ino, bool needLock) { + auto check = [ino]() { + return ino->IsDirty() || !ino->S3ChunkInfoEmptyNolock() || + ino->GetMutableExtentCacheLocked()->HasDirtyExtents(); + }; + + if (needLock) { + auto lk = ino->GetUniqueLock(); + return check(); + } + + return check(); +} +} // namespace + void InodeCacheManagerImpl::RemoveICache( const std::shared_ptr &inode) { - if (!inode->IsDirty() && inode->S3ChunkInfoEmpty()) { + if (!IsDirtyInode(inode.get(), true)) { uint64_t inodeId = inode->GetInodeId(); NameLockGuard lock(nameLock_, std::to_string(inodeId)); ::curve::common::UniqueLock lgGuard = inode->GetUniqueLock(); - VLOG(9) << "RemoveICache remove inode " << inodeId << " from iCache"; iCache_->Remove(inodeId); } } @@ -453,14 +438,13 @@ void InodeCacheManagerImpl::RemoveICache( void InodeCacheManagerImpl::TrimIcache(uint64_t trimSize) { std::shared_ptr inodeWrapper; uint64_t inodeId; - VLOG(9) << "TrimIcache trimSize " << trimSize; + VLOG(3) << "TrimIcache trimSize " << trimSize; while (trimSize > 0) { bool ok = iCache_->GetLast(&inodeId, &inodeWrapper); if (ok) { NameLockGuard lock(nameLock_, std::to_string(inodeId)); ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); - if (inodeWrapper->IsDirty() || - !inodeWrapper->S3ChunkInfoEmptyNolock()) { + if (IsDirtyInode(inodeWrapper.get(), false)) { VLOG(9) << "TrimIcache sync dirty inode " << inodeId; dirtyMapMutex_.lock(); dirtyMap_.erase(inodeId); diff --git a/curvefs/src/client/inode_wrapper.cpp b/curvefs/src/client/inode_wrapper.cpp index e5b09fc254..7b9d71d8e5 100644 --- a/curvefs/src/client/inode_wrapper.cpp +++ b/curvefs/src/client/inode_wrapper.cpp @@ -24,6 +24,7 @@ #include +#include #include #include #include @@ -34,6 +35,8 @@ #include "curvefs/src/client/error_code.h" #include "curvefs/src/client/rpcclient/metaserver_client.h" #include "curvefs/src/client/rpcclient/task_excutor.h" +#include "curvefs/src/client/xattr_manager.h" +#include "include/curve_compiler_specific.h" using ::curvefs::metaserver::MetaStatusCode_Name; @@ -42,6 +45,9 @@ namespace client { using rpcclient::MetaServerClient; using rpcclient::MetaServerClientImpl; +using rpcclient::DataIndices; + +bvar::Adder g_alive_inode_count{"alive_inode_count"}; std::ostream &operator<<(std::ostream &os, const struct stat &attr) { os << "{ st_ino = " << attr.st_ino << ", st_mode = " << attr.st_mode @@ -60,7 +66,7 @@ std::ostream &operator<<(std::ostream &os, const struct stat &attr) { void AppendS3ChunkInfoToMap(uint64_t chunkIndex, const S3ChunkInfo &info, google::protobuf::Map *s3ChunkInfoMap) { VLOG(9) << "AppendS3ChunkInfoToMap chunkIndex: " << chunkIndex - << "s3chunkInfo { chunkId: " << info.chunkid() + << ", s3chunkInfo { chunkId: " << info.chunkid() << ", compaction: " << info.compaction() << ", offset: " << info.offset() << ", len: " << info.len() << ", zero: " << info.zero(); @@ -78,10 +84,9 @@ void AppendS3ChunkInfoToMap(uint64_t chunkIndex, const S3ChunkInfo &info, class UpdateInodeAsyncDone : public MetaServerClientDone { public: - explicit UpdateInodeAsyncDone( - const std::shared_ptr &inodeWrapper): - inodeWrapper_(inodeWrapper) {} - ~UpdateInodeAsyncDone() {} + UpdateInodeAsyncDone(const std::shared_ptr& inodeWrapper, + MetaServerClientDone* parent) + : inodeWrapper_(inodeWrapper), parent_(parent) {} void Run() override { std::unique_ptr self_guard(this); @@ -94,10 +99,16 @@ class UpdateInodeAsyncDone : public MetaServerClientDone { inodeWrapper_->MarkInodeError(); } inodeWrapper_->ReleaseSyncingInode(); + + if (parent_ != nullptr) { + parent_->SetMetaStatusCode(ret); + parent_->Run(); + } }; private: std::shared_ptr inodeWrapper_; + MetaServerClientDone* parent_; }; class GetOrModifyS3ChunkInfoAsyncDone : public MetaServerClientDone { @@ -161,12 +172,19 @@ CURVEFS_ERROR InodeWrapper::SyncS3ChunkInfo(bool internal) { return CURVEFS_ERROR::OK; } -void InodeWrapper::FlushAttrAsync() { +void InodeWrapper::AsyncFlushAttr(MetaServerClientDone* done, + bool /*internal*/) { if (dirty_) { LockSyncingInode(); - auto *done = new UpdateInodeAsyncDone(shared_from_this()); - metaClient_->UpdateInodeWithOutNlinkAsync(inode_, done); + metaClient_->UpdateInodeWithOutNlinkAsync( + inode_, new UpdateInodeAsyncDone(shared_from_this(), done)); dirty_ = false; + return; + } + + if (done != nullptr) { + done->SetMetaStatusCode(MetaStatusCode::OK); + done->Run(); } } @@ -189,7 +207,7 @@ CURVEFS_ERROR InodeWrapper::FlushVolumeExtent() { UpdateVolumeExtentClosure closure(shared_from_this(), true); auto dirtyExtents = extentCache_.GetDirtyExtents(); - VLOG(9) << "FlushVolumeExtent, ino: " << inode_.inodeid() + VLOG(3) << "FlushVolumeExtent, ino: " << inode_.inodeid() << ", dirty extents: " << dirtyExtents.ShortDebugString(); CHECK_GT(dirtyExtents.slices_size(), 0); metaClient_->AsyncUpdateVolumeExtent(inode_.fsid(), inode_.inodeid(), @@ -200,12 +218,14 @@ CURVEFS_ERROR InodeWrapper::FlushVolumeExtent() { void InodeWrapper::FlushVolumeExtentAsync() { syncingVolumeExtentsMtx_.lock(); if (!extentCache_.HasDirtyExtents()) { + VLOG(3) << "FlushVolumeExtentAsync, ino: " << inode_.inodeid() + << ", no dirty extents"; syncingVolumeExtentsMtx_.unlock(); return; } auto dirtyExtents = extentCache_.GetDirtyExtents(); - VLOG(9) << "FlushVolumeExtent, ino: " << inode_.inodeid() + VLOG(3) << "FlushVolumeExtent, ino: " << inode_.inodeid() << ", dirty extents: " << dirtyExtents.ShortDebugString(); CHECK_GT(dirtyExtents.slices_size(), 0); auto *closure = new UpdateVolumeExtentClosure(shared_from_this(), false); @@ -398,18 +418,46 @@ CURVEFS_ERROR InodeWrapper::Sync(bool internal) { } void InodeWrapper::Async(MetaServerClientDone *done, bool internal) { + VLOG(3) << "async inode: " << inode_.ShortDebugString(); + switch (inode_.type()) { case FsFileType::TYPE_S3: - AsyncS3(done, internal); - break; + return AsyncS3(done, internal); case FsFileType::TYPE_FILE: - FlushAttrAsync(); - FlushVolumeExtentAsync(); - break; + return AsyncFlushAttrAndExtents(done, internal); case FsFileType::TYPE_DIRECTORY: - FlushAttrAsync(); - default: - break; + FALLTHROUGH_INTENDED; + case FsFileType::TYPE_SYM_LINK: + return AsyncFlushAttr(done, internal); + } + + CHECK(false) << "Unexpected inode type: " << inode_.type() << ", " + << inode_.ShortDebugString(); +} + +void InodeWrapper::AsyncFlushAttrAndExtents(MetaServerClientDone *done, + bool /*internal*/) { + if (dirty_ || extentCache_.HasDirtyExtents()) { + LockSyncingInode(); + syncingVolumeExtentsMtx_.lock(); + DataIndices indices; + if (extentCache_.HasDirtyExtents()) { + indices.volumeExtents = extentCache_.GetDirtyExtents(); + } + + metaClient_->UpdateInodeWithOutNlinkAsync( + inode_, + new UpdateInodeAttrAndExtentClosure{shared_from_this(), done}, + InodeOpenStatusChange::NOCHANGE, std::move(indices)); + + dirty_ = false; + return; + } + + // nothing to update + if (done != nullptr) { + done->SetMetaStatusCode(MetaStatusCode::OK); + done->Run(); } } @@ -436,29 +484,59 @@ CURVEFS_ERROR InodeWrapper::SyncS3(bool internal) { return CURVEFS_ERROR::OK; } +namespace { +class UpdateInodeAsyncS3Done : public MetaServerClientDone { + public: + explicit UpdateInodeAsyncS3Done( + const std::shared_ptr& inodeWrapper, + MetaServerClientDone* parent) + : inodeWrapper_(inodeWrapper), parent_(parent) {} + + void Run() override { + std::unique_ptr self_guard(this); + MetaStatusCode ret = GetStatusCode(); + if (ret != MetaStatusCode::OK && ret != MetaStatusCode::NOT_FOUND) { + LOG(ERROR) << "metaClient_ UpdateInode failed, " + << "MetaStatusCode: " << ret + << ", MetaStatusCode_Name: " << MetaStatusCode_Name(ret) + << ", inodeid: " << inodeWrapper_->GetInodeId(); + inodeWrapper_->MarkInodeError(); + } + VLOG(9) << "inode " << inodeWrapper_->GetInodeId() << " async success."; + inodeWrapper_->ReleaseSyncingInode(); + inodeWrapper_->ReleaseSyncingS3ChunkInfo(); + + if (parent_ != nullptr) { + parent_->SetMetaStatusCode(ret); + parent_->Run(); + } + }; + + private: + std::shared_ptr inodeWrapper_; + MetaServerClientDone* parent_; +}; +} // namespace + void InodeWrapper::AsyncS3(MetaServerClientDone *done, bool internal) { if (dirty_ || !s3ChunkInfoAdd_.empty()) { LockSyncingInode(); LockSyncingS3ChunkInfo(); + DataIndices indices; + if (!s3ChunkInfoAdd_.empty()) { + indices.s3ChunkInfoMap = std::move(s3ChunkInfoAdd_); + } metaClient_->UpdateInodeWithOutNlinkAsync( - inode_, done, InodeOpenStatusChange::NOCHANGE, &s3ChunkInfoAdd_); + inode_, new UpdateInodeAsyncS3Done{shared_from_this(), done}, + InodeOpenStatusChange::NOCHANGE, std::move(indices)); dirty_ = false; ClearS3ChunkInfoAdd(); + return; } -} - -void InodeWrapper::FlushAsync() { - // TODO(all): maybe we should update inode attribute and data indices - // in single rpc - FlushAttrAsync(); - switch (inode_.type()) { - case FsFileType::TYPE_S3: - return FlushS3ChunkInfoAsync(); - case FsFileType::TYPE_FILE: - return FlushVolumeExtentAsync(); - default: - break; + if (done != nullptr) { + done->SetMetaStatusCode(MetaStatusCode::OK); + done->Run(); } } diff --git a/curvefs/src/client/inode_wrapper.h b/curvefs/src/client/inode_wrapper.h index 29d7ebbdd7..49aa50364b 100644 --- a/curvefs/src/client/inode_wrapper.h +++ b/curvefs/src/client/inode_wrapper.h @@ -56,7 +56,7 @@ do { \ return ret; \ } \ } \ -} while (0) \ +} while (0) using ::curvefs::metaserver::VolumeExtentList; @@ -77,6 +77,8 @@ std::ostream &operator<<(std::ostream &os, const struct stat &attr); void AppendS3ChunkInfoToMap(uint64_t chunkIndex, const S3ChunkInfo &info, google::protobuf::Map *s3ChunkInfoMap); +extern bvar::Adder g_alive_inode_count; + class InodeWrapper : public std::enable_shared_from_this { public: InodeWrapper(const Inode &inode, @@ -97,6 +99,7 @@ class InodeWrapper : public std::enable_shared_from_this { lastRefreshTime_(::curve::common::TimeUtility::GetTimeofDaySec()), s3ChunkInfoAddSize_(0) { UpdateS3ChunkInfoMetric(CalS3ChunkInfoSize()); + g_alive_inode_count << 1; } InodeWrapper(Inode &&inode, @@ -117,10 +120,12 @@ class InodeWrapper : public std::enable_shared_from_this { lastRefreshTime_(::curve::common::TimeUtility::GetTimeofDaySec()), s3ChunkInfoAddSize_(0) { UpdateS3ChunkInfoMetric(CalS3ChunkInfoSize()); + g_alive_inode_count << 1; } ~InodeWrapper() { UpdateS3ChunkInfoMetric(-s3ChunkInfoSize_ - s3ChunkInfoAddSize_); + g_alive_inode_count << -1; } uint64_t GetInodeId() const { @@ -297,9 +302,7 @@ class InodeWrapper : public std::enable_shared_from_this { CURVEFS_ERROR SyncAttr(bool internal = false); - void FlushAsync(); - - void FlushAttrAsync(); + void AsyncFlushAttr(MetaServerClientDone *done, bool internal); void FlushS3ChunkInfoAsync(); @@ -376,6 +379,10 @@ class InodeWrapper : public std::enable_shared_from_this { return &extentCache_; } + ExtentCache* GetMutableExtentCacheLocked() { + return &extentCache_; + } + CURVEFS_ERROR RefreshVolumeExtent(); bool NeedRefreshData() { @@ -427,8 +434,13 @@ class InodeWrapper : public std::enable_shared_from_this { } } + // Flush inode attributes and extents asynchronously. + // REQUIRES: |mtx_| is held + void AsyncFlushAttrAndExtents(MetaServerClientDone *done, bool internal); + private: friend class UpdateVolumeExtentClosure; + friend class UpdateInodeAttrAndExtentClosure; CURVEFS_ERROR FlushVolumeExtent(); void FlushVolumeExtentAsync(); diff --git a/curvefs/src/client/rpcclient/BUILD b/curvefs/src/client/rpcclient/BUILD index 8cae9d05c3..c0e6927f96 100644 --- a/curvefs/src/client/rpcclient/BUILD +++ b/curvefs/src/client/rpcclient/BUILD @@ -37,5 +37,6 @@ cc_library( "//external:glog", "//src/client:curve_client", "@com_google_absl//absl/cleanup", + "@com_google_absl//absl/types:optional", ], ) diff --git a/curvefs/src/client/rpcclient/metaserver_client.cpp b/curvefs/src/client/rpcclient/metaserver_client.cpp index 8cd19e3d37..0d03e37b0c 100644 --- a/curvefs/src/client/rpcclient/metaserver_client.cpp +++ b/curvefs/src/client/rpcclient/metaserver_client.cpp @@ -832,7 +832,7 @@ UpdateInodeRequest MetaServerClientImpl::BuileUpdateInodeAttrWithOutNlinkRequest( const Inode &inode, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd) { + S3ChunkInfoMap *s3ChunkInfoAdd) { UpdateInodeRequest request; request.set_inodeid(inode.inodeid()); request.set_fsid(inode.fsid()); @@ -873,7 +873,7 @@ MetaServerClientImpl::UpdateInodeAttr(const Inode &inode, MetaStatusCode MetaServerClientImpl::UpdateInodeAttrWithOutNlink( const Inode &inode, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd, bool internal) { + S3ChunkInfoMap *s3ChunkInfoAdd, bool internal) { UpdateInodeRequest request = BuileUpdateInodeAttrWithOutNlinkRequest( inode, statusChange, s3ChunkInfoAdd); return UpdateInode(request, internal); @@ -960,11 +960,46 @@ void MetaServerClientImpl::UpdateInodeAttrAsync( UpdateInodeAsync(request, done); } +namespace { +UpdateInodeRequest BuildUpdateInodeRequest(const Inode& inode, + DataIndices&& indices, + InodeOpenStatusChange statusChange) { + UpdateInodeRequest request; + request.set_inodeid(inode.inodeid()); + request.set_fsid(inode.fsid()); + request.set_length(inode.length()); + request.set_ctime(inode.ctime()); + request.set_mtime(inode.mtime()); + request.set_atime(inode.atime()); + request.set_uid(inode.uid()); + request.set_gid(inode.gid()); + request.set_mode(inode.mode()); + request.set_inodeopenstatuschange(statusChange); + *(request.mutable_parent()) = inode.parent(); + if (inode.xattr_size() > 0) { + *(request.mutable_xattr()) = inode.xattr(); + } + + if (indices.s3ChunkInfoMap && !indices.s3ChunkInfoMap->empty()) { + *request.mutable_s3chunkinfoadd() = + std::move(indices.s3ChunkInfoMap.value()); + } + + if (indices.volumeExtents && indices.volumeExtents->slices_size() > 0) { + *request.mutable_extents() = std::move(indices.volumeExtents.value()); + } + + return request; +} +} // namespace + void MetaServerClientImpl::UpdateInodeWithOutNlinkAsync( - const Inode &inode, MetaServerClientDone *done, - InodeOpenStatusChange statusChange, S3ChunkInofMap *s3ChunkInfoAdd) { - UpdateInodeRequest request = BuileUpdateInodeAttrWithOutNlinkRequest( - inode, statusChange, s3ChunkInfoAdd); + const Inode &inode, + MetaServerClientDone *done, + InodeOpenStatusChange statusChange, + DataIndices &&indices) { + auto request = + BuildUpdateInodeRequest(inode, std::move(indices), statusChange); UpdateInodeAsync(request, done); } diff --git a/curvefs/src/client/rpcclient/metaserver_client.h b/curvefs/src/client/rpcclient/metaserver_client.h index 53fe9d18fa..8ccf677cae 100644 --- a/curvefs/src/client/rpcclient/metaserver_client.h +++ b/curvefs/src/client/rpcclient/metaserver_client.h @@ -39,6 +39,7 @@ #include "curvefs/src/client/rpcclient/task_excutor.h" #include "curvefs/src/client/metric/client_metric.h" #include "curvefs/src/common/rpc_stream.h" +#include "absl/types/optional.h" using ::curvefs::client::metric::MetaServerClientMetric; using ::curvefs::metaserver::Dentry; @@ -51,7 +52,7 @@ using ::curvefs::metaserver::MetaStatusCode; using ::curvefs::metaserver::S3ChunkInfoList; using ::curvefs::common::StreamStatus; using ::curvefs::common::StreamClient; -using S3ChunkInofMap = google::protobuf::Map; +using S3ChunkInfoMap = google::protobuf::Map; namespace curvefs { namespace client { @@ -60,6 +61,11 @@ namespace rpcclient { using S3ChunkInfoMap = google::protobuf::Map; using ::curvefs::metaserver::VolumeExtentList; +struct DataIndices { + absl::optional s3ChunkInfoMap; + absl::optional volumeExtents; +}; + class MetaServerClient { public: virtual ~MetaServerClient() = default; @@ -115,7 +121,7 @@ class MetaServerClient { virtual MetaStatusCode UpdateInodeAttrWithOutNlink( const Inode &inode, InodeOpenStatusChange statusChange = InodeOpenStatusChange::NOCHANGE, - S3ChunkInofMap *s3ChunkInfoAdd = nullptr, + S3ChunkInfoMap *s3ChunkInfoAdd = nullptr, bool internal = false) = 0; virtual void UpdateInodeAttrAsync(const Inode &inode, @@ -123,11 +129,11 @@ class MetaServerClient { InodeOpenStatusChange statusChange = InodeOpenStatusChange::NOCHANGE) = 0; - virtual void UpdateInodeWithOutNlinkAsync(const Inode &inode, - MetaServerClientDone *done, - InodeOpenStatusChange statusChange = - InodeOpenStatusChange::NOCHANGE, - S3ChunkInofMap *s3ChunkInfoAdd = nullptr) = 0; + virtual void UpdateInodeWithOutNlinkAsync( + const Inode& inode, + MetaServerClientDone* done, + InodeOpenStatusChange statusChange = InodeOpenStatusChange::NOCHANGE, + DataIndices&& indices = {}) = 0; virtual MetaStatusCode GetOrModifyS3ChunkInfo( uint32_t fsId, uint64_t inodeId, @@ -219,18 +225,18 @@ class MetaServerClientImpl : public MetaServerClient { MetaStatusCode UpdateInodeAttrWithOutNlink( const Inode &inode, InodeOpenStatusChange statusChange = InodeOpenStatusChange::NOCHANGE, - S3ChunkInofMap *s3ChunkInfoAdd = nullptr, + S3ChunkInfoMap *s3ChunkInfoAdd = nullptr, bool internal = false) override; void UpdateInodeAttrAsync(const Inode &inode, MetaServerClientDone *done, InodeOpenStatusChange statusChange = InodeOpenStatusChange::NOCHANGE) override; - void UpdateInodeWithOutNlinkAsync(const Inode &inode, + void UpdateInodeWithOutNlinkAsync( + const Inode &inode, MetaServerClientDone *done, - InodeOpenStatusChange statusChange = - InodeOpenStatusChange::NOCHANGE, - S3ChunkInofMap *s3ChunkInfoAdd = nullptr) override; + InodeOpenStatusChange statusChange = InodeOpenStatusChange::NOCHANGE, + DataIndices &&indices = {}) override; MetaStatusCode GetOrModifyS3ChunkInfo( uint32_t fsId, uint64_t inodeId, @@ -278,7 +284,7 @@ class MetaServerClientImpl : public MetaServerClient { UpdateInodeRequest BuileUpdateInodeAttrWithOutNlinkRequest( const Inode &inode, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd); + S3ChunkInfoMap *s3ChunkInfoAdd); bool ParseS3MetaStreamBuffer(butil::IOBuf* buffer, uint64_t* chunkIndex, diff --git a/curvefs/src/metaserver/inode_manager.cpp b/curvefs/src/metaserver/inode_manager.cpp index 6239be12e0..86bbcbed90 100644 --- a/curvefs/src/metaserver/inode_manager.cpp +++ b/curvefs/src/metaserver/inode_manager.cpp @@ -309,6 +309,21 @@ MetaStatusCode InodeManager::UpdateInode(const UpdateInodeRequest& request, } } + // update extent in request + if (request.has_extents()) { + const auto fsId = old->fsid(); + const auto inodeId = old->inodeid(); + for (const auto &slice : request.extents().slices()) { + auto rc = UpdateVolumeExtentSliceLocked(fsId, inodeId, slice); + if (rc != MetaStatusCode::OK) { + LOG(ERROR) << "UpdateVolumeExtent failed, err: " + << MetaStatusCode_Name(rc) << ", fsId: " << fsId + << ", inodeId: " << inodeId; + return rc; + } + } + } + VLOG(9) << "UpdateInode success, " << request.ShortDebugString(); return MetaStatusCode::OK; } diff --git a/curvefs/src/volume/bitmap_allocator.cpp b/curvefs/src/volume/bitmap_allocator.cpp index a3350b1480..39732a8d7a 100644 --- a/curvefs/src/volume/bitmap_allocator.cpp +++ b/curvefs/src/volume/bitmap_allocator.cpp @@ -85,7 +85,7 @@ BitmapAllocator::BitmapAllocator(const BitmapAllocatorOption& opt) << ", opt.length: " << opt.length; VLOG(9) << "offset: " << opt_.startOffset << ", len: " << opt_.length - << ", size_per_bit: " << opt_.sizePerBit << "bitmapAreaLength_ " + << ", size_per_bit: " << opt_.sizePerBit << ", bitmapAreaLength_ " << bitmapAreaLength_ << ", bitmapAreaOffset_: " << bitmapAreaOffset_ << ", smallAreaLength_: " << smallAreaLength_ << ", available: " << available_; diff --git a/curvefs/test/client/mock_metaserver_client.h b/curvefs/test/client/mock_metaserver_client.h index 692e0b9e09..d2fdfe39fc 100644 --- a/curvefs/test/client/mock_metaserver_client.h +++ b/curvefs/test/client/mock_metaserver_client.h @@ -31,6 +31,7 @@ #include #include #include +#include #include "curvefs/src/client/rpcclient/metaserver_client.h" @@ -101,17 +102,28 @@ class MockMetaServerClient : public MetaServerClient { MOCK_METHOD4(UpdateInodeAttrWithOutNlink, MetaStatusCode(const Inode &inode, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd, + S3ChunkInfoMap *s3ChunkInfoAdd, bool internal)); MOCK_METHOD3(UpdateInodeAttrAsync, void(const Inode &inode, MetaServerClientDone *done, InodeOpenStatusChange statusChange)); - MOCK_METHOD4(UpdateInodeWithOutNlinkAsync, - void(const Inode &inode, MetaServerClientDone *done, + // Workaround for rvalue parameters + // https://stackoverflow.com/questions/12088537/workaround-for-gmock-to-support-rvalue-reference + void UpdateInodeWithOutNlinkAsync(const Inode& inode, + MetaServerClientDone* done, + InodeOpenStatusChange change, + DataIndices&& indices) override { + return UpdateInodeWithOutNlinkAsync_rvr(inode, done, change, + std::move(indices)); + } + + MOCK_METHOD4(UpdateInodeWithOutNlinkAsync_rvr, + void(const Inode& inode, + MetaServerClientDone* done, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd)); + DataIndices)); MOCK_METHOD2(UpdateXattrAsync, void(const Inode &inode, MetaServerClientDone *done)); diff --git a/curvefs/test/client/test_fuse_client.cpp b/curvefs/test/client/test_fuse_client.cpp index daeee3fc93..5aab038d36 100644 --- a/curvefs/test/client/test_fuse_client.cpp +++ b/curvefs/test/client/test_fuse_client.cpp @@ -867,7 +867,7 @@ TEST_F(TestFuseVolumeClient, FuseOpRenameBasic) { .Times(1) .WillRepeatedly( Invoke([&](const Inode &inode, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd, bool internal) { + S3ChunkInfoMap *s3ChunkInfoAdd, bool internal) { return MetaStatusCode::OK; })); @@ -1036,7 +1036,7 @@ TEST_F(TestFuseVolumeClient, FuseOpRenameOverwrite) { .Times(3) .WillRepeatedly( Invoke([&](const Inode &inode, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd, bool internal) { + S3ChunkInfoMap *s3ChunkInfoAdd, bool internal) { return MetaStatusCode::OK; })); diff --git a/curvefs/test/client/test_inodeWrapper.cpp b/curvefs/test/client/test_inodeWrapper.cpp index 5ea0358432..5e7f978367 100644 --- a/curvefs/test/client/test_inodeWrapper.cpp +++ b/curvefs/test/client/test_inodeWrapper.cpp @@ -15,7 +15,6 @@ * limitations under the License. */ - /* * Project: curve * Created Date: 2021-12-28 @@ -23,30 +22,36 @@ */ #include -#include #include #include +#include + +#include +#include +#include #include "curvefs/proto/metaserver.pb.h" +#include "curvefs/src/client/inode_wrapper.h" +#include "curvefs/src/client/rpcclient/metaserver_client.h" #include "curvefs/src/client/rpcclient/task_excutor.h" #include "curvefs/src/client/volume/extent.h" #include "curvefs/src/client/volume/extent_cache.h" #include "curvefs/test/client/mock_metaserver_client.h" -#include "curvefs/src/client/inode_wrapper.h" using ::google::protobuf::util::MessageDifferencer; namespace curvefs { namespace client { +using ::curvefs::client::rpcclient::MetaServerClientDone; +using rpcclient::DataIndices; using ::testing::_; using ::testing::Contains; using ::testing::DoAll; +using ::testing::Invoke; using ::testing::Return; using ::testing::SetArgPointee; using ::testing::SetArgReferee; -using ::testing::Invoke; -using ::curvefs::client::rpcclient::MetaServerClientDone; using rpcclient::MockMetaServerClient; @@ -86,7 +91,6 @@ TEST(TestAppendS3ChunkInfoToMap, testAppendS3ChunkInfoToMap) { ASSERT_TRUE(MessageDifferencer::Equals( info1, s3ChunkInfoMap[chunkIndex1].s3chunks(0))); - // add to same chunkIndex S3ChunkInfo info2; info2.set_chunkid(2); @@ -236,5 +240,65 @@ TEST_F(TestInodeWrapper, TestNeedRefreshData) { ASSERT_TRUE(inodeWrapper->NeedRefreshData()); } +namespace { + +struct FakeCallback : public MetaServerClientDone { + void Run() override { + { + std::lock_guard lock(mtx); + runned = true; + } + cond.notify_one(); + } + + void Wait() { + std::unique_lock lock(mtx); + cond.wait(lock, [this]() { return runned; }); + } + + std::mutex mtx; + std::condition_variable cond; + bool runned{false}; +}; + +struct FakeUpdateInodeWithOutNlinkAsync { + void operator()(const Inode& inode, + MetaServerClientDone* done, + InodeOpenStatusChange statusChange, + DataIndices indices) const { + std::thread th{[done]() { + std::this_thread::sleep_for(std::chrono::seconds(1)); + done->SetMetaStatusCode(MetaStatusCode::OK); + done->Run(); + }}; + + th.detach(); + } +}; + +} // namespace + +TEST_F(TestInodeWrapper, TestAsyncInode) { + for (auto type : {FsFileType::TYPE_DIRECTORY, FsFileType::TYPE_FILE, + FsFileType::TYPE_S3, FsFileType::TYPE_SYM_LINK}) { + for (auto dirty : {true, false}) { + inodeWrapper_->SetType(type); + if (!dirty) { + inodeWrapper_->ClearDirty(); + } + + EXPECT_CALL(*metaClient_, + UpdateInodeWithOutNlinkAsync_rvr(_, _, _, _)) + .Times(dirty ? 1 : 0) + .WillRepeatedly(Invoke(FakeUpdateInodeWithOutNlinkAsync{})); + + FakeCallback done; + inodeWrapper_->Async(&done); + done.Wait(); + ASSERT_EQ(MetaStatusCode::OK, done.GetStatusCode()); + } + } +} + } // namespace client } // namespace curvefs diff --git a/curvefs/test/client/test_inode_cache_manager.cpp b/curvefs/test/client/test_inode_cache_manager.cpp index e3e6f5d60a..11785fb3c7 100644 --- a/curvefs/test/client/test_inode_cache_manager.cpp +++ b/curvefs/test/client/test_inode_cache_manager.cpp @@ -22,8 +22,11 @@ #include #include +#include #include +#include +#include "curvefs/src/client/rpcclient/metaserver_client.h" #include "curvefs/test/client/mock_metaserver_client.h" #include "curvefs/src/client/inode_cache_manager.h" #include "curvefs/src/common/define.h" @@ -50,6 +53,7 @@ using ::testing::AnyOf; using rpcclient::MetaServerClientDone; using rpcclient::MockMetaServerClient; +using rpcclient::DataIndices; class TestInodeCacheManager : public ::testing::Test { protected: @@ -338,10 +342,10 @@ TEST_F(TestInodeCacheManager, ShipToFlushAndFlushAll) { iCacheManager_->ShipToFlush(inodeWrapper); - EXPECT_CALL(*metaClient_, UpdateInodeWithOutNlinkAsync(_, _, _, _)) + EXPECT_CALL(*metaClient_, UpdateInodeWithOutNlinkAsync_rvr(_, _, _, _)) .WillOnce(Invoke([](const Inode &inode, MetaServerClientDone *done, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd) { + DataIndices /*indices*/) { done->SetMetaStatusCode(MetaStatusCode::OK); done->Run(); })); @@ -451,13 +455,19 @@ TEST_F(TestInodeCacheManager, TestFlushInodeBackground) { inodeMap.emplace(inodeId + i, inodeWrapper); } - EXPECT_CALL(*metaClient_, UpdateInodeWithOutNlinkAsync(_, _, _, _)) + EXPECT_CALL(*metaClient_, UpdateInodeWithOutNlinkAsync_rvr(_, _, _, _)) .WillRepeatedly( - Invoke([](const Inode &inode, MetaServerClientDone *done, + Invoke([](const Inode& inode, MetaServerClientDone* done, InodeOpenStatusChange statusChange, - S3ChunkInofMap *s3ChunkInfoAdd) { - done->SetMetaStatusCode(MetaStatusCode::OK); - done->Run(); + DataIndices /*dataIndices*/) { + // run closure in a separate thread + std::thread th{[done]() { + std::this_thread::sleep_for(std::chrono::microseconds(200)); + done->SetMetaStatusCode(MetaStatusCode::OK); + done->Run(); + }}; + + th.detach(); })); EXPECT_CALL(*metaClient_, GetOrModifyS3ChunkInfoAsync(_, _, _, _))