From 02bf8aaf5f6505e0a9ac8ffee80694a12467f271 Mon Sep 17 00:00:00 2001 From: CBS <56461666+bright-starry-sky@users.noreply.github.com> Date: Fri, 2 Apr 2021 11:19:30 +0800 Subject: [PATCH] cherry pack #391 from 2.0 (#2463) --- src/common/utils/NebulaKeyUtils.cpp | 21 +++++++++++++++++--- src/common/utils/NebulaKeyUtils.h | 4 +++- src/kvstore/SnapshotManagerImpl.cpp | 30 +++++++++++++++++++++++------ src/kvstore/SnapshotManagerImpl.h | 8 ++++++++ 4 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/common/utils/NebulaKeyUtils.cpp b/src/common/utils/NebulaKeyUtils.cpp index d31b7f4ed7b..6f1b4c3035b 100644 --- a/src/common/utils/NebulaKeyUtils.cpp +++ b/src/common/utils/NebulaKeyUtils.cpp @@ -139,6 +139,15 @@ std::string NebulaKeyUtils::indexPrefix(PartitionID partId, IndexID indexId) { return key; } +// static +std::string NebulaKeyUtils::indexPrefix(PartitionID partId) { + PartitionID item = (partId << kPartitionOffset) | static_cast(NebulaKeyType::kIndex); + std::string key; + key.reserve(sizeof(PartitionID)); + key.append(reinterpret_cast(&item), sizeof(PartitionID)); + return key; +} + // static std::string NebulaKeyUtils::vertexPrefix(PartitionID partId, VertexID vId, TagID tagId) { tagId &= kTagMaskSet; @@ -195,12 +204,18 @@ std::string NebulaKeyUtils::prefix(PartitionID partId) { } // static -std::string NebulaKeyUtils::snapshotPrefix(PartitionID partId) { +std::vector NebulaKeyUtils::snapshotPrefix(PartitionID partId) { + std::vector result; // snapshot of meta would be all key-value pairs if (partId == 0) { - return ""; + result.emplace_back(""); + } else { + result.emplace_back(prefix(partId)); + result.emplace_back(indexPrefix(partId)); + // kSystem will be written when balance data + // kOperation will be blocked by jobmanager later } - return prefix(partId); + return result; } // static diff --git a/src/common/utils/NebulaKeyUtils.h b/src/common/utils/NebulaKeyUtils.h index 76fe44cfc25..47356cd87bf 100644 --- a/src/common/utils/NebulaKeyUtils.h +++ b/src/common/utils/NebulaKeyUtils.h @@ -84,6 +84,8 @@ class NebulaKeyUtils final { static std::string indexPrefix(PartitionID partId, IndexID indexId); + static std::string indexPrefix(PartitionID partId); + /** * Prefix for * */ @@ -111,7 +113,7 @@ class NebulaKeyUtils final { static std::string prefix(PartitionID partId); - static std::string snapshotPrefix(PartitionID partId); + static std::vector snapshotPrefix(PartitionID partId); static PartitionID getPart(const folly::StringPiece& rawKey) { return readInt(rawKey.data(), sizeof(PartitionID)) >> 8; diff --git a/src/kvstore/SnapshotManagerImpl.cpp b/src/kvstore/SnapshotManagerImpl.cpp index a7cc700377a..5eaaeb2046c 100644 --- a/src/kvstore/SnapshotManagerImpl.cpp +++ b/src/kvstore/SnapshotManagerImpl.cpp @@ -16,17 +16,35 @@ void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId, PartitionID partId, raftex::SnapshotCallback cb) { CHECK_NOTNULL(store_); - std::unique_ptr iter; - auto prefix = NebulaKeyUtils::snapshotPrefix(partId); + auto tables = NebulaKeyUtils::snapshotPrefix(partId); std::vector data; int64_t totalSize = 0; int64_t totalCount = 0; + + for (const auto& prefix : tables) { + if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize)) { + return; + } + } + cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE); +} + +// Promise is set in callback. Access part of the data, and try to send to peers. If send failed, +// will return false. +bool SnapshotManagerImpl::accessTable(GraphSpaceID spaceId, + PartitionID partId, + const std::string& prefix, + raftex::SnapshotCallback& cb, + std::vector& data, + int64_t& totalCount, + int64_t& totalSize) { + std::unique_ptr iter; auto ret = store_->prefix(spaceId, partId, prefix, &iter); if (ret != ResultCode::SUCCEEDED) { LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed" << ", error code:" << static_cast(ret); cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); - return; + return false; } data.reserve(kReserveNum); int32_t batchSize = 0; @@ -37,8 +55,8 @@ void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId, batchSize = 0; } else { LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId - << "] callback invoked failed"; - return; + << "] send snapshot failed"; + return false; } } auto key = iter->key(); @@ -49,7 +67,7 @@ void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId, totalCount++; iter->next(); } - cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE); + return true; } } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/SnapshotManagerImpl.h b/src/kvstore/SnapshotManagerImpl.h index 177c1c70baa..79592f1d185 100644 --- a/src/kvstore/SnapshotManagerImpl.h +++ b/src/kvstore/SnapshotManagerImpl.h @@ -23,6 +23,14 @@ class SnapshotManagerImpl : public raftex::SnapshotManager { raftex::SnapshotCallback cb) override; private: + bool accessTable(GraphSpaceID spaceId, + PartitionID partId, + const std::string& prefix, + raftex::SnapshotCallback& cb, + std::vector& data, + int64_t& totalCount, + int64_t& totalSize); + KVStore* store_; };