Skip to content

Commit

Permalink
feat(fs): metaserver support asynchronous snapshot
Browse files Browse the repository at this point in the history
close: #1617

old metaserver snapshot that contain `inode` and `dentry` incompatible with new version

Signed-off-by: NaturalSelect <[email protected]>
  • Loading branch information
NaturalSelect authored and wu-hanqing committed Oct 9, 2023
1 parent 874c098 commit 9f60a9a
Show file tree
Hide file tree
Showing 50 changed files with 3,336 additions and 1,993 deletions.
8 changes: 8 additions & 0 deletions curvefs/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ message PartitionInfo {
optional bool manageFlag = 13; // if a partition has recyclebin inode, set this flag true
}

message AppliedIndex {
required int64 index = 1;
}

message ItemCount {
required uint64 count = 1;
}

message Peer {
optional uint64 id = 1;
optional string address = 2;
Expand Down
9 changes: 9 additions & 0 deletions curvefs/proto/metaserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ message PrepareRenameTxRequest {
repeated Dentry dentrys = 4;
}

message TransactionRequest {
enum TransactionType {
None = 0;
Rename = 1;
}
required TransactionType type = 1;
required string rawPayload = 2;
}

message PrepareRenameTxResponse {
required MetaStatusCode statusCode = 1;
optional uint64 appliedIndex = 2;
Expand Down
62 changes: 52 additions & 10 deletions curvefs/src/metaserver/copyset/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ CopysetNode::CopysetNode(PoolId poolId, CopysetId copysetId,
confChangeMtx_(),
ongoingConfChange_(),
metric_(absl::make_unique<OperatorMetric>(poolId_, copysetId_)),
isLoading_(false) {}
isLoading_(false),
snapshotLock_(),
snapshotTask_() {}

CopysetNode::~CopysetNode() {
Stop();
Expand Down Expand Up @@ -188,6 +190,8 @@ void CopysetNode::Stop() {
LOG_IF(ERROR, metaStore_->Destroy() != true)
<< "Failed to clear metastore, copyset: " << name_;
}
// wait for snapshot
WaitSnapshotDone();
}

int CopysetNode::LoadConfEpoch(const std::string& file) {
Expand Down Expand Up @@ -305,7 +309,7 @@ void CopysetNode::on_apply(braft::Iterator& iter) {
auto type = metaOperator->GetOperatorType();
auto task =
std::bind(&MetaOperator::OnApplyFromLog, metaOperator.release(),
TimeUtility::GetTimeofDayUs());
iter.index(), TimeUtility::GetTimeofDayUs());
applyQueue_->Push(hashcode, type, std::move(task));
timer.stop();
g_concurrent_apply_from_log_wait_latency << timer.u_elapsed();
Expand Down Expand Up @@ -361,6 +365,42 @@ class OnSnapshotSaveDoneClosureImpl : public OnSnapshotSaveDoneClosure {

} // namespace

void CopysetNode::DoSnapshot(OnSnapshotSaveDoneClosure* done) {
// NOTE: save metadata cannot be asynchronous
// we need maintain the consistency with
// raft snapshot metadata
std::vector<std::string> files;
brpc::ClosureGuard doneGuard(done);
auto* writer = done->GetSnapshotWriter();
if (!metaStore_->SaveMeta(writer->get_path(), &files)) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
LOG(ERROR) << "Save meta store metadata failed";
return;
}
// asynchronous save data
{
std::lock_guard<std::mutex> lock(snapshotLock_);
snapshotTask_ =
std::async(std::launch::async, [files, done, this]() mutable {
brpc::ClosureGuard doneGuard(done);
auto* writer = done->GetSnapshotWriter();
// save data files
if (!metaStore_->SaveData(writer->get_path(), &files)) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
LOG(ERROR) << "Save meta store data failed";
return;
}
// add files to snapshot writer
// file is a relative path under the given directory
for (const auto& f : files) {
writer->add_file(f);
}
done->SetSuccess();
});
}
doneGuard.release();
}

void CopysetNode::on_snapshot_save(braft::SnapshotWriter* writer,
braft::Closure* done) {
LOG(INFO) << "Copyset " << name_ << " saving snapshot to '"
Expand Down Expand Up @@ -389,19 +429,21 @@ void CopysetNode::on_snapshot_save(braft::SnapshotWriter* writer,

writer->add_file(kConfEpochFilename);

// TODO(wuhanqing): MetaStore::Save will start a thread and do task
// asynchronously, after task completed it will call
// OnSnapshotSaveDoneImpl::Run
// BUT, this manner is not so clear, maybe it better to make thing
// asynchronous directly in here
metaStore_->Save(writer->get_path(), new OnSnapshotSaveDoneClosureImpl(
this, writer, done, metricCtx));
DoSnapshot(
new OnSnapshotSaveDoneClosureImpl(this, writer, done, metricCtx));
doneGuard.release();

// `Cancel` only available for rvalue
std::move(cleanMetricIfFailed).Cancel();
}

void CopysetNode::WaitSnapshotDone() {
std::lock_guard<std::mutex> lock(snapshotLock_);
if (snapshotTask_.valid()) {
snapshotTask_.wait();
}
}

namespace {

class CopysetLoadingGuard {
Expand Down Expand Up @@ -462,7 +504,7 @@ int CopysetNode::on_snapshot_load(braft::SnapshotReader* reader) {

void CopysetNode::on_leader_start(int64_t term) {
/*
* Invoke order in on_leader_start:
* Invoke order in on_leader_start:
* 1. flush concurrent apply queue.
* 2. set term in states machine.
*
Expand Down
16 changes: 14 additions & 2 deletions curvefs/src/metaserver/copyset/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
#include <braft/raft.h>
#include <gtest/gtest_prod.h>

#include <condition_variable>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <map>

#include "curvefs/proto/heartbeat.pb.h"
#include "curvefs/src/metaserver/common/types.h"
#include "curvefs/src/metaserver/copyset/concurrent_apply_queue.h"
#include "curvefs/src/metaserver/copyset/conf_epoch_file.h"
Expand All @@ -40,7 +42,6 @@
#include "curvefs/src/metaserver/copyset/metric.h"
#include "curvefs/src/metaserver/copyset/raft_node.h"
#include "curvefs/src/metaserver/metastore.h"
#include "curvefs/proto/heartbeat.pb.h"

namespace curvefs {
namespace metaserver {
Expand Down Expand Up @@ -220,6 +221,13 @@ class CopysetNode : public braft::StateMachine {

FRIEND_TEST(CopysetNodeBlockGroupTest, Test_AggregateBlockStatInfo);

private:
// for snapshot

void WaitSnapshotDone();

void DoSnapshot(OnSnapshotSaveDoneClosure* done);

private:
const PoolId poolId_;
const CopysetId copysetId_;
Expand Down Expand Up @@ -267,6 +275,10 @@ class CopysetNode : public braft::StateMachine {
std::unique_ptr<OperatorMetric> metric_;

std::atomic<bool> isLoading_;

mutable std::mutex snapshotLock_;

std::future<void> snapshotTask_;
};

inline void CopysetNode::Propose(const braft::Task& task) {
Expand Down
77 changes: 39 additions & 38 deletions curvefs/src/metaserver/copyset/meta_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,14 @@ void MetaOperator::FastApplyTask() {
auto task =
std::bind(&MetaOperator::OnApply, this, node_->GetAppliedIndex(),
new MetaOperatorClosure(this), TimeUtility::GetTimeofDayUs());
node_->GetApplyQueue()->Push(HashCode(),
GetOperatorType(), std::move(task));
node_->GetApplyQueue()->Push(HashCode(), GetOperatorType(),
std::move(task));
timer.stop();
g_concurrent_fast_apply_wait_latency << timer.u_elapsed();
}

#define OPERATOR_CAN_BY_PASS_PROPOSE(TYPE) \
bool TYPE##Operator::CanBypassPropose() const { \
return true; \
} \
#define OPERATOR_CAN_BY_PASS_PROPOSE(TYPE) \
bool TYPE##Operator::CanBypassPropose() const { return true; }

// below operator are readonly, so can enable lease read
OPERATOR_CAN_BY_PASS_PROPOSE(GetDentry);
Expand All @@ -144,31 +142,31 @@ OPERATOR_CAN_BY_PASS_PROPOSE(GetVolumeExtent);

#undef OPERATOR_CAN_BY_PASS_PROPOSE

#define OPERATOR_ON_APPLY(TYPE) \
void TYPE##Operator::OnApply(int64_t index, \
google::protobuf::Closure *done, \
uint64_t startTimeUs) { \
brpc::ClosureGuard doneGuard(done); \
uint64_t timeUs = TimeUtility::GetTimeofDayUs(); \
node_->GetMetric()->WaitInQueueLatency(OperatorType::TYPE, \
timeUs - startTimeUs); \
auto status = node_->GetMetaStore()->TYPE( \
static_cast<const TYPE##Request *>(request_), \
static_cast<TYPE##Response *>(response_)); \
uint64_t executeTime = TimeUtility::GetTimeofDayUs() - timeUs; \
node_->GetMetric()->ExecuteLatency(OperatorType::TYPE, executeTime); \
if (status == MetaStatusCode::OK) { \
node_->UpdateAppliedIndex(index); \
static_cast<TYPE##Response *>(response_)->set_appliedindex( \
std::max<uint64_t>(index, node_->GetAppliedIndex())); \
node_->GetMetric()->OnOperatorComplete( \
OperatorType::TYPE, \
TimeUtility::GetTimeofDayUs() - startTimeUs, true); \
} else { \
node_->GetMetric()->OnOperatorComplete( \
OperatorType::TYPE, \
TimeUtility::GetTimeofDayUs() - startTimeUs, false); \
} \
#define OPERATOR_ON_APPLY(TYPE) \
void TYPE##Operator::OnApply(int64_t index, \
google::protobuf::Closure* done, \
uint64_t startTimeUs) { \
brpc::ClosureGuard doneGuard(done); \
uint64_t timeUs = TimeUtility::GetTimeofDayUs(); \
node_->GetMetric()->WaitInQueueLatency(OperatorType::TYPE, \
timeUs - startTimeUs); \
auto status = node_->GetMetaStore()->TYPE( \
static_cast<const TYPE##Request*>(request_), \
static_cast<TYPE##Response*>(response_), index); \
uint64_t executeTime = TimeUtility::GetTimeofDayUs() - timeUs; \
node_->GetMetric()->ExecuteLatency(OperatorType::TYPE, executeTime); \
if (status == MetaStatusCode::OK) { \
node_->UpdateAppliedIndex(index); \
static_cast<TYPE##Response*>(response_)->set_appliedindex( \
std::max<uint64_t>(index, node_->GetAppliedIndex())); \
node_->GetMetric()->OnOperatorComplete( \
OperatorType::TYPE, \
TimeUtility::GetTimeofDayUs() - startTimeUs, true); \
} else { \
node_->GetMetric()->OnOperatorComplete( \
OperatorType::TYPE, \
TimeUtility::GetTimeofDayUs() - startTimeUs, false); \
} \
}

OPERATOR_ON_APPLY(GetDentry);
Expand Down Expand Up @@ -208,7 +206,8 @@ void GetOrModifyS3ChunkInfoOperator::OnApply(int64_t index,
{
brpc::ClosureGuard doneGuard(done);

rc = metastore->GetOrModifyS3ChunkInfo(request, response, &iterator);
rc = metastore->GetOrModifyS3ChunkInfo(request, response, &iterator,
index);
if (rc == MetaStatusCode::OK) {
node_->UpdateAppliedIndex(index);
response->set_appliedindex(
Expand Down Expand Up @@ -251,7 +250,7 @@ void GetVolumeExtentOperator::OnApply(int64_t index,
auto *response = static_cast<GetVolumeExtentResponse *>(response_);
auto *metaStore = node_->GetMetaStore();

auto st = metaStore->GetVolumeExtent(request, response);
auto st = metaStore->GetVolumeExtent(request, response, index);
node_->GetMetric()->OnOperatorComplete(
OperatorType::GetVolumeExtent,
TimeUtility::GetTimeofDayUs() - startTimeUs, st == MetaStatusCode::OK);
Expand Down Expand Up @@ -292,11 +291,11 @@ void GetVolumeExtentOperator::OnApply(int64_t index,
}

#define OPERATOR_ON_APPLY_FROM_LOG(TYPE) \
void TYPE##Operator::OnApplyFromLog(uint64_t startTimeUs) { \
void TYPE##Operator::OnApplyFromLog(int64_t index, uint64_t startTimeUs) { \
std::unique_ptr<TYPE##Operator> selfGuard(this); \
TYPE##Response response; \
auto status = node_->GetMetaStore()->TYPE( \
static_cast<const TYPE##Request *>(request_), &response); \
static_cast<const TYPE##Request*>(request_), &response, index); \
node_->GetMetric()->OnOperatorCompleteFromLog( \
OperatorType::TYPE, TimeUtility::GetTimeofDayUs() - startTimeUs, \
status == MetaStatusCode::OK); \
Expand All @@ -317,24 +316,26 @@ OPERATOR_ON_APPLY_FROM_LOG(UpdateDeallocatableBlockGroup);

#undef OPERATOR_ON_APPLY_FROM_LOG

void GetOrModifyS3ChunkInfoOperator::OnApplyFromLog(uint64_t startTimeUs) {
void GetOrModifyS3ChunkInfoOperator::OnApplyFromLog(int64_t index,
uint64_t startTimeUs) {
std::unique_ptr<GetOrModifyS3ChunkInfoOperator> selfGuard(this);
GetOrModifyS3ChunkInfoRequest request;
GetOrModifyS3ChunkInfoResponse response;
std::shared_ptr<Iterator> iterator;
request = *static_cast<const GetOrModifyS3ChunkInfoRequest *>(request_);
request.set_returns3chunkinfomap(false);
auto status = node_->GetMetaStore()->GetOrModifyS3ChunkInfo(
&request, &response, &iterator);
&request, &response, &iterator, index);
node_->GetMetric()->OnOperatorCompleteFromLog(
OperatorType::GetOrModifyS3ChunkInfo,
TimeUtility::GetTimeofDayUs() - startTimeUs,
status == MetaStatusCode::OK);
}

#define READONLY_OPERATOR_ON_APPLY_FROM_LOG(TYPE) \
void TYPE##Operator::OnApplyFromLog(uint64_t startTimeUs) { \
void TYPE##Operator::OnApplyFromLog(int64_t index, uint64_t startTimeUs) { \
(void)startTimeUs; \
(void)index; \
std::unique_ptr<TYPE##Operator> selfGuard(this); \
}

Expand Down
Loading

0 comments on commit 9f60a9a

Please sign in to comment.