Skip to content

Commit

Permalink
curvefs/metaserver: recover s3ChunkInfoRemove field for GetOrModifyS3…
Browse files Browse the repository at this point in the history
…ChunkInfo PRC request

and guarantee consistent of s3chunkinfo in inode. (opencurve#1304) (opencurve#1308)
  • Loading branch information
Wine93 authored and fansehep committed May 25, 2022
1 parent de7a0ca commit ba9fa81
Show file tree
Hide file tree
Showing 15 changed files with 538 additions and 229 deletions.
76 changes: 56 additions & 20 deletions curvefs/src/metaserver/inode_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <glog/logging.h>
#include <google/protobuf/util/message_differencer.h>
#include <list>
#include <unordered_set>

#include "curvefs/src/common/define.h"
#include "src/common/timeutility.h"
Expand Down Expand Up @@ -130,21 +131,27 @@ void InodeManager::GenerateInodeInternal(uint64_t inodeId,
return;
}

MetaStatusCode InodeManager::GetInode(uint32_t fsId, uint64_t inodeId,
Inode *inode) {
MetaStatusCode InodeManager::GetInode(uint32_t fsId,
uint64_t inodeId,
Inode *inode,
bool paddingS3ChunkInfo) {
VLOG(1) << "GetInode, fsId = " << fsId << ", inodeId = " << inodeId;
NameLockGuard lg(inodeLock_, GetInodeLockName(fsId, inodeId));
MetaStatusCode ret = inodeStorage_->Get(Key4Inode(fsId, inodeId), inode);
if (ret != MetaStatusCode::OK) {
MetaStatusCode rc = inodeStorage_->Get(Key4Inode(fsId, inodeId), inode);
if (rc == MetaStatusCode::OK && paddingS3ChunkInfo) {
rc = PaddingInodeS3ChunkInfo(fsId, inodeId,
inode->mutable_s3chunkinfomap());
}

if (rc != MetaStatusCode::OK) {
LOG(ERROR) << "GetInode fail, fsId = " << fsId
<< ", inodeId = " << inodeId
<< ", ret = " << MetaStatusCode_Name(ret);
return ret;
<< ", retCode = " << MetaStatusCode_Name(rc);
return rc;
}

VLOG(1) << "GetInode success, fsId = " << fsId << ", inodeId = " << inodeId
<< ", " << inode->ShortDebugString();

return MetaStatusCode::OK;
}

Expand Down Expand Up @@ -308,30 +315,59 @@ MetaStatusCode InodeManager::UpdateInode(const UpdateInodeRequest &request) {
MetaStatusCode InodeManager::GetOrModifyS3ChunkInfo(
uint32_t fsId, uint64_t inodeId,
const S3ChunkInfoMap& map2add,
std::shared_ptr<Iterator>* iterator,
const S3ChunkInfoMap& map2del,
bool returnS3ChunkInfoMap,
bool compaction) {
std::shared_ptr<Iterator>* iterator4InodeS3Meta) {
VLOG(1) << "GetOrModifyS3ChunkInfo, fsId: " << fsId
<< ", inodeId: " << inodeId;

NameLockGuard lg(inodeLock_, GetInodeLockName(fsId, inodeId));

if (!map2add.empty()) {
for (const auto& item : map2add) {
uint64_t chunkIndex = item.first;
auto list2add = item.second;
MetaStatusCode rc = inodeStorage_->AppendS3ChunkInfoList(
fsId, inodeId, chunkIndex, list2add, compaction);
if (rc != MetaStatusCode::OK) {
return rc;
}
const S3ChunkInfoList* list2add;
const S3ChunkInfoList* list2del;
std::unordered_set<uint64_t> deleted;
for (const auto& item : map2add) {
uint64_t chunkIndex = item.first;
list2add = &item.second;
auto iter = map2del.find(chunkIndex);
if (iter != map2del.end()) {
list2del = &iter->second;
} else {
list2del = nullptr;
}

MetaStatusCode rc = inodeStorage_->ModifyInodeS3ChunkInfoList(
fsId, inodeId, chunkIndex, list2add, list2del);
if (rc != MetaStatusCode::OK) {
LOG(ERROR) << "Modify inode s3chunkinfo list failed, fsId=" << fsId
<< ", inodeId=" << inodeId << ", retCode=" << rc;
return rc;
}
deleted.insert(chunkIndex);
}

for (const auto& item : map2del) {
uint64_t chunkIndex = item.first;
if (deleted.find(chunkIndex) != deleted.end()) { // already deleted
continue;
}

list2add = nullptr;
list2del = &item.second;
MetaStatusCode rc = inodeStorage_->ModifyInodeS3ChunkInfoList(
fsId, inodeId, chunkIndex, list2add, list2del);
if (rc != MetaStatusCode::OK) {
LOG(ERROR) << "Modify inode s3chunkinfo list failed, fsId=" << fsId
<< ", inodeId=" << inodeId << ", retCode=" << rc;
return rc;
}
}

// return if needed
if (returnS3ChunkInfoMap) {
*iterator = inodeStorage_->GetInodeS3ChunkInfoList(fsId, inodeId);
if ((*iterator)->Status() != 0) {
*iterator4InodeS3Meta = inodeStorage_->GetInodeS3ChunkInfoList(
fsId, inodeId);
if ((*iterator4InodeS3Meta)->Status() != 0) {
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}
}
Expand Down
19 changes: 12 additions & 7 deletions curvefs/src/metaserver/inode_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ class InodeManager {
MetaStatusCode CreateInode(uint64_t inodeId, const InodeParam &param,
Inode *inode);
MetaStatusCode CreateRootInode(const InodeParam &param);
MetaStatusCode GetInode(uint32_t fsId, uint64_t inodeId, Inode *inode);

MetaStatusCode GetInode(uint32_t fsId,
uint64_t inodeId,
Inode *inode,
bool paddingS3ChunkInfo = false);

MetaStatusCode GetInodeAttr(uint32_t fsId, uint64_t inodeId,
InodeAttr *attr);
Expand All @@ -71,12 +75,13 @@ class InodeManager {

MetaStatusCode UpdateInode(const UpdateInodeRequest &request);

MetaStatusCode GetOrModifyS3ChunkInfo(uint32_t fsId,
uint64_t inodeId,
const S3ChunkInfoMap& map2add,
std::shared_ptr<Iterator>* iterator,
bool returnS3ChunkInfoMap,
bool compaction);
MetaStatusCode GetOrModifyS3ChunkInfo(
uint32_t fsId,
uint64_t inodeId,
const S3ChunkInfoMap& map2add,
const S3ChunkInfoMap& map2del,
bool returnS3ChunkInfoMap,
std::shared_ptr<Iterator>* iterator4InodeS3Meta);

MetaStatusCode PaddingInodeS3ChunkInfo(int32_t fsId,
uint64_t inodeId,
Expand Down
104 changes: 61 additions & 43 deletions curvefs/src/metaserver/inode_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,37 +180,46 @@ MetaStatusCode InodeStorage::AddS3ChunkInfoList(
uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
const S3ChunkInfoList& list2add) {
// key
size_t size = list2add.s3chunks_size();
uint64_t firstChunkId = list2add.s3chunks(0).chunkid();
uint64_t lastChunkId = list2add.s3chunks(size - 1).chunkid();
const S3ChunkInfoList* list2add) {
if (nullptr == list2add || list2add->s3chunks_size() == 0) {
return MetaStatusCode::OK;
}

size_t size = list2add->s3chunks_size();
uint64_t firstChunkId = list2add->s3chunks(0).chunkid();
uint64_t lastChunkId = list2add->s3chunks(size - 1).chunkid();
Key4S3ChunkInfoList key(fsId, inodeId, chunkIndex,
firstChunkId, lastChunkId, size);
std::string skey = conv_->SerializeToString(key);

Status s = txn->SSet(table4s3chunkinfo_, skey, list2add);
Status s = txn->SSet(table4s3chunkinfo_, skey, *list2add);
return s.ok() ? MetaStatusCode::OK :
MetaStatusCode::STORAGE_INTERNAL_ERROR;
}

// NOTE: s3chunkinfo which its chunkid equal or
// less then min chunkid should be removed
MetaStatusCode InodeStorage::RemoveS3ChunkInfoList(Transaction txn,
uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
uint64_t minChunkId,
uint64_t* size4del) {
MetaStatusCode InodeStorage::DelS3ChunkInfoList(
Transaction txn,
uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
const S3ChunkInfoList* list2del) {
if (nullptr == list2del || list2del->s3chunks_size() == 0) {
return MetaStatusCode::OK;
}

size_t size = list2del->s3chunks_size();
uint64_t delFirstChunkId = list2del->s3chunks(0).chunkid();
uint64_t delLastChunkId = list2del->s3chunks(size - 1).chunkid();

// prefix
Prefix4ChunkIndexS3ChunkInfoList prefix(fsId, inodeId, chunkIndex);
std::string sprefix = conv_->SerializeToString(prefix);
auto iterator = txn->SSeek(table4s3chunkinfo_, sprefix);
if (iterator->Status() != 0) {
LOG(ERROR) << "Get iterator failed, prefix=" << sprefix;
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}

*size4del = 0;
uint64_t lastChunkId;
Key4S3ChunkInfoList key;
std::vector<std::string> key2del;
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
Expand All @@ -219,61 +228,70 @@ MetaStatusCode InodeStorage::RemoveS3ChunkInfoList(Transaction txn,
break;
} else if (!conv_->ParseFromString(skey, &key)) {
return MetaStatusCode::PARSE_FROM_STRING_FAILED;
} else if (key.firstChunkId >= minChunkId) {
break;
}

// firstChunkId < minChunkId
key2del.push_back(skey);
*size4del += key.size;
// current list range: [ ]
// delete list range : [ ]
if (delFirstChunkId <= key.firstChunkId &&
delLastChunkId >= key.lastChunkId) {
key2del.push_back(skey);
// current list range: [ ]
// delete list range : [ ]
} else if (delLastChunkId < key.firstChunkId) {
continue;
} else {
LOG(ERROR) << "wrong delete list range (" << delFirstChunkId
<< "," << delLastChunkId << "), skey=" << skey;
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}
}

for (const auto& skey : key2del) {
if (!txn->SDel(table4s3chunkinfo_, skey).ok()) {
LOG(ERROR) << "Delete key failed, skey=" << skey;
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}
}
return MetaStatusCode::OK;
}

MetaStatusCode InodeStorage::AppendS3ChunkInfoList(
MetaStatusCode InodeStorage::ModifyInodeS3ChunkInfoList(
uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
const S3ChunkInfoList& list2add,
bool compaction) {
const S3ChunkInfoList* list2add,
const S3ChunkInfoList* list2del) {
WriteLockGuard writeLockGuard(rwLock_);
size_t size = list2add.s3chunks_size();
if (size == 0) {
return MetaStatusCode::OK;
}

auto txn = kvStorage_->BeginTransaction();
if (nullptr == txn) {
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}

MetaStatusCode rc;
uint64_t size4add = list2add.s3chunks_size();
uint64_t size4del = 0;
rc = AddS3ChunkInfoList(txn, fsId, inodeId, chunkIndex, list2add);
if (rc == MetaStatusCode::OK && compaction) {
uint64_t minChunkId = list2add.s3chunks(0).chunkid();
rc = RemoveS3ChunkInfoList(txn, fsId, inodeId, chunkIndex,
minChunkId, &size4del);
auto rc = DelS3ChunkInfoList(txn, fsId, inodeId, chunkIndex, list2del);
if (rc == MetaStatusCode::OK) {
rc = AddS3ChunkInfoList(txn, fsId, inodeId, chunkIndex, list2add);
}

if (rc != MetaStatusCode::OK) {
txn->Rollback();
if (!txn->Rollback().ok()) {
LOG(ERROR) << "Rollback transaction failed";
rc = MetaStatusCode::STORAGE_INTERNAL_ERROR;
}
} else if (!txn->Commit().ok()) {
LOG(ERROR) << "Commit transaction failed";
rc = MetaStatusCode::STORAGE_INTERNAL_ERROR;
}

if (rc == MetaStatusCode::OK &&
!UpdateInodeS3MetaSize(fsId, inodeId, size4add, size4del)) {
rc = MetaStatusCode::STORAGE_INTERNAL_ERROR;
LOG(ERROR) << "UpdateInodeS3MetaSize() failed, size4add=" << size4add
<< ", size4del" << size4del;
if (rc != MetaStatusCode::OK) {
return rc;
}

// rc == MetaStatusCode::OK
uint64_t size4add = (nullptr == list2add) ? 0 : list2add->s3chunks_size();
uint64_t size4del = (nullptr == list2del) ? 0 : list2del->s3chunks_size();
if (!UpdateInodeS3MetaSize(fsId, inodeId, size4add, size4del)) {
LOG(ERROR) << "Update inode s3meta size failed";
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}
return rc;
}
Expand Down
21 changes: 10 additions & 11 deletions curvefs/src/metaserver/inode_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
#define CURVEFS_SRC_METASERVER_INODE_STORAGE_H_

#include <functional>
#include <utility>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <list>
#include <string>
#include <memory>

#include "absl/container/btree_set.h"
#include "src/common/concurrent/rw_lock.h"
#include "curvefs/proto/metaserver.pb.h"
#include "curvefs/src/metaserver/storage/converter.h"
Expand All @@ -45,6 +44,7 @@ using ::curvefs::metaserver::storage::Status;
using ::curvefs::metaserver::storage::Iterator;
using ::curvefs::metaserver::storage::KVStorage;
using ::curvefs::metaserver::storage::StorageTransaction;
using ::curvefs::metaserver::storage::Hash;

namespace curvefs {
namespace metaserver {
Expand Down Expand Up @@ -108,11 +108,11 @@ class InodeStorage {
*/
MetaStatusCode Update(const Inode& inode);

MetaStatusCode AppendS3ChunkInfoList(uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
const S3ChunkInfoList& list2add,
bool compaction);
MetaStatusCode ModifyInodeS3ChunkInfoList(uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
const S3ChunkInfoList* list2add,
const S3ChunkInfoList* list2del);

MetaStatusCode PaddingInodeS3ChunkInfo(int32_t fsId,
uint64_t inodeId,
Expand All @@ -138,15 +138,14 @@ class InodeStorage {
uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
const S3ChunkInfoList& list2add);
const S3ChunkInfoList* list2add);

MetaStatusCode RemoveS3ChunkInfoList(
MetaStatusCode DelS3ChunkInfoList(
std::shared_ptr<StorageTransaction> txn,
uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
uint64_t minChunkId,
uint64_t* size4del);
const S3ChunkInfoList* list2del);

std::string RealTablename(TABLE_TYPE type, std::string tablename) {
std::ostringstream oss;
Expand Down
21 changes: 14 additions & 7 deletions curvefs/src/metaserver/metastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,20 @@ MetaStatusCode MetaStoreImpl::GetOrModifyS3ChunkInfo(
auto partition = GetPartition(request->partitionid());
if (nullptr == partition) {
rc = MetaStatusCode::PARTITION_NOT_FOUND;
} else {
rc = partition->GetOrModifyS3ChunkInfo(request->fsid(),
request->inodeid(),
request->s3chunkinfoadd(),
iterator,
request->returns3chunkinfomap(),
request->froms3compaction());
response->set_statuscode(rc);
return rc;
}

uint32_t fsId = request->fsid();
uint64_t inodeId = request->inodeid();
rc = partition->GetOrModifyS3ChunkInfo(fsId, inodeId,
request->s3chunkinfoadd(),
request->s3chunkinforemove(),
request->returns3chunkinfomap(),
iterator);
if (rc == MetaStatusCode::OK && !request->supportstreaming()) {
rc = partition->PaddingInodeS3ChunkInfo(
fsId, inodeId, response->mutable_s3chunkinfomap(), 0);
}

if (rc == MetaStatusCode::OK && !request->supportstreaming()) {
Expand Down
Loading

0 comments on commit ba9fa81

Please sign in to comment.