Skip to content

Commit

Permalink
mds: check file in-use when Delete/Rename/ChangeOwner
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-hanqing committed Nov 4, 2020
1 parent 4b6dca0 commit 2444355
Show file tree
Hide file tree
Showing 17 changed files with 341 additions and 35 deletions.
4 changes: 2 additions & 2 deletions src/client/iomanager4file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ void IOManager4File::LeaseTimeoutBlockIO() {
}
}

void IOManager4File::RefeshSuccAndResumeIO() {
void IOManager4File::ResumeIO() {
std::unique_lock<std::mutex> lk(exitMtx_);
if (exit_ == false) {
scheduler_->RefeshSuccAndResumeIO();
scheduler_->ResumeIO();
} else {
LOG(WARNING) << "io manager already exit, no need resume io!";
}
Expand Down
10 changes: 9 additions & 1 deletion src/client/iomanager4file.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ class IOManager4File : public IOManager {
mc_.SetLatestFileSn(newSn);
}

/**
* @brief get current file inodeid
* @return file inodeid
*/
uint64_t InodeId() const {
return mc_.InodeId();
}

private:
friend class LeaseExecutor;
friend class FlightIOGuard;
Expand All @@ -187,7 +195,7 @@ class IOManager4File : public IOManager {
/**
* 当lease又续约成功的时候,LeaseExecutor调用该接口恢复IO
*/
void RefeshSuccAndResumeIO();
void ResumeIO();

/**
* 当lesaeexcutor发现版本变更,调用该接口开始等待inflight回来,这段期间IO是hang的
Expand Down
18 changes: 14 additions & 4 deletions src/client/lease_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,20 @@ bool LeaseExecutor::RefreshLease() {
}

if (response.status == LeaseRefreshResult::Status::OK) {
if (CURVE_UNLIKELY(iomanager_->InodeId() != response.finfo.id)) {
LOG(ERROR) << fullFileName_ << " inode id changed, current id = "
<< iomanager_->InodeId()
<< ", but mds response id = " << response.finfo.id
<< ", block IO";
iomanager_->LeaseTimeoutBlockIO();
isleaseAvaliable_.store(false);
return false;
}

CheckNeedUpdateVersion(response.finfo.seqnum);
failedrefreshcount_.store(0);
isleaseAvaliable_.store(true);
iomanager_->RefeshSuccAndResumeIO();
iomanager_->ResumeIO();
return true;
} else if (response.status == LeaseRefreshResult::Status::NOT_EXIST) {
iomanager_->LeaseTimeoutBlockIO();
Expand Down Expand Up @@ -149,10 +159,10 @@ void LeaseExecutor::IncremRefreshFailed() {
void LeaseExecutor::CheckNeedUpdateVersion(uint64_t newversion) {
const uint64_t currentFileSn = iomanager_->GetLatestFileSn();

DVLOG(9) << "new file version = " << newversion
<< ", current version = " << currentFileSn
<< ", filename = " << fullFileName_;
if (newversion > currentFileSn) {
LOG(INFO) << fullFileName_
<< " version changed, old version = " << currentFileSn
<< ", new version = " << newversion;
iomanager_->SetLatestFileSn(newversion);
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/client/lease_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ class LeaseExecutor {
* @param: mdsclient是与mds续约的client
* @param: iomanager会在续约失败或者版本变更的时候进行io调度
*/
LeaseExecutor(const LeaseOption& leaseOpt,
UserInfo_t userinfo,
MDSClient* mdscllent,
IOManager4File* iomanager);
LeaseExecutor(const LeaseOption& leaseOpt, UserInfo_t userinfo,
MDSClient* mdscllent, IOManager4File* iomanager);

~LeaseExecutor();

Expand Down
9 changes: 9 additions & 0 deletions src/client/libcurve_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ int FileClient::Read(int fd, char* buf, off_t offset, size_t len) {
}

if (CheckAligned(offset, len) == false) {
LOG(ERROR) << "Read request not aligned, length = " << len
<< ", offset = " << offset << ", fd = " << fd;
return -LIBCURVE_ERROR::NOT_ALIGNED;
}

Expand All @@ -304,6 +306,8 @@ int FileClient::Write(int fd, const char* buf, off_t offset, size_t len) {
}

if (CheckAligned(offset, len) == false) {
LOG(ERROR) << "Write request not aligned, length = " << len
<< ", offset = " << offset << ", fd = " << fd;
return -LIBCURVE_ERROR::NOT_ALIGNED;
}

Expand All @@ -324,6 +328,8 @@ int FileClient::AioRead(int fd, CurveAioContext* aioctx,
}

if (CheckAligned(aioctx->offset, aioctx->length) == false) {
LOG(ERROR) << "AioRead request not aligned, length = " << aioctx->length
<< ", offset = " << aioctx->offset << ", fd = " << fd;
return -LIBCURVE_ERROR::NOT_ALIGNED;
}

Expand All @@ -347,6 +353,9 @@ int FileClient::AioWrite(int fd, CurveAioContext* aioctx,
}

if (CheckAligned(aioctx->offset, aioctx->length) == false) {
LOG(ERROR) << "AioWrite request not aligned, length = "
<< aioctx->length << ", offset = " << aioctx->offset
<< ", fd = " << fd;
return -LIBCURVE_ERROR::NOT_ALIGNED;
}

Expand Down
4 changes: 4 additions & 0 deletions src/client/metacache.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ class MetaCache {
return unstableHelper_;
}

uint64_t InodeId() const {
return fileInfo_.id;
}

private:
/**
* @brief 从mds更新copyset复制组信息
Expand Down
2 changes: 1 addition & 1 deletion src/client/request_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class RequestScheduler : public Uncopyable {
* 当lease又续约成功的时候,LeaseExecutor调用该接口恢复IO,
* IO调度被恢复
*/
void RefeshSuccAndResumeIO() {
void ResumeIO() {
std::unique_lock<std::mutex> lk(leaseRefreshmtx_);
blockIO_.store(false);
leaseRefreshcv_.notify_all();
Expand Down
28 changes: 18 additions & 10 deletions src/mds/nameserver2/curvefs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
#include "src/mds/common/mds_define.h"

using curve::common::TimeUtility;
using ::std::chrono::steady_clock;
using ::std::chrono::microseconds;
using curve::mds::topology::LogicalPool;

namespace curve {
Expand Down Expand Up @@ -90,7 +88,7 @@ bool CurveFS::Init(std::shared_ptr<NameServerStorage> storage,
std::shared_ptr<AllocStatistic> allocStatistic,
const struct CurveFSOption &curveFSOptions,
std::shared_ptr<Topology> topology) {
startTime_ = steady_clock::now();
startTime_ = std::chrono::steady_clock::now();
storage_ = storage;
InodeIDGenerator_ = InodeIDGenerator;
chunkSegAllocator_ = chunkSegAllocator;
Expand Down Expand Up @@ -480,13 +478,8 @@ StatusCode CurveFS::isDirectoryEmpty(const FileInfo &fileInfo, bool *result) {
}

StatusCode CurveFS::IsSnapshotAllowed(const std::string &fileName) {
// whether the startup time is sufficient for the client to perform
// at least one refresh session
steady_clock::duration timePass = steady_clock::now() - startTime_;
int32_t expiredUs = fileRecordManager_->GetFileRecordExpiredTimeUs();
if (timePass < 10 * microseconds(expiredUs)) {
LOG(INFO) << "snapshot is not allowed now, fileName = " << fileName
<< ", time pass = " << timePass.count();
if (!IsStartEnoughTime(10)) {
LOG(INFO) << "snapshot is not allowed now, fileName = " << fileName;
return StatusCode::kSnapshotFrozen;
}

Expand Down Expand Up @@ -698,6 +691,18 @@ StatusCode CurveFS::CheckFileCanChange(const std::string &fileName,
return StatusCode::kDeleteFileBeingCloned;
}

if (!IsStartEnoughTime(1)) {
LOG(WARNING) << "MDS doesn't start enough time";
return StatusCode::kNotSupported;
}

ClientIpPortType mountPoint;
if (fileRecordManager_->FindFileMountPoint(fileName, &mountPoint)) {
LOG(ERROR) << fileName << "is mounting on " << mountPoint.first << ":"
<< mountPoint.second;
return StatusCode::kFileOccupied;
}

return StatusCode::kOK;
}

Expand Down Expand Up @@ -1362,6 +1367,9 @@ StatusCode CurveFS::CloseFile(const std::string &fileName,
return ret;
}

// remove file record
fileRecordManager_->RemoveFileRecord(fileName);

return StatusCode::kOK;
}

Expand Down
13 changes: 13 additions & 0 deletions src/mds/nameserver2/curvefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,19 @@ class CurveFS {
const FileInfo& fileInfo,
uint64_t* fileSize);

/**
* @brief check whether mds has started for enough time, based on the
* file record expiration time(mds.file.expiredTimeUs)
* @param times multiple of file record expiration time
* @return return true if ok, otherwise return false
*/
bool IsStartEnoughTime(int times) const {
std::chrono::steady_clock::duration timePass =
std::chrono::steady_clock::now() - startTime_;
uint32_t expiredUs = fileRecordManager_->GetFileRecordExpiredTimeUs();
return timePass >= times * std::chrono::microseconds(expiredUs);
}

private:
FileInfo rootFileInfo_;
std::shared_ptr<NameServerStorage> storage_;
Expand Down
4 changes: 4 additions & 0 deletions src/mds/nameserver2/file_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ void FileRecordManager::UpdateFileRecord(const std::string& fileName,
fileRecords_.emplace(fileName, record);
}

void FileRecordManager::RemoveFileRecord(const std::string& filename) {
WriteLockGuard lk(rwlock_);
fileRecords_.erase(filename);
}

void FileRecordManager::Scan() {
while (sleeper_.wait_for(
Expand Down
14 changes: 11 additions & 3 deletions src/mds/nameserver2/file_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class FileRecord {

class FileRecordManager {
public:
virtual ~FileRecordManager() = default;

/**
* @brief initialization
* @param[in] sessionOption session configuration
Expand All @@ -160,7 +162,7 @@ class FileRecordManager {
* @brief Get the expired time of the file
* @return the expired time
*/
uint32_t GetFileRecordExpiredTimeUs() const {
virtual uint32_t GetFileRecordExpiredTimeUs() const {
return fileRecordOptions_.fileRecordExpiredTimeUs;
}

Expand All @@ -173,6 +175,12 @@ class FileRecordManager {
const std::string& clientIP,
uint32_t clientPort);

/**
* @brief remove file record corresponding to filename
* @param filename file record that to be deleted
*/
void RemoveFileRecord(const std::string& filename);

/**
* @brief Get the client version corresponding to the input file
* @param[in] filename
Expand All @@ -198,8 +206,8 @@ class FileRecordManager {

std::set<ClientIpPortType> ListAllClient() const;

bool FindFileMountPoint(const std::string& fileName,
ClientIpPortType* ipPort) const;
virtual bool FindFileMountPoint(const std::string& fileName,
ClientIpPortType* ipPort) const;

private:
/**
Expand Down
4 changes: 3 additions & 1 deletion src/snapshotcloneserver/common/curvefs_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,9 @@ int CurveFsClientImpl::RenameCloneFile(
};
RetryCondition condition = [] (int ret) {
return ret != LIBCURVE_ERROR::OK &&
ret != -LIBCURVE_ERROR::NOTEXIST;
ret != -LIBCURVE_ERROR::NOTEXIST &&
ret != -LIBCURVE_ERROR::NOT_SUPPORT &&
ret != -LIBCURVE_ERROR::FILE_OCCUPIED; // file is in-use
};
RetryHelper retryHelper(method, condition);
return retryHelper.RetryTimeSecAndReturn(clientMethodRetryTimeSec_,
Expand Down
54 changes: 52 additions & 2 deletions test/client/client_session_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ TEST(ClientSession, LeaseTaskTest) {
se->set_sessionstatus(::curve::mds::SessionStatus::kSessionOK);

finfo->set_filename(filename);
finfo->set_id(1);
openresponse.set_statuscode(::curve::mds::StatusCode::kOK);
openresponse.set_allocated_protosession(se);
openresponse.set_allocated_fileinfo(finfo);
Expand Down Expand Up @@ -233,8 +234,57 @@ TEST(ClientSession, LeaseTaskTest) {
}
ASSERT_TRUE(lease->LeaseValid());

// 9. set refresh success
{
std::unique_lock<std::mutex> lk(sessionMtx);
sessionCV.wait(lk, [&]() { return sessionFlag; });
}

// 9. set inode id changed
refreshresp.set_allocated_fileinfo(nullptr); // clear existing file info

curve::mds::FileInfo* newFileInfo = new curve::mds::FileInfo;
newFileInfo->set_filename(filename);
newFileInfo->set_seqnum(2);
newFileInfo->set_id(100);
newFileInfo->set_parentid(0);
newFileInfo->set_filetype(curve::mds::FileType::INODE_PAGEFILE);
newFileInfo->set_chunksize(4 * 1024 * 1024);
newFileInfo->set_length(4 * 1024 * 1024 * 1024ul);
newFileInfo->set_ctime(12345678);

refreshresp.set_allocated_fileinfo(newFileInfo);
refreshresp.set_statuscode(::curve::mds::StatusCode::kOK);

FakeReturn* refreshFakeRetWithNewInodeId = new FakeReturn(
nullptr, static_cast<void*>(&refreshresp));
curvefsservice->SetRefreshSession(
refreshFakeRetWithNewInodeId, refresht);

{
std::unique_lock<std::mutex> lk(mtx);
refreshcv.wait(lk);
}

std::this_thread::sleep_for(std::chrono::seconds(1));
lease = fileinstance.GetLeaseExecutor();
ASSERT_FALSE(lease->LeaseValid());

// 10. set refresh success
refreshresp.set_allocated_fileinfo(nullptr); // clear existing file info

newFileInfo = new curve::mds::FileInfo;
newFileInfo->set_filename(filename);
newFileInfo->set_seqnum(2);
newFileInfo->set_id(1);
newFileInfo->set_parentid(0);
newFileInfo->set_filetype(curve::mds::FileType::INODE_PAGEFILE);
newFileInfo->set_chunksize(4 * 1024 * 1024);
newFileInfo->set_length(4 * 1024 * 1024 * 1024ul);
newFileInfo->set_ctime(12345678);

refreshresp.set_allocated_fileinfo(newFileInfo);
refreshresp.set_statuscode(::curve::mds::StatusCode::kOK);

FakeReturn* refreshfakeretOK4 =
new FakeReturn(nullptr, static_cast<void*>(&refreshresp));
curvefsservice->SetRefreshSession(refreshfakeretOK4, refresht);
Expand All @@ -251,7 +301,7 @@ TEST(ClientSession, LeaseTaskTest) {
std::unique_lock<std::mutex> lk(sessionMtx);
sessionCV.wait(lk, [&]() { return sessionFlag; });

// 10. set fake close return
// 11. set fake close return
::curve::mds::CloseFileResponse closeresp;
closeresp.set_statuscode(::curve::mds::StatusCode::kOK);
FakeReturn* closefileret
Expand Down
5 changes: 5 additions & 0 deletions test/client/fake/fakeMDS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ bool FakeMDS::StartService() {
::curve::mds::ReFreshSessionResponse* refreshresp = new ::curve::mds::ReFreshSessionResponse(); // NOLINT
refreshresp->set_statuscode(::curve::mds::StatusCode::kOK);
refreshresp->set_sessionid("1234");

auto* refreshFileInfo = new ::curve::mds::FileInfo();
refreshFileInfo->set_id(1);
refreshresp->set_allocated_fileinfo(refreshFileInfo);

FakeReturn* refreshfakeret = new FakeReturn(nullptr, static_cast<void*>(refreshresp)); // NOLINT
fakecurvefsservice_.SetRefreshSession(refreshfakeret, nullptr);

Expand Down
2 changes: 1 addition & 1 deletion test/client/fake/fakeMDS.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class FakeMDSCurveFSService : public curve::mds::CurveFSService {
curve::mds::FileInfo * info = new curve::mds::FileInfo;
info->set_seqnum(seq++);
info->set_filename("_filename_");
info->set_id(1);
info->set_id(resp->fileinfo().id());
info->set_parentid(0);
info->set_filetype(curve::mds::FileType::INODE_PAGEFILE);
info->set_chunksize(4 * 1024 * 1024);
Expand Down
Loading

0 comments on commit 2444355

Please sign in to comment.