Skip to content

Commit

Permalink
use the same rocksdb snapshot when sending snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Jan 29, 2022
1 parent 5fae645 commit a187afc
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 55 deletions.
6 changes: 5 additions & 1 deletion src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,18 @@ class KVEngine {
* @return const void* snapshot pointer.
*/
virtual const void* GetSnapshot() = 0;

/**
* @brief Release snapshot from kv engine.
*
* @param snapshot
*/
virtual void ReleaseSnapshot(const void* snapshot) = 0;

// Read a single key.
//   key:      the key to look up.
//   value:    output parameter that receives the stored value.
//   snapshot: optional snapshot pointer (the opaque handle type returned by
//             GetSnapshot()). Defaults to nullptr, i.e. no snapshot is handed
//             to the engine.
//             NOTE(review): with nullptr the read presumably sees the latest
//             data — confirm against each engine implementation.
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;
virtual nebula::cpp2::ErrorCode get(const std::string& key,
std::string* value,
const void* snapshot = nullptr) = 0;

// Read a list of keys, if key[i] does not exist, the i-th value in return
// value would be Status::KeyNotFound
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class KVStore {
virtual const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) = 0;

/**
* @brief Release snapshot.
*
Expand All @@ -103,7 +104,8 @@ class KVStore {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) = 0;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = 0;

// Read multiple keys, if error occurs a cpp2::ErrorCode is returned,
// If key[i] does not exist, the i-th value in return value would be
Expand Down
64 changes: 50 additions & 14 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,69 @@ NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
/**
 * @brief Send all rows of one partition to `cb`, reading everything from a
 *        single rocksdb snapshot.
 *
 * The commit log id and term are read from the partition's system commit key
 * through the same snapshot that is used to scan the rows, so the id/term
 * handed to `cb` matches the data that is sent.
 *
 * `cb` is invoked with SnapshotStatus::FAILED (and invalid id/term) when the
 * partition cannot be found or the commit key cannot be read, and with
 * SnapshotStatus::DONE once every prefix table has been scanned. If
 * accessTable() returns false this function returns immediately without
 * calling `cb` again.
 * NOTE(review): accessTable() is expected to have reported the failure via
 * `cb` on every path that returns false — confirm.
 *
 * @param spaceId Graph space of the partition.
 * @param partId  Partition whose rows are sent.
 * @param cb      Callback receiving (commitLogId, commitLogTerm, rows,
 *                totalCount, totalSize, status).
 */
void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
                                                    PartitionID partId,
                                                    raftex::SnapshotCallback cb) {
  static constexpr LogID kInvalidLogId = -1;
  static constexpr TermID kInvalidLogTerm = -1;
  std::vector<std::string> data;
  int64_t totalSize = 0;
  int64_t totalCount = 0;

  CHECK_NOTNULL(store_);
  auto partRet = store_->part(spaceId, partId);
  if (!ok(partRet)) {
    // The placeholder used to be "{]", which folly::sformat rejects at runtime.
    LOG(INFO) << folly::sformat("Failed to find space {} part {}", spaceId, partId);
    cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
    return;
  }
  // Create a rocksdb snapshot; it is released on every exit path below.
  auto snapshot = store_->GetSnapshot(spaceId, partId);
  SCOPE_EXIT {
    if (snapshot != nullptr) {
      store_->ReleaseSnapshot(spaceId, partId, snapshot);
    }
  };
  auto part = nebula::value(partRet);
  // Get the commit log id and commit log term of the partition. Read them
  // through the snapshot so they cannot be newer than the rows scanned below.
  std::string val;
  auto commitRet = part->engine()->get(NebulaKeyUtils::systemCommitKey(partId), &val, snapshot);
  if (commitRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
    LOG(INFO) << folly::sformat(
        "Cannot fetch the commit log id and term of space {} part {}", spaceId, partId);
    cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
    return;
  }
  // The stored value is the raw bytes of a LogID followed by a TermID.
  CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID));
  LogID commitLogId;
  TermID commitLogTerm;
  memcpy(reinterpret_cast<void*>(&commitLogId), val.data(), sizeof(LogID));
  memcpy(reinterpret_cast<void*>(&commitLogTerm), val.data() + sizeof(LogID), sizeof(TermID));

  LOG(INFO) << folly::sformat(
      "Space {} Part {} start send snapshot of commitLogId {} commitLogTerm {}, rate limited to "
      "{}, batch size is {}",
      spaceId,
      partId,
      commitLogId,
      commitLogTerm,
      FLAGS_snapshot_part_rate_limit,
      FLAGS_snapshot_batch_size);

  auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
  auto tables = NebulaKeyUtils::snapshotPrefix(partId);
  for (const auto& prefix : tables) {
    if (!accessTable(spaceId,
                     partId,
                     snapshot,
                     prefix,
                     cb,
                     commitLogId,
                     commitLogTerm,
                     data,
                     totalCount,
                     totalSize,
                     rateLimiter.get())) {
      return;
    }
  }
  cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
}

// Promise is set in callback. Access part of the data, and try to send to
Expand All @@ -72,6 +101,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
Expand All @@ -81,7 +112,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return false;
}
data.reserve(kReserveNum);
Expand All @@ -91,7 +122,12 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
rateLimiter->consume(static_cast<double>(batchSize), // toConsume
static_cast<double>(FLAGS_snapshot_part_rate_limit), // rate
static_cast<double>(FLAGS_snapshot_part_rate_limit)); // burstSize
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
if (cb(commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
Expand Down
5 changes: 3 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,8 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
Expand All @@ -587,7 +588,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED
: nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->get(key, value);
return part->engine()->get(key, value, snapshot);
}

const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId,
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class NebulaStore : public KVStore, public Handler {
const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) override;

/**
* @brief Release snapshot from engine.
*
Expand All @@ -147,7 +148,8 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

std::pair<nebula::cpp2::ErrorCode, std::vector<Status>> multiGet(
GraphSpaceID spaceId,
Expand Down
9 changes: 7 additions & 2 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,13 @@ nebula::cpp2::ErrorCode RocksEngine::commitBatchWrite(std::unique_ptr<WriteBatch
return nebula::cpp2::ErrorCode::E_UNKNOWN;
}

nebula::cpp2::ErrorCode RocksEngine::get(const std::string& key, std::string* value) {
nebula::cpp2::ErrorCode RocksEngine::get(const std::string& key,
std::string* value,
const void* snapshot) {
rocksdb::ReadOptions options;
if (UNLIKELY(snapshot != nullptr)) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
rocksdb::Status status = db_->Get(options, rocksdb::Slice(key), value);
if (status.ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -240,7 +245,7 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
if (snapshot != nullptr) {
if (UNLIKELY(snapshot != nullptr)) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
options.prefix_same_as_start = true;
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ class RocksEngine : public KVEngine {
/*********************
* Data retrieval
********************/
nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) override;
nebula::cpp2::ErrorCode get(const std::string& key,
std::string* value,
const void* snapshot = nullptr) override;

std::vector<Status> multiGet(const std::vector<std::string>& keys,
std::vector<std::string>* values) override;
Expand Down
18 changes: 6 additions & 12 deletions src/kvstore/raftex/SnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,13 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
auto spaceId = part->spaceId_;
auto partId = part->partId_;
auto termId = part->term_;
// TODO(heng): maybe the committedLogId is less than the real one in the
// snapshot. It will not loss the data, but maybe some record will be
// committed twice.
auto commitLogIdAndTerm = part->lastCommittedLogId();
const auto& localhost = part->address();
std::vector<folly::Future<raftex::cpp2::SendSnapshotResponse>> results;
LOG(INFO) << part->idStr_ << "Begin to send the snapshot to the host " << dst
<< ", commitLogId = " << commitLogIdAndTerm.first
<< ", commitLogTerm = " << commitLogIdAndTerm.second;
accessAllRowsInSnapshot(
spaceId,
partId,
[&, this, p = std::move(p)](const std::vector<std::string>& data,
[&, this, p = std::move(p)](LogID commitLogId,
TermID commitLogTerm,
const std::vector<std::string>& data,
int64_t totalCount,
int64_t totalSize,
SnapshotStatus status) mutable -> bool {
Expand All @@ -60,8 +54,8 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
auto f = send(spaceId,
partId,
termId,
commitLogIdAndTerm.first,
commitLogIdAndTerm.second,
commitLogId,
commitLogTerm,
localhost,
data,
totalSize,
Expand All @@ -77,7 +71,7 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
if (status == SnapshotStatus::DONE) {
LOG(INFO) << part->idStr_ << "Finished, totalCount " << totalCount
<< ", totalSize " << totalSize;
p.setValue(commitLogIdAndTerm);
p.setValue(std::make_pair(commitLogId, commitLogTerm));
}
return true;
} else {
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/raftex/SnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ enum SnapshotStatus {
};

// Callback invoked by a snapshot manager for each batch of snapshot rows.
//   commitLogID / commitLogTerm: commit log id and term of the snapshot
//                                being sent.
//   rows:       the rows of the current batch.
//   totalCount / totalSize: counters maintained by the caller.
//                           NOTE(review): presumably running totals of rows
//                           and bytes produced so far — confirm.
//   status:     a SnapshotStatus value; callers in this change pass
//               IN_PROGRESS for intermediate batches, DONE for the final
//               call and FAILED when the snapshot cannot be produced.
// Return false if send snapshot failed, will not send the rest of it.
using SnapshotCallback = folly::Function<bool(const std::vector<std::string>& rows,
using SnapshotCallback = folly::Function<bool(LogID commitLogID,
TermID commitLogTerm,
const std::vector<std::string>& rows,
int64_t totalCount,
int64_t totalSize,
SnapshotStatus status)>;
Expand Down
15 changes: 13 additions & 2 deletions src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,19 @@ class NebulaSnapshotManager : public SnapshotManager {
int64_t totalCount = 0;
int64_t totalSize = 0;
std::vector<std::string> data;
auto commitLogIdAndTerm = part->lastCommittedLogId();
folly::RWSpinLock::ReadHolder rh(&part->lock_);
for (auto& row : part->data_) {
if (data.size() > 100) {
LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size()
<< ", total count sended " << totalCount << ", total size sended " << totalSize
<< ", finished false";
cb(data, totalCount, totalSize, SnapshotStatus::IN_PROGRESS);
cb(commitLogIdAndTerm.first,
commitLogIdAndTerm.second,
data,
totalCount,
totalSize,
SnapshotStatus::IN_PROGRESS);
data.clear();
}
auto encoded = encodeSnapshotRow(row.first, row.second);
Expand All @@ -177,7 +183,12 @@ class NebulaSnapshotManager : public SnapshotManager {
LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size()
<< ", total count sended " << totalCount << ", total size sended " << totalSize
<< ", finished true";
cb(data, totalCount, totalSize, SnapshotStatus::DONE);
cb(commitLogIdAndTerm.first,
commitLogIdAndTerm.second,
data,
totalCount,
totalSize,
SnapshotStatus::DONE);
}

RaftexService* service_;
Expand Down
22 changes: 4 additions & 18 deletions src/storage/test/IndexTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ class MockKVStore : public ::nebula::kvstore::KVStore {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) override {
bool canReadFromFollower = false,
const void* snapshot = nullptr) override {
UNUSED(canReadFromFollower);
UNUSED(partId);
UNUSED(snapshot);
CHECK_EQ(spaceId, spaceId_);
auto iter = kv_.lower_bound(key);
if (iter != kv_.end() && iter->first == key) {
Expand Down Expand Up @@ -148,23 +150,7 @@ class MockKVStore : public ::nebula::kvstore::KVStore {
(*iter) = std::move(mockIter);
return ::nebula::cpp2::ErrorCode::SUCCEEDED;
}
// virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
// PartitionID partId,
// std::string&& prefix,
// std::unique_ptr<KVIterator>* iter,
// bool canReadFromFollower = false) = delete override;
// virtual nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId,
// PartitionID partId,
// std::string&& start,
// std::string&& prefix,
// std::unique_ptr<KVIterator>* iter,
// bool canReadFromFollower = false) = delete;
// virtual nebula::cpp2::ErrorCode range(GraphSpaceID spaceId,
// PartitionID partId,
// std::string&& start,
// std::string&& end,
// std::unique_ptr<KVIterator>* iter,
// bool canReadFromFollower = false) = delete;

nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
Expand Down

0 comments on commit a187afc

Please sign in to comment.