Skip to content

Commit

Permalink
mds: check file in-use when Delete/Rename/ChangeOwner
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-hanqing committed Nov 19, 2020
1 parent 4b6dca0 commit 2f7a915
Show file tree
Hide file tree
Showing 21 changed files with 362 additions and 42 deletions.
12 changes: 6 additions & 6 deletions src/client/file_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ void FileInstance::UnInitialize() {
// 因为如果后台集群重新部署了,需要通过lease续约来获取当前session状态
// 这样在session过期后才能将inflight RPC正确回收掉。
iomanager4file_.UnInitialize();
if (leaseExecutor_ != nullptr) {
leaseExecutor_->Stop();
leaseExecutor_.reset();
}
}

int FileInstance::Read(char* buf, off_t offset, size_t length) {
Expand Down Expand Up @@ -173,9 +169,13 @@ int FileInstance::Close() {
return 0;
}

if (leaseExecutor_ != nullptr) {
leaseExecutor_->Stop();
leaseExecutor_.reset();
}

LIBCURVE_ERROR ret =
mdsclient_->CloseFile(finfo_.fullPathName, finfo_.userinfo,
leaseExecutor_->GetLeaseSessionID());
mdsclient_->CloseFile(finfo_.fullPathName, finfo_.userinfo, "");
return -ret;
}

Expand Down
4 changes: 2 additions & 2 deletions src/client/iomanager4file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ void IOManager4File::LeaseTimeoutBlockIO() {
}
}

void IOManager4File::RefeshSuccAndResumeIO() {
void IOManager4File::ResumeIO() {
std::unique_lock<std::mutex> lk(exitMtx_);
if (exit_ == false) {
scheduler_->RefeshSuccAndResumeIO();
scheduler_->ResumeIO();
} else {
LOG(WARNING) << "io manager already exit, no need resume io!";
}
Expand Down
10 changes: 9 additions & 1 deletion src/client/iomanager4file.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ class IOManager4File : public IOManager {
mc_.SetLatestFileSn(newSn);
}

/**
* @brief get current file inodeid
* @return file inodeid
*/
uint64_t InodeId() const {
return mc_.InodeId();
}

private:
friend class LeaseExecutor;
friend class FlightIOGuard;
Expand All @@ -187,7 +195,7 @@ class IOManager4File : public IOManager {
/**
* 当lease又续约成功的时候,LeaseExecutor调用该接口恢复IO
*/
void RefeshSuccAndResumeIO();
void ResumeIO();

/**
* 当lesaeexcutor发现版本变更,调用该接口开始等待inflight回来,这段期间IO是hang的
Expand Down
20 changes: 15 additions & 5 deletions src/client/lease_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ LeaseExecutor::~LeaseExecutor() {
}
}

bool LeaseExecutor::Start(const FInfo_t& fi, const LeaseSession_t& lease) {
bool LeaseExecutor::Start(const FInfo_t& fi, const LeaseSession_t& lease) {
fullFileName_ = fi.fullPathName;

leasesession_ = lease;
Expand Down Expand Up @@ -104,10 +104,20 @@ bool LeaseExecutor::RefreshLease() {
}

if (response.status == LeaseRefreshResult::Status::OK) {
if (CURVE_UNLIKELY(iomanager_->InodeId() != response.finfo.id)) {
LOG(ERROR) << fullFileName_ << " inode id changed, current id = "
<< iomanager_->InodeId()
<< ", but mds response id = " << response.finfo.id
<< ", block IO";
iomanager_->LeaseTimeoutBlockIO();
isleaseAvaliable_.store(false);
return false;
}

CheckNeedUpdateVersion(response.finfo.seqnum);
failedrefreshcount_.store(0);
isleaseAvaliable_.store(true);
iomanager_->RefeshSuccAndResumeIO();
iomanager_->ResumeIO();
return true;
} else if (response.status == LeaseRefreshResult::Status::NOT_EXIST) {
iomanager_->LeaseTimeoutBlockIO();
Expand Down Expand Up @@ -149,10 +159,10 @@ void LeaseExecutor::IncremRefreshFailed() {
void LeaseExecutor::CheckNeedUpdateVersion(uint64_t newversion) {
const uint64_t currentFileSn = iomanager_->GetLatestFileSn();

DVLOG(9) << "new file version = " << newversion
<< ", current version = " << currentFileSn
<< ", filename = " << fullFileName_;
if (newversion > currentFileSn) {
LOG(INFO) << fullFileName_
<< " version changed, old version = " << currentFileSn
<< ", new version = " << newversion;
iomanager_->SetLatestFileSn(newversion);
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/client/lease_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ class LeaseExecutor {
* @param: mdsclient是与mds续约的client
* @param: iomanager会在续约失败或者版本变更的时候进行io调度
*/
LeaseExecutor(const LeaseOption& leaseOpt,
UserInfo_t userinfo,
MDSClient* mdscllent,
IOManager4File* iomanager);
LeaseExecutor(const LeaseOption& leaseOpt, UserInfo_t userinfo,
MDSClient* mdscllent, IOManager4File* iomanager);

~LeaseExecutor();

Expand Down
10 changes: 10 additions & 0 deletions src/client/libcurve_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ int FileClient::Open(const std::string& filename,
fileserviceMap_[fd] = fileserv;
}

LOG(INFO) << "Open success, filname = " << filename << ", fd = " << fd;
openedFileNum_ << 1;

return fd;
Expand Down Expand Up @@ -285,6 +286,8 @@ int FileClient::Read(int fd, char* buf, off_t offset, size_t len) {
}

if (CheckAligned(offset, len) == false) {
LOG(ERROR) << "Read request not aligned, length = " << len
<< ", offset = " << offset << ", fd = " << fd;
return -LIBCURVE_ERROR::NOT_ALIGNED;
}

Expand All @@ -304,6 +307,8 @@ int FileClient::Write(int fd, const char* buf, off_t offset, size_t len) {
}

if (CheckAligned(offset, len) == false) {
LOG(ERROR) << "Write request not aligned, length = " << len
<< ", offset = " << offset << ", fd = " << fd;
return -LIBCURVE_ERROR::NOT_ALIGNED;
}

Expand All @@ -324,6 +329,8 @@ int FileClient::AioRead(int fd, CurveAioContext* aioctx,
}

if (CheckAligned(aioctx->offset, aioctx->length) == false) {
LOG(ERROR) << "AioRead request not aligned, length = " << aioctx->length
<< ", offset = " << aioctx->offset << ", fd = " << fd;
return -LIBCURVE_ERROR::NOT_ALIGNED;
}

Expand All @@ -347,6 +354,9 @@ int FileClient::AioWrite(int fd, CurveAioContext* aioctx,
}

if (CheckAligned(aioctx->offset, aioctx->length) == false) {
LOG(ERROR) << "AioWrite request not aligned, length = "
<< aioctx->length << ", offset = " << aioctx->offset
<< ", fd = " << fd;
return -LIBCURVE_ERROR::NOT_ALIGNED;
}

Expand Down
4 changes: 4 additions & 0 deletions src/client/metacache.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ class MetaCache {
return unstableHelper_;
}

uint64_t InodeId() const {
return fileInfo_.id;
}

private:
/**
* @brief 从mds更新copyset复制组信息
Expand Down
2 changes: 1 addition & 1 deletion src/client/request_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class RequestScheduler : public Uncopyable {
* 当lease又续约成功的时候,LeaseExecutor调用该接口恢复IO,
* IO调度被恢复
*/
void RefeshSuccAndResumeIO() {
void ResumeIO() {
std::unique_lock<std::mutex> lk(leaseRefreshmtx_);
blockIO_.store(false);
leaseRefreshcv_.notify_all();
Expand Down
28 changes: 18 additions & 10 deletions src/mds/nameserver2/curvefs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
#include "src/mds/common/mds_define.h"

using curve::common::TimeUtility;
using ::std::chrono::steady_clock;
using ::std::chrono::microseconds;
using curve::mds::topology::LogicalPool;

namespace curve {
Expand Down Expand Up @@ -90,7 +88,7 @@ bool CurveFS::Init(std::shared_ptr<NameServerStorage> storage,
std::shared_ptr<AllocStatistic> allocStatistic,
const struct CurveFSOption &curveFSOptions,
std::shared_ptr<Topology> topology) {
startTime_ = steady_clock::now();
startTime_ = std::chrono::steady_clock::now();
storage_ = storage;
InodeIDGenerator_ = InodeIDGenerator;
chunkSegAllocator_ = chunkSegAllocator;
Expand Down Expand Up @@ -480,13 +478,8 @@ StatusCode CurveFS::isDirectoryEmpty(const FileInfo &fileInfo, bool *result) {
}

StatusCode CurveFS::IsSnapshotAllowed(const std::string &fileName) {
// whether the startup time is sufficient for the client to perform
// at least one refresh session
steady_clock::duration timePass = steady_clock::now() - startTime_;
int32_t expiredUs = fileRecordManager_->GetFileRecordExpiredTimeUs();
if (timePass < 10 * microseconds(expiredUs)) {
LOG(INFO) << "snapshot is not allowed now, fileName = " << fileName
<< ", time pass = " << timePass.count();
if (!IsStartEnoughTime(10)) {
LOG(INFO) << "snapshot is not allowed now, fileName = " << fileName;
return StatusCode::kSnapshotFrozen;
}

Expand Down Expand Up @@ -698,6 +691,18 @@ StatusCode CurveFS::CheckFileCanChange(const std::string &fileName,
return StatusCode::kDeleteFileBeingCloned;
}

if (!IsStartEnoughTime(1)) {
LOG(WARNING) << "MDS doesn't start enough time";
return StatusCode::kNotSupported;
}

ClientIpPortType mountPoint;
if (fileRecordManager_->FindFileMountPoint(fileName, &mountPoint)) {
LOG(WARNING) << fileName << " is mounting on " << mountPoint.first
<< ":" << mountPoint.second;
return StatusCode::kFileOccupied;
}

return StatusCode::kOK;
}

Expand Down Expand Up @@ -1362,6 +1367,9 @@ StatusCode CurveFS::CloseFile(const std::string &fileName,
return ret;
}

// remove file record
fileRecordManager_->RemoveFileRecord(fileName);

return StatusCode::kOK;
}

Expand Down
13 changes: 13 additions & 0 deletions src/mds/nameserver2/curvefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,19 @@ class CurveFS {
const FileInfo& fileInfo,
uint64_t* fileSize);

/**
* @brief check whether mds has started for enough time, based on the
* file record expiration time(mds.file.expiredTimeUs)
* @param times multiple of file record expiration time
* @return return true if ok, otherwise return false
*/
bool IsStartEnoughTime(int times) const {
std::chrono::steady_clock::duration timePass =
std::chrono::steady_clock::now() - startTime_;
uint32_t expiredUs = fileRecordManager_->GetFileRecordExpiredTimeUs();
return timePass >= times * std::chrono::microseconds(expiredUs);
}

private:
FileInfo rootFileInfo_;
std::shared_ptr<NameServerStorage> storage_;
Expand Down
4 changes: 4 additions & 0 deletions src/mds/nameserver2/file_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ void FileRecordManager::UpdateFileRecord(const std::string& fileName,
fileRecords_.emplace(fileName, record);
}

void FileRecordManager::RemoveFileRecord(const std::string& filename) {
WriteLockGuard lk(rwlock_);
fileRecords_.erase(filename);
}

void FileRecordManager::Scan() {
while (sleeper_.wait_for(
Expand Down
14 changes: 11 additions & 3 deletions src/mds/nameserver2/file_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class FileRecord {

class FileRecordManager {
public:
virtual ~FileRecordManager() = default;

/**
* @brief initialization
* @param[in] sessionOption session configuration
Expand All @@ -160,7 +162,7 @@ class FileRecordManager {
* @brief Get the expired time of the file
* @return the expired time
*/
uint32_t GetFileRecordExpiredTimeUs() const {
virtual uint32_t GetFileRecordExpiredTimeUs() const {
return fileRecordOptions_.fileRecordExpiredTimeUs;
}

Expand All @@ -173,6 +175,12 @@ class FileRecordManager {
const std::string& clientIP,
uint32_t clientPort);

/**
* @brief remove file record corresponding to filename
* @param filename file record that to be deleted
*/
void RemoveFileRecord(const std::string& filename);

/**
* @brief Get the client version corresponding to the input file
* @param[in] filename
Expand All @@ -198,8 +206,8 @@ class FileRecordManager {

std::set<ClientIpPortType> ListAllClient() const;

bool FindFileMountPoint(const std::string& fileName,
ClientIpPortType* ipPort) const;
virtual bool FindFileMountPoint(const std::string& fileName,
ClientIpPortType* ipPort) const;

private:
/**
Expand Down
4 changes: 3 additions & 1 deletion src/snapshotcloneserver/common/curvefs_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,9 @@ int CurveFsClientImpl::RenameCloneFile(
};
RetryCondition condition = [] (int ret) {
return ret != LIBCURVE_ERROR::OK &&
ret != -LIBCURVE_ERROR::NOTEXIST;
ret != -LIBCURVE_ERROR::NOTEXIST &&
ret != -LIBCURVE_ERROR::NOT_SUPPORT &&
ret != -LIBCURVE_ERROR::FILE_OCCUPIED; // file is in-use
};
RetryHelper retryHelper(method, condition);
return retryHelper.RetryTimeSecAndReturn(clientMethodRetryTimeSec_,
Expand Down
Loading

0 comments on commit 2f7a915

Please sign in to comment.