Skip to content

Commit

Permalink
feat(fs): metaserver support asynchronous snapshot
Browse files Browse the repository at this point in the history
close: opencurve#1617

Signed-off-by: NaturalSelect <[email protected]>
  • Loading branch information
NaturalSelect committed Aug 19, 2023
1 parent e0d7fbf commit 41a2dc3
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 179 deletions.
90 changes: 82 additions & 8 deletions curvefs/src/metaserver/copyset/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,15 @@ CopysetNode::CopysetNode(PoolId poolId, CopysetId copysetId,
confChangeMtx_(),
ongoingConfChange_(),
metric_(absl::make_unique<OperatorMetric>(poolId_, copysetId_)),
isLoading_(false) {}
isLoading_(false),
enableSnapshot_(false),
snapshotCv_(),
snapshotLock_(),
snapshotTask_(),
snapshotThread_() {}

CopysetNode::~CopysetNode() {
StopSnapshotThread();
Stop();
raftNode_.reset();
applyQueue_.reset();
Expand Down Expand Up @@ -155,6 +161,8 @@ bool CopysetNode::Init(const CopysetNodeOptions& options) {
peerId_ = braft::PeerId(addr, 0);
raftNode_ = absl::make_unique<RaftNode>(groupId_, peerId_);

// init snapshot thread
StartSnapshotThread();
return true;
}

Expand Down Expand Up @@ -362,6 +370,77 @@ class OnSnapshotSaveDoneClosureImpl : public OnSnapshotSaveDoneClosure {

} // namespace

void CopysetNode::SnapshotThreadRun() {
while (enableSnapshot_) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(snapshotLock_);
snapshotCv_.wait(lock);
std::swap(task, snapshotTask_);
}
if (task) {
task();
}
}
}

void CopysetNode::StartSnapshotThread() {
enableSnapshot_ = true;
{
std::unique_lock<std::mutex> lock(snapshotLock_);
snapshotThread_ = std::thread(&CopysetNode::SnapshotThreadRun, this);
}
}

void CopysetNode::StopSnapshotThread() {
enableSnapshot_ = false;
std::thread thread;
{
std::lock_guard<std::mutex> lock(snapshotLock_);
std::swap(thread, snapshotThread_);
snapshotCv_.notify_one();
}
if (thread.joinable()) {
thread.join();
}
}

void CopysetNode::DoSnapshot(OnSnapshotSaveDoneClosure* done) {
// NOTE: save metadata cannot be asynchronous
// we need maintain the consistency with
// raft snapshot metadata
std::vector<std::string> files;
brpc::ClosureGuard doneGuard(done);
auto *writer = done->GetSnapshotWriter();
if (!metaStore_->SaveMeta(writer->get_path(), &files)) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
LOG(ERROR) << "Save meta store metadata failed";
return;
}
// asynchronous save data
{
std::lock_guard<std::mutex> lock(snapshotLock_);
snapshotTask_ = [files, done, this]() mutable {
brpc::ClosureGuard doneGuard(done);
auto *writer = done->GetSnapshotWriter();
// save data files
if (!metaStore_->SaveData(writer->get_path(), &files)) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
LOG(ERROR) << "Save meta store data failed";
return;
}
// add files to snapshot writer
// file is a relative path under the given directory
for (const auto &f : files) {
writer->add_file(f);
}
done->SetSuccess();
};
snapshotCv_.notify_one();
}
doneGuard.release();
}

void CopysetNode::on_snapshot_save(braft::SnapshotWriter* writer,
braft::Closure* done) {
LOG(INFO) << "Copyset " << name_ << " saving snapshot to '"
Expand Down Expand Up @@ -389,13 +468,8 @@ void CopysetNode::on_snapshot_save(braft::SnapshotWriter* writer,
}

writer->add_file(kConfEpochFilename);

// TODO(wuhanqing): MetaStore::Save will start a thread and do task
// asynchronously, after task completed it will call
// OnSnapshotSaveDoneImpl::Run
// BUT, this manner is not so clear, maybe it better to make thing
// asynchronous directly in here
metaStore_->Save(writer->get_path(), new OnSnapshotSaveDoneClosureImpl(

DoSnapshot(new OnSnapshotSaveDoneClosureImpl(
this, writer, done, metricCtx));
doneGuard.release();

Expand Down
27 changes: 27 additions & 0 deletions curvefs/src/metaserver/copyset/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <string>
#include <vector>
#include <map>
#include <condition_variable>

#include "curvefs/src/metaserver/common/types.h"
#include "curvefs/src/metaserver/copyset/concurrent_apply_queue.h"
Expand Down Expand Up @@ -220,6 +221,16 @@ class CopysetNode : public braft::StateMachine {

FRIEND_TEST(CopysetNodeBlockGroupTest, Test_AggregateBlockStatInfo);

private:
// for snapshot
void SnapshotThreadRun();

void StartSnapshotThread();

void StopSnapshotThread();

void DoSnapshot(OnSnapshotSaveDoneClosure* done);

private:
const PoolId poolId_;
const CopysetId copysetId_;
Expand Down Expand Up @@ -267,6 +278,22 @@ class CopysetNode : public braft::StateMachine {
std::unique_ptr<OperatorMetric> metric_;

std::atomic<bool> isLoading_;

// for asynchronous snapshot
std::atomic_bool enableSnapshot_;

std::condition_variable snapshotCv_;

mutable std::mutex snapshotLock_;

// NOTE: maintain a queue is unnecessary
// we only need one item
std::function<void()> snapshotTask_;
// NOTE: we need to use a thread to
// save snapshot, and this thread
// will blocking on `KVStorage::Checkpoint()`
// so we cannot use coroutine(bthread)
std::thread snapshotThread_;
};

inline void CopysetNode::Propose(const braft::Task& task) {
Expand Down
46 changes: 19 additions & 27 deletions curvefs/src/metaserver/metastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,46 +167,38 @@ void MetaStoreImpl::SaveBackground(const std::string &path,
done->Run();
}

// TODO(NaturalSelect): support asynchronous snapshot
bool MetaStoreImpl::Save(const std::string &dir,
OnSnapshotSaveDoneClosure *done) {
brpc::ClosureGuard doneGuard(done);
bool MetaStoreImpl::SaveMeta(const std::string &dir,
std::vector<std::string> *files) {
// NOTE: we will keep meta fstream consistent with
// snapshot metadata, so we should hold the locks
// during `fstream.Save()`
WriteLockGuard writeLockGuard(rwLock_);

MetaStoreFStream fstream(&partitionMap_, kvStorage_,
copysetNode_->GetPoolId(),
copysetNode_->GetCopysetId());
copysetNode_->GetPoolId(),
copysetNode_->GetCopysetId());

const std::string metadata = dir + "/" + kMetaDataFilename;
bool succ = fstream.Save(metadata);
if (!succ) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
return false;
if (fstream.Save(metadata)) {
files->push_back(kMetaDataFilename);
return true;
}
return false;
}

// checkpoint storage
bool MetaStoreImpl::SaveData(const std::string &dir,
std::vector<std::string> *files) {
butil::Timer timer;
timer.start();
std::vector<std::string> files;
succ = kvStorage_->Checkpoint(dir, &files);
std::vector<std::string> tmp;
bool succ = kvStorage_->Checkpoint(dir, &tmp);
if (!succ) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
return false;
}

for (auto & file : tmp) {
files->push_back(std::move(file));
}
timer.stop();
g_storage_checkpoint_latency << timer.u_elapsed();

// add files to snapshot writer
// file is a relative path under the given directory
auto *writer = done->GetSnapshotWriter();
writer->add_file(kMetaDataFilename);

for (const auto &f : files) {
writer->add_file(f);
}

done->SetSuccess();
return true;
}

Expand Down
13 changes: 9 additions & 4 deletions curvefs/src/metaserver/metastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <map>
#include <memory>
#include <string>
#include <vector>

#include "curvefs/proto/metaserver.pb.h"
#include "curvefs/src/common/rpc_stream.h"
Expand Down Expand Up @@ -111,8 +112,10 @@ class MetaStore {
virtual ~MetaStore() = default;

virtual bool Load(const std::string& pathname) = 0;
virtual bool Save(const std::string& dir,
OnSnapshotSaveDoneClosure* done) = 0;
virtual bool SaveMeta(const std::string &dir,
std::vector<std::string> *files) = 0;
virtual bool SaveData(const std::string &dir,
std::vector<std::string> *files) = 0;
virtual bool Clear() = 0;
virtual bool Destroy() = 0;
virtual MetaStatusCode CreatePartition(
Expand Down Expand Up @@ -224,8 +227,10 @@ class MetaStoreImpl : public MetaStore {
const storage::StorageOptions& storageOptions);

bool Load(const std::string& checkpoint) override;
bool Save(const std::string& dir,
OnSnapshotSaveDoneClosure* done) override;
bool SaveMeta(const std::string &dir,
std::vector<std::string> *files) override;
bool SaveData(const std::string &dir,
std::vector<std::string> *files) override;
bool Clear() override;
bool Destroy() override;

Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/metaserver/metastore_fstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,22 @@ bool MetaStoreFStream::Load(const std::string &pathname, uint8_t *version) {
case ENTRY_TYPE::INODE:
LOG(ERROR)
<< "Snapshot is too old, incompatible with current version";
break;
case ENTRY_TYPE::DENTRY:
LOG(ERROR)
<< "Snapshot is too old, incompatible with current version";
break;
case ENTRY_TYPE::PENDING_TX:
++totalPendingTx;
return LoadPendingTx(partitionId, key, value);
case ENTRY_TYPE::S3_CHUNK_INFO_LIST:
LOG(ERROR)
<< "Snapshot is too old, incompatible with current version";
break;
case ENTRY_TYPE::VOLUME_EXTENT:
LOG(ERROR)
<< "Snapshot is too old, incompatible with current version";
break;
case ENTRY_TYPE::UNKNOWN:
break;
}
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/metaserver/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ bool Partition::Clear() {
return true;
}

// NOTE: store nextid to kvstroage is unnecessary
// we will replay the logs filter the log entries that
// already applied, but keep nextid changes in memory
// so it will grow to corrected value after replay
uint64_t Partition::GetNewInodeId() {
if (partitionInfo_.nextid() > partitionInfo_.end()) {
partitionInfo_.set_status(PartitionStatus::READONLY);
Expand Down
6 changes: 5 additions & 1 deletion curvefs/src/metaserver/storage/rocksdb_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,11 @@ bool RocksDBStorage::Checkpoint(const std::string& dir,
std::vector<std::string>* files) {
rocksdb::FlushOptions options;
options.wait = true;
options.allow_write_stall = true;
// NOTE: for asynchronous snapshot
// we cannot allow write stall
// rocksdb will wait until flush
// can be performed without causing write stall
options.allow_write_stall = false;
auto status = db_->Flush(options, handles_);
if (!status.ok()) {
LOG(ERROR) << "Failed to flush DB, " << status.ToString();
Expand Down
1 change: 0 additions & 1 deletion curvefs/src/metaserver/trash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ MetaStatusCode TrashImpl::DeleteInodeAndData(const TrashItem &item) {
return MetaStatusCode::S3_DELETE_ERR;
}
}
// TODO(NaturalSelect): store a log index to inode storage is necessary?
ret = inodeStorage_->ForceDelete(Key4Inode(item.fsId, item.inodeId));
if (ret != MetaStatusCode::OK && ret != MetaStatusCode::NOT_FOUND) {
LOG(ERROR) << "Delete Inode fail, fsId = " << item.fsId
Expand Down
Loading

0 comments on commit 41a2dc3

Please sign in to comment.