Skip to content

Commit

Permalink
curvefs/metaserver: speed up getting inode by padding inode's s3chunk…
Browse files Browse the repository at this point in the history
…info

which is small enough, instead of invoking RefreshS3ChunkInfo().
  • Loading branch information
Wine93 committed Apr 25, 2022
1 parent c0d74fe commit 19df31d
Show file tree
Hide file tree
Showing 29 changed files with 460 additions and 80 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,4 @@ docker/*/curvebs

curvefs/docker/curvefs
curvefs/docker/*/curvefs
storage_*
4 changes: 4 additions & 0 deletions curvefs/conf/metaserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,7 @@ storage.rocksdb.ordered_write_buffer_size=134217728
storage.rocksdb.ordered_max_write_buffer_number=15
# rocksdb block cache(LRU) capacity (default: 128MB)
storage.rocksdb.block_cache_capacity=134217728
# if the number of an inode's s3chunkinfo entries exceeds limit_size,
# we will send them via rpc streaming instead of
# padding them into the inode (default: 25000, about 25000 * 41 (byte) = 1MB)
storage.s3_meta_inside_inode.limit_size=25000
4 changes: 4 additions & 0 deletions curvefs/proto/metaserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ enum MetaStatusCode {
PARSE_FROM_STRING_FAILED = 23;
STORAGE_INTERNAL_ERROR = 24;
RPC_STREAM_ERROR = 25;
INODE_S3_META_TOO_LARGE = 26;
}

// dentry interface
Expand Down Expand Up @@ -148,6 +149,7 @@ message GetInodeRequest {
required uint32 fsId = 4;
required uint64 inodeId = 5;
optional uint64 appliedIndex = 6;
optional bool supportStreaming = 7; // for backward compatibility
}

enum FsFileType {
Expand Down Expand Up @@ -212,6 +214,7 @@ message GetInodeResponse {
required MetaStatusCode statusCode = 1;
optional Inode inode = 2;
optional uint64 appliedIndex = 3;
optional bool streaming = 4;
}

message CreateInodeRequest {
Expand Down Expand Up @@ -335,6 +338,7 @@ message GetOrModifyS3ChunkInfoRequest {
required bool returnS3ChunkInfoMap = 8;
optional bool fromS3Compaction = 9;
// todo: we only need a bit flag to indicate a lot of bool
optional bool supportStreaming = 10; // for backward compatibility
}

message GetOrModifyS3ChunkInfoResponse {
Expand Down
20 changes: 12 additions & 8 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ CURVEFS_ERROR InodeCacheManagerImpl::GetInode(uint64_t inodeid,
}

Inode inode;
bool streaming;

MetaStatusCode ret2 = metaClient_->GetInode(fsId_, inodeid, &inode);
MetaStatusCode ret2 = metaClient_->GetInode(
fsId_, inodeid, &inode, &streaming);
if (ret2 != MetaStatusCode::OK) {
LOG_IF(ERROR, ret2 != MetaStatusCode::NOT_FOUND)
<< "metaClient_ GetInode failed, MetaStatusCode = " << ret2
Expand All @@ -74,13 +76,15 @@ CURVEFS_ERROR InodeCacheManagerImpl::GetInode(uint64_t inodeid,
out = std::make_shared<InodeWrapper>(
std::move(inode), metaClient_);

// NOTE: now the s3chunkinfo in inode is empty for
// we had store it with alone, so we should invoke
// RefreshS3ChunkInfo() to padding inode's s3chunkinfo.
CURVEFS_ERROR rc = out->RefreshS3ChunkInfo();
if (rc != CURVEFS_ERROR::OK) {
LOG(ERROR) << "RefreshS3ChunkInfo() failed, retCode = " << rc;
return rc;
// NOTE: if the s3chunkinfo inside the inode is too large,
// we should invoke RefreshS3ChunkInfo() to receive the s3chunkinfo
// by streaming and pad it into the inode.
if (streaming) {
CURVEFS_ERROR rc = out->RefreshS3ChunkInfo();
if (rc != CURVEFS_ERROR::OK) {
LOG(ERROR) << "RefreshS3ChunkInfo() failed, retCode = " << rc;
return rc;
}
}

std::shared_ptr<InodeWrapper> eliminatedOne;
Expand Down
5 changes: 4 additions & 1 deletion curvefs/src/client/rpcclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ MetaServerClientImpl::PrepareRenameTx(const std::vector<Dentry> &dentrys) {
}

MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid,
Inode *out) {
Inode *out, bool* streaming) {
auto task = RPCTask {
metaserverClientMetric_->getInode.qps.count << 1;
LatencyUpdater updater(&metaserverClientMetric_->getInode.latency);
Expand All @@ -425,6 +425,7 @@ MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid,
request.set_inodeid(inodeid);
request.set_appliedindex(
metaCache_->GetApplyIndex(CopysetGroupID(poolID, copysetID)));
request.set_supportstreaming(true);

curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.GetInode(cntl, &request, &response, nullptr);
Expand Down Expand Up @@ -454,6 +455,7 @@ MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid,
return -1;
}

*streaming = response.has_streaming() ? response.streaming() : false;
auto &s3chunkinfoMap = response.inode().s3chunkinfomap();
for (auto &item : s3chunkinfoMap) {
VLOG(9) << "inodeInfo, inodeId:" << inodeid
Expand Down Expand Up @@ -912,6 +914,7 @@ MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo(
request.set_inodeid(inodeId);
request.set_returns3chunkinfomap(returnS3ChunkInfoMap);
*(request.mutable_s3chunkinfoadd()) = s3ChunkInfos;
request.set_supportstreaming(true);

curvefs::metaserver::MetaServerService_Stub stub(channel);

Expand Down
4 changes: 2 additions & 2 deletions curvefs/src/client/rpcclient/metaserver_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class MetaServerClient {
PrepareRenameTx(const std::vector<Dentry> &dentrys) = 0;

virtual MetaStatusCode GetInode(uint32_t fsId, uint64_t inodeid,
Inode *out) = 0;
Inode *out, bool* streaming) = 0;

virtual MetaStatusCode BatchGetInodeAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
Expand Down Expand Up @@ -162,7 +162,7 @@ class MetaServerClientImpl : public MetaServerClient {
MetaStatusCode PrepareRenameTx(const std::vector<Dentry> &dentrys) override;

MetaStatusCode GetInode(uint32_t fsId, uint64_t inodeid,
Inode *out) override;
Inode *out, bool* streaming) override;

MetaStatusCode BatchGetInodeAttr(uint32_t fsId,
std::set<uint64_t> *inodeIds,
Expand Down
5 changes: 3 additions & 2 deletions curvefs/src/metaserver/copyset/meta_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ void GetOrModifyS3ChunkInfoOperator::OnApply(int64_t index,
MetaStatusCode rc;
auto request = static_cast<const GetOrModifyS3ChunkInfoRequest*>(request_);
auto response = static_cast<GetOrModifyS3ChunkInfoResponse*>(response_);
bool streaming = request->returns3chunkinfomap();
auto metastore = node_->GetMetaStore();
std::shared_ptr<StreamConnection> connection;
std::shared_ptr<Iterator> iterator;
Expand All @@ -205,7 +204,9 @@ void GetOrModifyS3ChunkInfoOperator::OnApply(int64_t index,
}

brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_);
if (rc != MetaStatusCode::OK || !streaming) {
if (rc != MetaStatusCode::OK ||
!request->returns3chunkinfomap() ||
!request->supportstreaming()) {
return;
}

Expand Down
5 changes: 3 additions & 2 deletions curvefs/src/metaserver/inode_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,11 @@ MetaStatusCode InodeManager::GetOrModifyS3ChunkInfo(

MetaStatusCode InodeManager::PaddingInodeS3ChunkInfo(int32_t fsId,
uint64_t inodeId,
Inode* inode) {
S3ChunkInfoMap* m,
uint64_t limit) {
VLOG(1) << "PaddingInodeS3ChunkInfo, fsId: " << fsId
<< ", inodeId: " << inodeId;
return inodeStorage_->PaddingInodeS3ChunkInfo(fsId, inodeId, inode);
return inodeStorage_->PaddingInodeS3ChunkInfo(fsId, inodeId, m, limit);
}

MetaStatusCode InodeManager::UpdateInodeWhenCreateOrRemoveSubNode(
Expand Down
4 changes: 2 additions & 2 deletions curvefs/src/metaserver/inode_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

using ::curve::common::NameLock;
using ::curvefs::metaserver::S3ChunkInfoList;
using S3ChunkInfoMap = google::protobuf::Map<uint64_t, S3ChunkInfoList>;

namespace curvefs {
namespace metaserver {
Expand Down Expand Up @@ -81,7 +80,8 @@ class InodeManager {

MetaStatusCode PaddingInodeS3ChunkInfo(int32_t fsId,
uint64_t inodeId,
Inode* inode);
S3ChunkInfoMap* m,
uint64_t limit = 0);

MetaStatusCode UpdateInodeWhenCreateOrRemoveSubNode(uint32_t fsId,
uint64_t inodeId, bool isCreate);
Expand Down
29 changes: 24 additions & 5 deletions curvefs/src/metaserver/inode_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ using ::curvefs::metaserver::storage::Prefix4InodeS3ChunkInfoList;
using ::curvefs::metaserver::storage::Prefix4AllInode;
using Transaction = std::shared_ptr<StorageTransaction>;

using S3ChunkInfoMap = google::protobuf::Map<uint64_t, S3ChunkInfoList>;

InodeStorage::InodeStorage(std::shared_ptr<KVStorage> kvStorage,
const std::string& tablename)
: kvStorage_(kvStorage),
Expand Down Expand Up @@ -184,7 +186,7 @@ MetaStatusCode InodeStorage::AddS3ChunkInfoList(
uint64_t firstChunkId = list2add.s3chunks(0).chunkid();
uint64_t lastChunkId = list2add.s3chunks(size - 1).chunkid();
Key4S3ChunkInfoList key(fsId, inodeId, chunkIndex,
firstChunkId, lastChunkId);
firstChunkId, lastChunkId, size);
std::string skey = conv_->SerializeToString(key);

Status s = txn->SSet(table4s3chunkinfo_, skey, list2add);
Expand All @@ -198,14 +200,16 @@ MetaStatusCode InodeStorage::RemoveS3ChunkInfoList(Transaction txn,
uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
uint64_t minChunkId) {
uint64_t minChunkId,
uint64_t* size4del) {
Prefix4ChunkIndexS3ChunkInfoList prefix(fsId, inodeId, chunkIndex);
std::string sprefix = conv_->SerializeToString(prefix);
auto iterator = txn->SSeek(table4s3chunkinfo_, sprefix);
if (iterator->Status() != 0) {
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}

*size4del = 0;
uint64_t lastChunkId;
Key4S3ChunkInfoList key;
std::vector<std::string> key2del;
Expand All @@ -221,6 +225,7 @@ MetaStatusCode InodeStorage::RemoveS3ChunkInfoList(Transaction txn,

// firstChunkId < minChunkId
key2del.push_back(skey);
*size4del += key.size;
}

for (const auto& skey : key2del) {
Expand Down Expand Up @@ -249,24 +254,39 @@ MetaStatusCode InodeStorage::AppendS3ChunkInfoList(
}

MetaStatusCode rc;
uint64_t size4add = list2add.s3chunks_size();
uint64_t size4del = 0;
rc = AddS3ChunkInfoList(txn, fsId, inodeId, chunkIndex, list2add);
if (rc == MetaStatusCode::OK && compaction) {
uint64_t minChunkId = list2add.s3chunks(0).chunkid();
rc = RemoveS3ChunkInfoList(txn, fsId, inodeId, chunkIndex, minChunkId);
rc = RemoveS3ChunkInfoList(txn, fsId, inodeId, chunkIndex,
minChunkId, &size4del);
}

if (rc != MetaStatusCode::OK) {
txn->Rollback();
} else if (!txn->Commit().ok()) {
rc = MetaStatusCode::STORAGE_INTERNAL_ERROR;
}

if (rc == MetaStatusCode::OK &&
!UpdateInodeS3MetaSize(fsId, inodeId, size4add, size4del)) {
rc = MetaStatusCode::STORAGE_INTERNAL_ERROR;
LOG(ERROR) << "UpdateInodeS3MetaSize() failed, size4add=" << size4add
<< ", size4del" << size4del;
}
return rc;
}

MetaStatusCode InodeStorage::PaddingInodeS3ChunkInfo(int32_t fsId,
uint64_t inodeId,
Inode* inode) {
S3ChunkInfoMap* m,
uint64_t limit) {
ReadLockGuard readLockGuard(rwLock_);
if (limit != 0 && GetInodeS3MetaSize(fsId, inodeId) > limit) {
return MetaStatusCode::INODE_S3_META_TOO_LARGE;
}

auto iterator = GetInodeS3ChunkInfoList(fsId, inodeId);
if (iterator->Status() != 0) {
LOG(ERROR) << "Get inode s3chunkinfo failed";
Expand All @@ -282,7 +302,6 @@ MetaStatusCode InodeStorage::PaddingInodeS3ChunkInfo(int32_t fsId,

Key4S3ChunkInfoList key;
S3ChunkInfoList list;
auto m = inode->mutable_s3chunkinfomap();
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
std::string skey = iterator->Key();
std::string svalue = iterator->Value();
Expand Down
31 changes: 29 additions & 2 deletions curvefs/src/metaserver/inode_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace metaserver {

using ::curvefs::metaserver::storage::Key4Inode;
using ::curvefs::metaserver::storage::Converter;
using S3ChunkInfoMap = google::protobuf::Map<uint64_t, S3ChunkInfoList>;

enum TABLE_TYPE : unsigned char {
kTypeInode = 1,
Expand Down Expand Up @@ -115,7 +116,8 @@ class InodeStorage {

MetaStatusCode PaddingInodeS3ChunkInfo(int32_t fsId,
uint64_t inodeId,
Inode* inode);
S3ChunkInfoMap* m,
uint64_t limit = 0);

std::shared_ptr<Iterator> GetInodeS3ChunkInfoList(uint32_t fsId,
uint64_t inodeId);
Expand Down Expand Up @@ -143,14 +145,37 @@ class InodeStorage {
uint32_t fsId,
uint64_t inodeId,
uint64_t chunkIndex,
uint64_t minChunkId);
uint64_t minChunkId,
uint64_t* size4del);

// Builds a table name by streaming `type`, a ":" separator and `tablename`
// into a string, in that order, and returns the result.
// NOTE(review): TABLE_TYPE is declared with an `unsigned char` underlying
// type, so the prefix may be written as a raw character rather than as a
// decimal number, depending on the operator<< overload selected — confirm.
std::string RealTablename(TABLE_TYPE type, std::string tablename) {
std::ostringstream oss;
oss << type << ":" << tablename;
return oss.str();
}

// Builds the in-memory lookup key "<fsId>:<inodeId>" (both in decimal) used
// to index inodeS3MetaSize_.
//
// The key is assembled with std::to_string instead of a std::ostringstream:
// constructing a stream for every lookup is comparatively expensive, and
// stream formatting of integers follows the global locale.
static std::string InodeS3MetaSizeKey(uint32_t fsId, uint64_t inodeId) {
    std::string key = std::to_string(fsId);
    key += ':';
    key += std::to_string(inodeId);
    return key;
}

// Adjusts the recorded s3chunkinfo count of inode (fsId, inodeId) by
// `size4add` added entries and `size4del` removed entries.
//
// Returns true and stores (current + size4add - size4del) on success.
// Returns false, leaving the map untouched, when the addition would wrap
// around uint64_t or when more entries would be removed than are recorded.
bool UpdateInodeS3MetaSize(uint32_t fsId, uint64_t inodeId,
                           uint64_t size4add, uint64_t size4del) {
    const std::string key = InodeS3MetaSizeKey(fsId, inodeId);

    // Look the entry up without operator[] so that a rejected update does
    // not leave a spurious zero-valued entry behind.
    const auto it = inodeS3MetaSize_.find(key);
    const uint64_t current = (it != inodeS3MetaSize_.end()) ? it->second : 0;

    const uint64_t total = current + size4add;
    if (total < current) {
        // unsigned wrap-around: the sum does not fit in uint64_t
        return false;
    }
    if (total < size4del) {
        return false;
    }

    const uint64_t remaining = total - size4del;
    if (it != inodeS3MetaSize_.end()) {
        it->second = remaining;
    } else {
        inodeS3MetaSize_.emplace(key, remaining);
    }
    return true;
}

// Returns the recorded s3chunkinfo count of inode (fsId, inodeId), or 0 if
// nothing has been recorded for it.
//
// Uses find() rather than operator[]: operator[] inserts a default entry on
// a miss, which would modify the map from PaddingInodeS3ChunkInfo() while
// only a read lock is held, and would grow the map on every lookup of an
// unknown inode.
uint64_t GetInodeS3MetaSize(uint32_t fsId, uint64_t inodeId) {
    const std::string key = InodeS3MetaSizeKey(fsId, inodeId);
    const auto it = inodeS3MetaSize_.find(key);
    return it != inodeS3MetaSize_.end() ? it->second : 0;
}

// Reports whether `key` is currently present in keySet_.
bool FindKey(const std::string& key) {
    const bool present = keySet_.count(key) != 0;
    return present;
}
Expand All @@ -170,6 +195,8 @@ class InodeStorage {
std::string table4s3chunkinfo_;
std::shared_ptr<Converter> conv_;
std::unordered_set<std::string> keySet_;
// key: "<fsId>:<inodeId>" (see InodeS3MetaSizeKey), value: the number of s3chunkinfo entries recorded for the inode
std::unordered_map<std::string, uint64_t> inodeS3MetaSize_;
};

} // namespace metaserver
Expand Down
3 changes: 3 additions & 0 deletions curvefs/src/metaserver/metaserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ void Metaserver::InitStorage() {
LOG_IF(FATAL, !conf_->GetUInt64Value(
"storage.rocksdb.block_cache_capacity",
&storageOptions_.blockCacheCapacity));
LOG_IF(FATAL, !conf_->GetUInt64Value(
"storage.s3_meta_inside_inode.limit_size",
&storageOptions_.s3MetaLimitSizeInsideInode));

bool succ = ::curvefs::metaserver::storage::InitStorage(storageOptions_);
LOG_IF(FATAL, !succ) << "Init storage failed";
Expand Down
Loading

0 comments on commit 19df31d

Please sign in to comment.