Skip to content

Commit

Permalink
Merge branch 'master' into cherry
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu authored Apr 7, 2021
2 parents e65d8ad + 02bf8aa commit 4d3a052
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 10 deletions.
21 changes: 18 additions & 3 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ std::string NebulaKeyUtils::indexPrefix(PartitionID partId, IndexID indexId) {
return key;
}

// static
std::string NebulaKeyUtils::indexPrefix(PartitionID partId) {
PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyType::kIndex);
std::string key;
key.reserve(sizeof(PartitionID));
key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID));
return key;
}

// static
std::string NebulaKeyUtils::vertexPrefix(PartitionID partId, VertexID vId, TagID tagId) {
tagId &= kTagMaskSet;
Expand Down Expand Up @@ -195,12 +204,18 @@ std::string NebulaKeyUtils::prefix(PartitionID partId) {
}

// static
std::string NebulaKeyUtils::snapshotPrefix(PartitionID partId) {
std::vector<std::string> NebulaKeyUtils::snapshotPrefix(PartitionID partId) {
std::vector<std::string> result;
// snapshot of meta would be all key-value pairs
if (partId == 0) {
return "";
result.emplace_back("");
} else {
result.emplace_back(prefix(partId));
result.emplace_back(indexPrefix(partId));
// kSystem will be written when balance data
// kOperation will be blocked by jobmanager later
}
return prefix(partId);
return result;
}

// static
Expand Down
4 changes: 3 additions & 1 deletion src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class NebulaKeyUtils final {

static std::string indexPrefix(PartitionID partId, IndexID indexId);

static std::string indexPrefix(PartitionID partId);

/**
* Prefix for
* */
Expand Down Expand Up @@ -111,7 +113,7 @@ class NebulaKeyUtils final {

static std::string prefix(PartitionID partId);

static std::string snapshotPrefix(PartitionID partId);
static std::vector<std::string> snapshotPrefix(PartitionID partId);

static PartitionID getPart(const folly::StringPiece& rawKey) {
return readInt<PartitionID>(rawKey.data(), sizeof(PartitionID)) >> 8;
Expand Down
30 changes: 24 additions & 6 deletions src/kvstore/SnapshotManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,35 @@ void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
CHECK_NOTNULL(store_);
std::unique_ptr<KVIterator> iter;
auto prefix = NebulaKeyUtils::snapshotPrefix(partId);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
std::vector<std::string> data;
int64_t totalSize = 0;
int64_t totalCount = 0;

for (const auto& prefix : tables) {
if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize)) {
return;
}
}
cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
}

// Promise is set in callback. Access part of the data, and try to send to peers. If send failed,
// will return false.
bool SnapshotManagerImpl::accessTable(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize) {
std::unique_ptr<KVIterator> iter;
auto ret = store_->prefix(spaceId, partId, prefix, &iter);
if (ret != ResultCode::SUCCEEDED) {
LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
return false;
}
data.reserve(kReserveNum);
int32_t batchSize = 0;
Expand All @@ -37,8 +55,8 @@ void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId,
batchSize = 0;
} else {
LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId
<< "] callback invoked failed";
return;
<< "] send snapshot failed";
return false;
}
}
auto key = iter->key();
Expand All @@ -49,7 +67,7 @@ void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId,
totalCount++;
iter->next();
}
cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
return true;
}
} // namespace kvstore
} // namespace nebula
Expand Down
8 changes: 8 additions & 0 deletions src/kvstore/SnapshotManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ class SnapshotManagerImpl : public raftex::SnapshotManager {
raftex::SnapshotCallback cb) override;

private:
bool accessTable(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize);

KVStore* store_;
};

Expand Down

0 comments on commit 4d3a052

Please sign in to comment.