diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 71902c12b89..ea8911e79c9 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -117,7 +117,9 @@ class KVEngine { * @param value Pointer of value * @return nebula::cpp2::ErrorCode */ - virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0; + virtual nebula::cpp2::ErrorCode get(const std::string& key, + std::string* value, + const void* snapshot = nullptr) = 0; /** * @brief Read a list of keys diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index 87c3e6dcf39..cc076a0e014 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -108,6 +108,7 @@ class KVStore { virtual const void* GetSnapshot(GraphSpaceID spaceId, PartitionID partID, bool canReadFromFollower = false) = 0; + /** * @brief Release snapshot. * @@ -131,7 +132,8 @@ class KVStore { PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower = false) = 0; + bool canReadFromFollower = false, + const void* snapshot = nullptr) = 0; /** * @brief Read a list of keys diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp index 4bff03a0a40..330927bda60 100644 --- a/src/kvstore/NebulaSnapshotManager.cpp +++ b/src/kvstore/NebulaSnapshotManager.cpp @@ -29,32 +29,61 @@ NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) { void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, PartitionID partId, raftex::SnapshotCallback cb) { - auto rateLimiter = std::make_unique(); - CHECK_NOTNULL(store_); - auto tables = NebulaKeyUtils::snapshotPrefix(partId); + static constexpr LogID kInvalidLogId = -1; + static constexpr TermID kInvalidLogTerm = -1; std::vector data; int64_t totalSize = 0; int64_t totalCount = 0; - LOG(INFO) << folly::sformat( - "Space {} Part {} start send snapshot, rate limited to {}, batch size is {}", - spaceId, - partId, - FLAGS_snapshot_part_rate_limit, - 
FLAGS_snapshot_batch_size); - - const void* snapshot = store_->GetSnapshot(spaceId, partId); + CHECK_NOTNULL(store_); + auto partRet = store_->part(spaceId, partId); + if (!ok(partRet)) { + LOG(INFO) << folly::sformat("Failed to find space {} part {}", spaceId, partId); + cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + return; + } + // Create a rocksdb snapshot + auto snapshot = store_->GetSnapshot(spaceId, partId); SCOPE_EXIT { if (snapshot != nullptr) { store_->ReleaseSnapshot(spaceId, partId, snapshot); } }; + auto part = nebula::value(partRet); + // Get the commit log id and commit log term of specified partition + std::string val; + auto commitRet = part->engine()->get(NebulaKeyUtils::systemCommitKey(partId), &val, snapshot); + if (commitRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << folly::sformat( + "Cannot fetch the commit log id and term of space {} part {}", spaceId, partId); + cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + return; + } + CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID)); + LogID commitLogId; + TermID commitLogTerm; + memcpy(reinterpret_cast(&commitLogId), val.data(), sizeof(LogID)); + memcpy(reinterpret_cast(&commitLogTerm), val.data() + sizeof(LogID), sizeof(TermID)); + + LOG(INFO) << folly::sformat( + "Space {} Part {} start send snapshot of commitLogId {} commitLogTerm {}, rate limited to " + "{}, batch size is {}", + spaceId, + partId, + commitLogId, + commitLogTerm, + FLAGS_snapshot_part_rate_limit, + FLAGS_snapshot_batch_size); + auto rateLimiter = std::make_unique(); + auto tables = NebulaKeyUtils::snapshotPrefix(partId); for (const auto& prefix : tables) { if (!accessTable(spaceId, partId, snapshot, prefix, cb, + commitLogId, + commitLogTerm, data, totalCount, totalSize, @@ -62,7 +91,7 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, return; } } - cb(data, totalCount, totalSize, 
raftex::SnapshotStatus::DONE); + cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::DONE); } // Promise is set in callback. Access part of the data, and try to send to @@ -72,6 +101,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, + LogID commitLogId, + TermID commitLogTerm, std::vector& data, int64_t& totalCount, int64_t& totalSize, @@ -81,7 +112,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { VLOG(2) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed" << ", error code:" << static_cast(ret); - cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); return false; } data.reserve(kReserveNum); @@ -91,7 +122,12 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, rateLimiter->consume(static_cast(batchSize), // toConsume static_cast(FLAGS_snapshot_part_rate_limit), // rate static_cast(FLAGS_snapshot_part_rate_limit)); // burstSize - if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) { + if (cb(commitLogId, + commitLogTerm, + data, + totalCount, + totalSize, + raftex::SnapshotStatus::IN_PROGRESS)) { data.clear(); batchSize = 0; } else { diff --git a/src/kvstore/NebulaSnapshotManager.h b/src/kvstore/NebulaSnapshotManager.h index 1d1bced436c..73447ccbead 100644 --- a/src/kvstore/NebulaSnapshotManager.h +++ b/src/kvstore/NebulaSnapshotManager.h @@ -50,6 +50,8 @@ class NebulaSnapshotManager : public raftex::SnapshotManager { const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, + LogID commitLogId, + TermID commitLogTerm, std::vector& data, int64_t& totalCount, int64_t& totalSize, diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 731df41fcde..302d252fcdb 100644 --- 
a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -578,7 +578,8 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { auto ret = part(spaceId, partId); if (!ok(ret)) { return error(ret); @@ -588,7 +589,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED : nebula::cpp2::ErrorCode::E_LEADER_CHANGED; } - return part->engine()->get(key, value); + return part->engine()->get(key, value, snapshot); } const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId, diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 6d032d62e86..1ebc9d5781a 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -220,7 +220,8 @@ class NebulaStore : public KVStore, public Handler { PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower = false) override; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override; /** * @brief Read a list of keys diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index ddeaef02b65..d58396aefe7 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -308,26 +308,12 @@ std::tuple Part::commitLogs( } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - commitTransLeader(newLeader); - } else { - VLOG(1) << idStr_ << "Skip commit stale transfer leader " << newLeader - << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + commitTransLeader(newLeader); break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - commitRemovePeer(peer); - } else { - VLOG(1) << idStr_ << "Skip commit stale remove peer " << peer 
- << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + commitRemovePeer(peer); break; } default: { @@ -405,51 +391,26 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const switch (log[sizeof(int64_t)]) { case OP_ADD_LEARNER: { auto learner = decodeHost(OP_ADD_LEARNER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess add learner " << learner; - addLearner(learner); - } else { - VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess add learner " << learner; + addLearner(learner); break; } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess trans leader " << newLeader; - preProcessTransLeader(newLeader); - } else { - VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader - << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader; + preProcessTransLeader(newLeader); break; } case OP_ADD_PEER: { auto peer = decodeHost(OP_ADD_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess add peer " << peer; - addPeer(peer); - } else { - VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess add peer " << peer; + addPeer(peer); break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess remove peer " << peer; - preProcessRemovePeer(peer); - } else { - VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened 
at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess remove peer " << peer; + preProcessRemovePeer(peer); break; } default: { diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 43e92cca92f..49695483171 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -175,8 +175,13 @@ nebula::cpp2::ErrorCode RocksEngine::commitBatchWrite(std::unique_ptr(snapshot); + } rocksdb::Status status = db_->Get(options, rocksdb::Slice(key), value); if (status.ok()) { return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -240,7 +245,7 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref const void* snapshot, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; - if (snapshot != nullptr) { + if (UNLIKELY(snapshot != nullptr)) { options.snapshot = reinterpret_cast(snapshot); } options.prefix_same_as_start = true; diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 0fb9ecf2c11..62fd2283d8d 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -220,7 +220,9 @@ class RocksEngine : public KVEngine { * @param value Pointer of value * @return nebula::cpp2::ErrorCode */ - nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) override; + nebula::cpp2::ErrorCode get(const std::string& key, + std::string* value, + const void* snapshot = nullptr) override; /** * @brief Read a list of keys diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp index 0182d46a3ad..576f76e3cc9 100644 --- a/src/kvstore/raftex/SnapshotManager.cpp +++ b/src/kvstore/raftex/SnapshotManager.cpp @@ -34,19 +34,13 @@ folly::Future>> SnapshotManager::sendSnapshot( auto spaceId = part->spaceId_; auto partId = part->partId_; auto termId = part->term_; - // TODO(heng): maybe the committedLogId is less than the real one in the - // snapshot. 
It will not loss the data, but maybe some record will be - // committed twice. - auto commitLogIdAndTerm = part->lastCommittedLogId(); const auto& localhost = part->address(); - std::vector> results; - VLOG(1) << part->idStr_ << "Begin to send the snapshot to the host " << dst - << ", commitLogId = " << commitLogIdAndTerm.first - << ", commitLogTerm = " << commitLogIdAndTerm.second; accessAllRowsInSnapshot( spaceId, partId, - [&, this, p = std::move(p)](const std::vector& data, + [&, this, p = std::move(p)](LogID commitLogId, + TermID commitLogTerm, + const std::vector& data, int64_t totalCount, int64_t totalSize, SnapshotStatus status) mutable -> bool { @@ -60,8 +54,8 @@ folly::Future>> SnapshotManager::sendSnapshot( auto f = send(spaceId, partId, termId, - commitLogIdAndTerm.first, - commitLogIdAndTerm.second, + commitLogId, + commitLogTerm, localhost, data, totalSize, @@ -77,7 +71,7 @@ folly::Future>> SnapshotManager::sendSnapshot( if (status == SnapshotStatus::DONE) { VLOG(1) << part->idStr_ << "Finished, totalCount " << totalCount << ", totalSize " << totalSize; - p.setValue(commitLogIdAndTerm); + p.setValue(std::make_pair(commitLogId, commitLogTerm)); } return true; } else { diff --git a/src/kvstore/raftex/SnapshotManager.h b/src/kvstore/raftex/SnapshotManager.h index fc082708c46..ff15e1de07e 100644 --- a/src/kvstore/raftex/SnapshotManager.h +++ b/src/kvstore/raftex/SnapshotManager.h @@ -26,7 +26,9 @@ enum SnapshotStatus { }; // Return false if send snapshot failed, will not send the rest of it. 
-using SnapshotCallback = folly::Function& rows, +using SnapshotCallback = folly::Function& rows, int64_t totalCount, int64_t totalSize, SnapshotStatus status)>; diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index 518ccf67b55..868f44aed7e 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -160,13 +160,19 @@ class NebulaSnapshotManager : public SnapshotManager { int64_t totalCount = 0; int64_t totalSize = 0; std::vector data; + auto commitLogIdAndTerm = part->lastCommittedLogId(); folly::RWSpinLock::ReadHolder rh(&part->lock_); for (auto& row : part->data_) { if (data.size() > 100) { LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size() << ", total count sended " << totalCount << ", total size sended " << totalSize << ", finished false"; - cb(data, totalCount, totalSize, SnapshotStatus::IN_PROGRESS); + cb(commitLogIdAndTerm.first, + commitLogIdAndTerm.second, + data, + totalCount, + totalSize, + SnapshotStatus::IN_PROGRESS); data.clear(); } auto encoded = encodeSnapshotRow(row.first, row.second); @@ -177,7 +183,12 @@ class NebulaSnapshotManager : public SnapshotManager { LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size() << ", total count sended " << totalCount << ", total size sended " << totalSize << ", finished true"; - cb(data, totalCount, totalSize, SnapshotStatus::DONE); + cb(commitLogIdAndTerm.first, + commitLogIdAndTerm.second, + data, + totalCount, + totalSize, + SnapshotStatus::DONE); } RaftexService* service_; diff --git a/src/storage/test/IndexTestUtil.h b/src/storage/test/IndexTestUtil.h index 25c5114ba50..6bc651196c9 100644 --- a/src/storage/test/IndexTestUtil.h +++ b/src/storage/test/IndexTestUtil.h @@ -76,10 +76,14 @@ class MockKVStore : public ::nebula::kvstore::KVStore { void ReleaseSnapshot(GraphSpaceID, PartitionID, const void*) override {} // Read a single key nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, - PartitionID, 
+ PartitionID partId, const std::string& key, std::string* value, - bool) override { + bool canReadFromFollower = false, + const void* snapshot = nullptr) override { + UNUSED(canReadFromFollower); + UNUSED(partId); + UNUSED(snapshot); CHECK_EQ(spaceId, spaceId_); auto iter = kv_.lower_bound(key); if (iter != kv_.end() && iter->first == key) { @@ -129,6 +133,7 @@ class MockKVStore : public ::nebula::kvstore::KVStore { (*iter) = std::move(mockIter); return ::nebula::cpp2::ErrorCode::SUCCEEDED; } + nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix,