Skip to content

Commit

Permalink
address @wenhaocs's comments, rebased
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Mar 3, 2022
1 parent b87de93 commit 99e54ce
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 88 deletions.
4 changes: 3 additions & 1 deletion src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ class KVEngine {
* @param value Pointer of value
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;
virtual nebula::cpp2::ErrorCode get(const std::string& key,
std::string* value,
const void* snapshot = nullptr) = 0;

/**
* @brief Read a list of keys
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class KVStore {
virtual const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) = 0;

/**
* @brief Release snapshot.
*
Expand All @@ -131,7 +132,8 @@ class KVStore {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) = 0;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = 0;

/**
* @brief Read a list of keys
Expand Down
64 changes: 50 additions & 14 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,69 @@ NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
CHECK_NOTNULL(store_);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
static constexpr LogID kInvalidLogId = -1;
static constexpr TermID kInvalidLogTerm = -1;
std::vector<std::string> data;
int64_t totalSize = 0;
int64_t totalCount = 0;
LOG(INFO) << folly::sformat(
"Space {} Part {} start send snapshot, rate limited to {}, batch size is {}",
spaceId,
partId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

const void* snapshot = store_->GetSnapshot(spaceId, partId);
CHECK_NOTNULL(store_);
auto partRet = store_->part(spaceId, partId);
if (!ok(partRet)) {
LOG(INFO) << folly::sformat("Failed to find space {} part {]", spaceId, partId);
cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
}
// Create a rocksdb snapshot
auto snapshot = store_->GetSnapshot(spaceId, partId);
SCOPE_EXIT {
if (snapshot != nullptr) {
store_->ReleaseSnapshot(spaceId, partId, snapshot);
}
};
auto part = nebula::value(partRet);
// Get the commit log id and commit log term of specified partition
std::string val;
auto commitRet = part->engine()->get(NebulaKeyUtils::systemCommitKey(partId), &val);
if (commitRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << folly::sformat(
"Cannot fetch the commit log id and term of space {} part {}", spaceId, partId);
cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
}
CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID));
LogID commitLogId;
TermID commitLogTerm;
memcpy(reinterpret_cast<void*>(&commitLogId), val.data(), sizeof(LogID));
memcpy(reinterpret_cast<void*>(&commitLogTerm), val.data() + sizeof(LogID), sizeof(TermID));

LOG(INFO) << folly::sformat(
"Space {} Part {} start send snapshot of commitLogId {} commitLogTerm {}, rate limited to "
"{}, batch size is {}",
spaceId,
partId,
commitLogId,
commitLogTerm,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
for (const auto& prefix : tables) {
if (!accessTable(spaceId,
partId,
snapshot,
prefix,
cb,
commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
rateLimiter.get())) {
return;
}
}
cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
}

// Promise is set in callback. Access part of the data, and try to send to
Expand All @@ -72,6 +101,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
Expand All @@ -81,7 +112,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return false;
}
data.reserve(kReserveNum);
Expand All @@ -91,7 +122,12 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
rateLimiter->consume(static_cast<double>(batchSize), // toConsume
static_cast<double>(FLAGS_snapshot_part_rate_limit), // rate
static_cast<double>(FLAGS_snapshot_part_rate_limit)); // burstSize
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
if (cb(commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
Expand Down
5 changes: 3 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,8 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
Expand All @@ -588,7 +589,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED
: nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->get(key, value);
return part->engine()->get(key, value, snapshot);
}

const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId,
Expand Down
3 changes: 2 additions & 1 deletion src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

/**
* @brief Read a list of keys
Expand Down
59 changes: 10 additions & 49 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,26 +308,12 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
commitTransLeader(newLeader);
} else {
VLOG(1) << idStr_ << "Skip commit stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
commitTransLeader(newLeader);
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
commitRemovePeer(peer);
} else {
VLOG(1) << idStr_ << "Skip commit stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
commitRemovePeer(peer);
break;
}
default: {
Expand Down Expand Up @@ -405,51 +391,26 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
switch (log[sizeof(int64_t)]) {
case OP_ADD_LEARNER: {
auto learner = decodeHost(OP_ADD_LEARNER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
} else {
VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
break;
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
} else {
VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
break;
}
case OP_ADD_PEER: {
auto peer = decodeHost(OP_ADD_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
} else {
VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
} else {
VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
break;
}
default: {
Expand Down
9 changes: 7 additions & 2 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,13 @@ nebula::cpp2::ErrorCode RocksEngine::commitBatchWrite(std::unique_ptr<WriteBatch
return nebula::cpp2::ErrorCode::E_UNKNOWN;
}

nebula::cpp2::ErrorCode RocksEngine::get(const std::string& key, std::string* value) {
nebula::cpp2::ErrorCode RocksEngine::get(const std::string& key,
std::string* value,
const void* snapshot) {
rocksdb::ReadOptions options;
if (UNLIKELY(snapshot != nullptr)) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
rocksdb::Status status = db_->Get(options, rocksdb::Slice(key), value);
if (status.ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -240,7 +245,7 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
if (snapshot != nullptr) {
if (UNLIKELY(snapshot != nullptr)) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
options.prefix_same_as_start = true;
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ class RocksEngine : public KVEngine {
* @param value Pointer of value
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) override;
nebula::cpp2::ErrorCode get(const std::string& key,
std::string* value,
const void* snapshot = nullptr) override;

/**
* @brief Read a list of keys
Expand Down
18 changes: 6 additions & 12 deletions src/kvstore/raftex/SnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,13 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
auto spaceId = part->spaceId_;
auto partId = part->partId_;
auto termId = part->term_;
// TODO(heng): maybe the committedLogId is less than the real one in the
// snapshot. It will not loss the data, but maybe some record will be
// committed twice.
auto commitLogIdAndTerm = part->lastCommittedLogId();
const auto& localhost = part->address();
std::vector<folly::Future<raftex::cpp2::SendSnapshotResponse>> results;
VLOG(1) << part->idStr_ << "Begin to send the snapshot to the host " << dst
<< ", commitLogId = " << commitLogIdAndTerm.first
<< ", commitLogTerm = " << commitLogIdAndTerm.second;
accessAllRowsInSnapshot(
spaceId,
partId,
[&, this, p = std::move(p)](const std::vector<std::string>& data,
[&, this, p = std::move(p)](LogID commitLogId,
TermID commitLogTerm,
const std::vector<std::string>& data,
int64_t totalCount,
int64_t totalSize,
SnapshotStatus status) mutable -> bool {
Expand All @@ -60,8 +54,8 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
auto f = send(spaceId,
partId,
termId,
commitLogIdAndTerm.first,
commitLogIdAndTerm.second,
commitLogId,
commitLogTerm,
localhost,
data,
totalSize,
Expand All @@ -77,7 +71,7 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
if (status == SnapshotStatus::DONE) {
VLOG(1) << part->idStr_ << "Finished, totalCount " << totalCount << ", totalSize "
<< totalSize;
p.setValue(commitLogIdAndTerm);
p.setValue(std::make_pair(commitLogId, commitLogTerm));
}
return true;
} else {
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/raftex/SnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ enum SnapshotStatus {
};

// Return false if send snapshot failed, will not send the rest of it.
using SnapshotCallback = folly::Function<bool(const std::vector<std::string>& rows,
using SnapshotCallback = folly::Function<bool(LogID commitLogID,
TermID commitLogTerm,
const std::vector<std::string>& rows,
int64_t totalCount,
int64_t totalSize,
SnapshotStatus status)>;
Expand Down
15 changes: 13 additions & 2 deletions src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,19 @@ class NebulaSnapshotManager : public SnapshotManager {
int64_t totalCount = 0;
int64_t totalSize = 0;
std::vector<std::string> data;
auto commitLogIdAndTerm = part->lastCommittedLogId();
folly::RWSpinLock::ReadHolder rh(&part->lock_);
for (auto& row : part->data_) {
if (data.size() > 100) {
LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size()
<< ", total count sended " << totalCount << ", total size sended " << totalSize
<< ", finished false";
cb(data, totalCount, totalSize, SnapshotStatus::IN_PROGRESS);
cb(commitLogIdAndTerm.first,
commitLogIdAndTerm.second,
data,
totalCount,
totalSize,
SnapshotStatus::IN_PROGRESS);
data.clear();
}
auto encoded = encodeSnapshotRow(row.first, row.second);
Expand All @@ -177,7 +183,12 @@ class NebulaSnapshotManager : public SnapshotManager {
LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size()
<< ", total count sended " << totalCount << ", total size sended " << totalSize
<< ", finished true";
cb(data, totalCount, totalSize, SnapshotStatus::DONE);
cb(commitLogIdAndTerm.first,
commitLogIdAndTerm.second,
data,
totalCount,
totalSize,
SnapshotStatus::DONE);
}

RaftexService* service_;
Expand Down
Loading

0 comments on commit 99e54ce

Please sign in to comment.