Skip to content

Commit

Permalink
refactor process snapshot (#4019)
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 authored Mar 18, 2022
1 parent 8585de6 commit 372f6b4
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 78 deletions.
3 changes: 2 additions & 1 deletion src/interface/raftex.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct AppendLogResponse {
struct SendSnapshotRequest {
1: GraphSpaceID space;
2: PartitionID part;
3: TermID term;
3: TermID current_term;
4: LogID committed_log_id;
5: TermID committed_log_term;
6: string leader_addr;
Expand Down Expand Up @@ -117,6 +117,7 @@ struct HeartbeatResponse {

struct SendSnapshotResponse {
1: common.ErrorCode error_code;
2: TermID current_term;
}

struct GetStateRequest {
Expand Down
14 changes: 8 additions & 6 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,11 @@ void Listener::doApply() {
});
}

std::pair<int64_t, int64_t> Listener::commitSnapshot(const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Listener::commitSnapshot(
const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
VLOG(2) << idStr_ << "Listener is committing snapshot.";
int64_t count = 0;
int64_t size = 0;
Expand All @@ -253,7 +254,8 @@ std::pair<int64_t, int64_t> Listener::commitSnapshot(const std::vector<std::stri
}
if (!apply(data)) {
LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
return std::make_pair(0, 0);
return {
nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
}
if (finished) {
CHECK(!raftLock_.try_lock());
Expand All @@ -268,7 +270,7 @@ std::pair<int64_t, int64_t> Listener::commitSnapshot(const std::vector<std::stri
committedLogTerm,
lastApplyLogId_);
}
return std::make_pair(count, size);
return {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
}

void Listener::resetListener() {
Expand Down
21 changes: 12 additions & 9 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ using RaftClient = thrift::ThriftClientManager<raftex::cpp2::RaftexServiceAsyncC
* // If listener falls far behind from leader, leader would send snapshot to listener. The
* // snapshot is a vector of kv, listener could decode them and treat them as normal logs until
* // all snapshot has been received.
* std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
* LogID committedLogId,
* TermID committedLogTerm,
* bool finished) override;
* std::tuple<cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
* const std::vector<std::string>& data,
* LogID committedLogId,
* TermID committedLogTerm,
* bool finished) override;
*
* * Must implement in derived class
* // extra initialize work could do here
Expand Down Expand Up @@ -279,12 +280,14 @@ class Listener : public raftex::RaftPart {
* @param committedLogId Commit log id of snapshot
* @param committedLogTerm Commit log term of snapshot
* @param finished Whether spapshot is finished
* @return std::pair<int64_t, int64_t> Return count and size of in the data
* @return std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Return {ok, count, size} if
* succeed, else return {errorcode, -1, -1}
*/
std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;
std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

/**
* @brief Background job thread will trigger doApply to apply data into state machine periodically
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace nebula {
namespace kvstore {

NebulaStore::~NebulaStore() {
stop();
LOG(INFO) << "Cut off the relationship with meta client";
options_.partMan_.reset();
raftService_->stop();
Expand Down
24 changes: 13 additions & 11 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,11 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
}
}

std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Part::commitSnapshot(
const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
SCOPED_TIMER(&execTime_);
auto batch = engine_->startBatchWrite();
int64_t count = 0;
Expand All @@ -355,25 +356,26 @@ std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>&
count++;
size += row.size();
auto kv = decodeKV(row);
if (nebula::cpp2::ErrorCode::SUCCEEDED != batch->put(kv.first, kv.second)) {
auto code = batch->put(kv.first, kv.second);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(3) << idStr_ << "Failed to call WriteBatch::put()";
return std::make_pair(0, 0);
return {code, kNoSnapshotCount, kNoSnapshotSize};
}
}
if (finished) {
auto retCode = putCommitMsg(batch.get(), committedLogId, committedLogTerm);
if (nebula::cpp2::ErrorCode::SUCCEEDED != retCode) {
auto code = putCommitMsg(batch.get(), committedLogId, committedLogTerm);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(3) << idStr_ << "Put commit id into batch failed";
return std::make_pair(0, 0);
return {code, kNoSnapshotCount, kNoSnapshotSize};
}
}
// For snapshot, we open the rocksdb's wal to avoid loss data if crash.
auto code = engine_->commitBatchWrite(
std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return std::make_pair(0, 0);
return {code, kNoSnapshotCount, kNoSnapshotSize};
}
return std::make_pair(count, size);
return {code, count, size};
}

nebula::cpp2::ErrorCode Part::putCommitMsg(WriteBatch* batch,
Expand Down
14 changes: 8 additions & 6 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,14 @@ class Part : public raftex::RaftPart {
* @param committedLogId Commit log id of snapshot
* @param committedLogTerm Commit log term of snapshot
* @param finished Whether spapshot is finished
* @return std::pair<int64_t, int64_t> Return count and size of in the data
*/
std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;
* @return std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Return {ok, count, size} if
* succeed, else return {errorcode, -1, -1}
*/
std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

/**
* @brief Encode the commit log id and commit log term to write batch
Expand Down
54 changes: 35 additions & 19 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1317,9 +1317,10 @@ bool RaftPart::needToCleanupSnapshot() {
}

void RaftPart::cleanupSnapshot() {
VLOG(1) << idStr_ << "Snapshot has not been received for a long time, clean up the snapshot";
VLOG(1) << idStr_
<< "Snapshot has not been received for a long time, convert to running so we can receive "
"another snapshot";
std::lock_guard<std::mutex> g(raftLock_);
reset();
status_ = Status::RUNNING;
}

Expand Down Expand Up @@ -1821,9 +1822,12 @@ void RaftPart::processHeartbeatRequest(const cpp2::HeartbeatRequest& req,

void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
cpp2::SendSnapshotResponse& resp) {
VLOG(2) << idStr_ << "Receive snapshot, total rows " << req.get_rows().size()
<< ", total count received " << req.get_total_count() << ", total size received "
<< req.get_total_size() << ", finished " << req.get_done();
VLOG(2) << idStr_ << "Receive snapshot from " << req.get_leader_addr() << ":"
<< req.get_leader_port() << ", commit log id " << req.get_committed_log_id()
<< ", commit log term " << req.get_committed_log_term() << ", leader has sent "
<< req.get_total_count() << " logs of size " << req.get_total_size() << ", finished "
<< req.get_done();

std::lock_guard<std::mutex> g(raftLock_);
// Check status
if (UNLIKELY(status_ == Status::STOPPED)) {
Expand All @@ -1836,30 +1840,44 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_NOT_READY;
return;
}
if (UNLIKELY(role_ != Role::FOLLOWER && role_ != Role::LEARNER)) {
VLOG(3) << idStr_ << "Bad role " << roleStr(role_);
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_STOPPED;
return;
}
if (UNLIKELY(leader_ != HostAddr(req.get_leader_addr(), req.get_leader_port()) ||
term_ != req.get_term())) {
VLOG(2) << idStr_ << "Term out of date, current term " << term_ << ", received term "
<< req.get_term();
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
// Check leadership
nebula::cpp2::ErrorCode err = verifyLeader<cpp2::SendSnapshotRequest>(req);
// Set term_ again because it may be modified in verifyLeader
resp.current_term_ref() = term_;
if (err != nebula::cpp2::ErrorCode::SUCCEEDED) {
// Wrong leadership
VLOG(3) << idStr_ << "Will not follow the leader";
resp.error_code_ref() = err;
return;
}
if (status_ != Status::WAITING_SNAPSHOT) {
VLOG(2) << idStr_ << "Begin to receive the snapshot";
reset();
status_ = Status::WAITING_SNAPSHOT;
lastSnapshotCommitId_ = req.get_committed_log_id();
lastSnapshotCommitTerm_ = req.get_committed_log_term();
lastTotalCount_ = 0;
lastTotalSize_ = 0;
} else if (lastSnapshotCommitId_ != req.get_committed_log_id() ||
lastSnapshotCommitTerm_ != req.get_committed_log_term()) {
// Still waiting for snapshot from another peer, just return error. If the the peer doesn't
// send any logs during raft_snapshot_timeout, will convert to Status::RUNNING, so we can accept
// snapshot again
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
return;
}
lastSnapshotRecvDur_.reset();
// TODO(heng): Maybe we should save them into one sst firstly?
auto ret = commitSnapshot(
req.get_rows(), req.get_committed_log_id(), req.get_committed_log_term(), req.get_done());
if (std::get<0>(ret) != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << idStr_ << "Persist snapshot failed";
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED;
return;
}
stats::StatsManager::addValue(kCommitSnapshotLatencyUs, execTime_);
lastTotalCount_ += ret.first;
lastTotalSize_ += ret.second;
lastTotalCount_ += std::get<1>(ret);
lastTotalSize_ += std::get<2>(ret);
if (lastTotalCount_ != req.get_total_count() || lastTotalSize_ != req.get_total_size()) {
VLOG(2) << idStr_ << "Bad snapshot, total rows received " << lastTotalCount_
<< ", total rows sended " << req.get_total_count() << ", total size received "
Expand Down Expand Up @@ -2013,8 +2031,6 @@ void RaftPart::reset() {
cleanup();
lastLogId_ = committedLogId_ = 0;
lastLogTerm_ = committedLogTerm_ = 0;
lastTotalCount_ = 0;
lastTotalSize_ = 0;
}

nebula::cpp2::ErrorCode RaftPart::isCatchedUp(const HostAddr& peer) {
Expand Down
21 changes: 13 additions & 8 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,12 +522,13 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
* @param committedLogId Commit log id of snapshot
* @param committedLogTerm Commit log term of snapshot
* @param finished Whether spapshot is finished
* @return std::pair<int64_t, int64_t> Return count and size of in the data
* @return std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Return {ok, count, size} if
*/
virtual std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) = 0;
virtual std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) = 0;

/**
* @brief Clean up extra data about the partition, usually related to state machine
Expand Down Expand Up @@ -620,7 +621,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
bool needToCleanupSnapshot();

/**
* @brief Clean up the outdated snapshot
* @brief Convert to follower when snapshot has been outdated
*/
void cleanupSnapshot();

Expand Down Expand Up @@ -943,6 +944,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
TermID committedLogTerm_{0};
static constexpr LogID kNoCommitLogId{-1};
static constexpr TermID kNoCommitLogTerm{-1};
static constexpr int64_t kNoSnapshotCount{-1};
static constexpr int64_t kNoSnapshotSize{-1};

// To record how long ago when the last leader message received
time::Duration lastMsgRecvDur_;
Expand Down Expand Up @@ -973,8 +976,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
std::shared_ptr<SnapshotManager> snapshot_;

std::shared_ptr<thrift::ThriftClientManager<cpp2::RaftexServiceAsyncClient>> clientMan_;
// Used in snapshot, record the last total count and total size received from
// request
// Used in snapshot, record the commitLogId and commitLogTerm of the snapshot, as well as
// last total count and total size received from request
LogID lastSnapshotCommitId_ = 0;
TermID lastSnapshotCommitTerm_ = 0;
int64_t lastTotalCount_ = 0;
int64_t lastTotalSize_ = 0;
time::Duration lastSnapshotRecvDur_;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/SnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ folly::Future<raftex::cpp2::SendSnapshotResponse> SnapshotManager::send(
raftex::cpp2::SendSnapshotRequest req;
req.space_ref() = spaceId;
req.part_ref() = partId;
req.term_ref() = termId;
req.current_term_ref() = termId;
req.committed_log_id_ref() = committedLogId;
req.committed_log_term_ref() = committedLogTerm;
req.leader_addr_ref() = localhost.host;
Expand Down
11 changes: 6 additions & 5 deletions src/kvstore/raftex/test/TestShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,11 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> TestShard::commitLogs(
return {nebula::cpp2::ErrorCode::SUCCEEDED, lastId, lastTerm};
}

std::pair<int64_t, int64_t> TestShard::commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> TestShard::commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
folly::RWSpinLock::WriteHolder wh(&lock_);
int64_t count = 0;
int64_t size = 0;
Expand All @@ -233,7 +234,7 @@ std::pair<int64_t, int64_t> TestShard::commitSnapshot(const std::vector<std::str
LOG(INFO) << idStr_ << "Commit the snapshot committedLogId " << committedLogId << ", term "
<< committedLogTerm;
}
return std::make_pair(count, size);
return {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
}

nebula::cpp2::ErrorCode TestShard::cleanup() {
Expand Down
9 changes: 5 additions & 4 deletions src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,11 @@ class TestShard : public RaftPart {
return true;
}

std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;
std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

nebula::cpp2::ErrorCode cleanup() override;

Expand Down
17 changes: 9 additions & 8 deletions src/kvstore/test/NebulaListenerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ class DummyListener : public Listener {
return data_;
}

std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override {
std::tuple<cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override {
bool unl = raftLock_.try_lock();
auto result = Listener::commitSnapshot(data, committedLogId, committedLogTerm, finished);
if (unl) {
raftLock_.unlock();
}
committedSnapshot_.first += result.first;
committedSnapshot_.second += result.second;
committedSnapshot_.first += std::get<1>(result);
committedSnapshot_.second += std::get<2>(result);
snapshotBatchCount_++;
return result;
}
Expand Down Expand Up @@ -447,8 +447,9 @@ TEST_P(ListenerBasicTest, CommitSnapshotTest) {
}
auto dummy = dummies_[partId];
auto ret = dummy->commitSnapshot(rows, 100, 1, true);
CHECK_EQ(ret.first, 100);
CHECK_EQ(ret.second, size);
EXPECT_EQ(std::get<0>(ret), nebula::cpp2::ErrorCode::SUCCEEDED);
EXPECT_EQ(std::get<1>(ret), 100);
EXPECT_EQ(std::get<2>(ret), size);
}

LOG(INFO) << "Check listener's data";
Expand Down

0 comments on commit 372f6b4

Please sign in to comment.