Skip to content

Commit

Permalink
[raft] clean part rocksdb data when raftpart::reset (vesoft-inc#16)
Browse files Browse the repository at this point in the history
* clean rocksdb part data when raftpart::reset

* use removeRange

* fix clang format AllowShortBlocksOnASingleLine:never

* address cangfengzhs's comment

* address critical27's comment

Co-authored-by: panda-sheep <[email protected]>
  • Loading branch information
nebula-bots and panda-sheep authored Sep 2, 2021
1 parent 17e9f19 commit 2cacdae
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 15 deletions.
24 changes: 19 additions & 5 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,32 @@ bool NebulaKeyUtils::isValidVidLen(size_t vIdLen, const VertexID& srcVId, const
}

// static
std::string NebulaKeyUtils::vertexKey(size_t vIdLen,
PartitionID partId,
const VertexID& vId,
TagID tagId) {
std::string NebulaKeyUtils::firstKey(const std::string& prefix, size_t count) {
  // Lower bound for keys sharing `prefix`: the prefix followed by `count` bytes of 0x00.
  std::string result;
  result.reserve(prefix.size() + count);
  result += prefix;
  result.append(count, '\0');
  return result;
}

// static
std::string NebulaKeyUtils::lastKey(const std::string& prefix, size_t count) {
  // Upper bound for keys sharing `prefix`: the prefix followed by `count` bytes of 0xFF.
  std::string result;
  result.reserve(prefix.size() + count);
  result += prefix;
  result.append(count, '\377');
  return result;
}

// static
// Builds the storage key of a vertex. The key is laid out as:
//   [4-byte item: partId shifted by kPartitionOffset, OR-ed with the kVertex key type]
//   [vId bytes][padding up to vIdLen bytes][tagId bytes]
// `pad` is the byte used to fill the vId up to vIdLen.
//
// NOTE(review): this listing comes from a diff view and shows two padding appends
// back to back (one with '\0', one with `pad`). Presumably the '\0' line is the removed
// pre-change line and only the `pad` line is live -- confirm against the real file.
std::string NebulaKeyUtils::vertexKey(
size_t vIdLen, PartitionID partId, const VertexID& vId, TagID tagId, char pad) {
// The vId must fit into the fixed vIdLen-byte slot; otherwise the padding length below underflows.
CHECK_GE(vIdLen, vId.size());
int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyType::kVertex);

std::string key;
key.reserve(kVertexLen + vIdLen);
// The raw in-memory bytes of `item` and `tagId` are copied into the key as-is.
key.append(reinterpret_cast<const char*>(&item), sizeof(int32_t))
.append(vId.data(), vId.size())
.append(vIdLen - vId.size(), '\0')
.append(vIdLen - vId.size(), pad)
.append(reinterpret_cast<const char*>(&tagId), sizeof(TagID));
return key;
}
Expand Down
15 changes: 14 additions & 1 deletion src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,23 @@ class NebulaKeyUtils final {
*/
static bool isValidVidLen(size_t vIdLen, const VertexID& srcvId, const VertexID& dstvId = "");

/**
* Generate the first key with prefix.
* count is the number of '\0' bytes appended after the prefix.
* */
static std::string firstKey(const std::string& prefix, size_t count);

/**
* Generate the last key with prefix.
* count is the number of '\377' bytes appended after the prefix.
* */
static std::string lastKey(const std::string& prefix, size_t count);

/**
* Generate vertex key for kv store
* */
static std::string vertexKey(size_t vIdLen, PartitionID partId, const VertexID& vId, TagID tagId);
static std::string vertexKey(
size_t vIdLen, PartitionID partId, const VertexID& vId, TagID tagId, char pad = '\0');

static std::string edgeKey(size_t vIdLen,
PartitionID partId,
Expand Down
3 changes: 2 additions & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
workers_,
snapshot_,
clientMan_,
diskMan_);
diskMan_,
getSpaceVidLen(spaceId));
std::vector<HostAddr> peers;
if (defaultPeers.empty()) {
// pull the information from meta
Expand Down
54 changes: 47 additions & 7 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

#include "kvstore/Part.h"

#include "common/utils/IndexKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "common/utils/OperationKeyUtils.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/RocksEngineConfig.h"

Expand All @@ -27,7 +29,8 @@ Part::Part(GraphSpaceID spaceId,
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan,
std::shared_ptr<RaftClient> clientMan,
std::shared_ptr<DiskManager> diskMan)
std::shared_ptr<DiskManager> diskMan,
int32_t vIdLen)
: RaftPart(FLAGS_cluster_id,
spaceId,
partId,
Expand All @@ -42,7 +45,8 @@ Part::Part(GraphSpaceID spaceId,
spaceId_(spaceId),
partId_(partId),
walPath_(walPath),
engine_(engine) {}
engine_(engine),
vIdLen_(vIdLen) {}

std::pair<LogID, TermID> Part::lastCommittedLogId() {
std::string val;
Expand Down Expand Up @@ -420,11 +424,47 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
}

// Removes the data stored for this part from the engine: the vertex, edge, index and
// operation key ranges, followed by the system commit key. Each step logs an error on
// failure; a failed range removal aborts the cleanup immediately.
//
// NOTE(review): this listing comes from a diff view. The first statements below (the
// "Clean rocksdb commit key" log, the `res` removal and its unclosed `if`) look like the
// removed pre-change body interleaved with the new one -- confirm against the real file.
void Part::cleanup() {
LOG(INFO) << idStr_ << "Clean rocksdb commit key";
auto res = engine_->remove(NebulaKeyUtils::systemCommitKey(partId_));
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << idStr_ << "Remove the committedLogId failed, error "
<< static_cast<int32_t>(res);
LOG(INFO) << idStr_ << "Clean rocksdb part data";
// Remove the vertex, edge, index, systemCommitKey, operation data under the part
// Each range is [prefix + N bytes of '\0', prefix + N bytes of '\377'], where N is
// vIdLen_ for vertex/edge keys, sizeof(IndexID) for index keys and sizeof(int64_t)
// for operation keys.
// NOTE(review): assuming bytewise key ordering, a key whose N bytes after the prefix
// are all 0xFF and which has further bytes after them sorts after the upper bound.
// Confirm such keys cannot occur, or widen the bound.
const auto& vertexPre = NebulaKeyUtils::vertexPrefix(partId_);
auto ret = engine_->removeRange(NebulaKeyUtils::firstKey(vertexPre, vIdLen_),
NebulaKeyUtils::lastKey(vertexPre, vIdLen_));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Remove the part vertex data failed, error "
<< static_cast<int32_t>(ret);
return;
}

const auto& edgePre = NebulaKeyUtils::edgePrefix(partId_);
ret = engine_->removeRange(NebulaKeyUtils::firstKey(edgePre, vIdLen_),
NebulaKeyUtils::lastKey(edgePre, vIdLen_));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
// NOTE(review): unlike the sibling messages, this one has no space before the error code.
LOG(ERROR) << idStr_ << "Remove the part edge data failed, error" << static_cast<int32_t>(ret);
return;
}

const auto& indexPre = IndexKeyUtils::indexPrefix(partId_);
ret = engine_->removeRange(NebulaKeyUtils::firstKey(indexPre, sizeof(IndexID)),
NebulaKeyUtils::lastKey(indexPre, sizeof(IndexID)));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Remove the part index data failed, error "
<< static_cast<int32_t>(ret);
return;
}

const auto& operationPre = OperationKeyUtils::operationPrefix(partId_);
ret = engine_->removeRange(NebulaKeyUtils::firstKey(operationPre, sizeof(int64_t)),
NebulaKeyUtils::lastKey(operationPre, sizeof(int64_t)));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Remove the part operation data failed, error "
<< static_cast<int32_t>(ret);
return;
}

// The commit key is a single key, so it is removed with a point delete.
ret = engine_->remove(NebulaKeyUtils::systemCommitKey(partId_));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Remove the part system commit data failed, error "
<< static_cast<int32_t>(ret);
}
return;
}
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class Part : public raftex::RaftPart {
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan,
std::shared_ptr<RaftClient> clientMan,
std::shared_ptr<DiskManager> diskMan);
std::shared_ptr<DiskManager> diskMan,
int32_t vIdLen);

virtual ~Part() { LOG(INFO) << idStr_ << "~Part()"; }

Expand Down Expand Up @@ -114,6 +115,7 @@ class Part : public raftex::RaftPart {

private:
KVEngine* engine_ = nullptr;
int32_t vIdLen_;
};

} // namespace kvstore
Expand Down
203 changes: 203 additions & 0 deletions src/kvstore/test/PartTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,62 @@

#include "common/base/Base.h"
#include "common/fs/TempDir.h"
#include "common/utils/IndexKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "common/utils/OperationKeyUtils.h"
#include "kvstore/Part.h"
#include "kvstore/RocksEngine.h"

namespace nebula {
namespace kvstore {

// Vertex id length (in bytes) used by every engine and key built in these tests.
constexpr int32_t kDefaultVIdLen = 8;

// Scans every entry under the vertex prefix of `partId` and asserts that exactly
// `expectNum` entries are found. When `checkVal` is true, the n-th entry visited
// (counting from 1) must carry the value "val<n>".
void checkVertexData(RocksEngine* engine,
                     PartitionID partId,
                     int expectNum,
                     bool checkVal = false) {
  std::string prefix = NebulaKeyUtils::vertexPrefix(partId);
  std::unique_ptr<KVIterator> it;
  auto status = engine->prefix(prefix, &it);
  ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, status);

  int32_t seen = 0;
  for (; it->valid(); it->next()) {
    ++seen;
    if (!checkVal) {
      continue;
    }
    ASSERT_EQ(it->val().str(), folly::stringPrintf("val%d", seen));
  }
  ASSERT_EQ(seen, expectNum);
}

// Scans every entry under the edge prefix of `partId` and asserts that exactly
// `expectNum` entries are found.
void checkEdgeData(RocksEngine* engine, PartitionID partId, int expectNum) {
  std::string prefix = NebulaKeyUtils::edgePrefix(partId);
  std::unique_ptr<KVIterator> it;
  auto status = engine->prefix(prefix, &it);
  ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, status);

  int32_t seen = 0;
  for (; it->valid(); it->next()) {
    ++seen;
  }
  ASSERT_EQ(seen, expectNum);
}

// Scans every entry under the index prefix of `partId` and asserts that exactly
// `expectNum` entries are found.
void checkIndexData(RocksEngine* engine, PartitionID partId, int expectNum) {
  std::string prefix = IndexKeyUtils::indexPrefix(partId);
  std::unique_ptr<KVIterator> it;
  auto status = engine->prefix(prefix, &it);
  ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, status);

  int32_t seen = 0;
  for (; it->valid(); it->next()) {
    ++seen;
  }
  ASSERT_EQ(seen, expectNum);
}

TEST(PartTest, RocksTest) {
fs::TempDir dataPath("/tmp/rocksdb_test.XXXXXX");
rocksdb::Options options;
Expand Down Expand Up @@ -46,6 +97,158 @@ TEST(PartTest, RocksTest) {
delete db;
}

// Writes four vertex keys into parts 1 and 2 and checks that a prefix scan of each part
// returns them in the order key1..key4 (the values "val1".."val4" encode that order).
TEST(PartTest, KeyOrderTest) {
  fs::TempDir dataPath("/tmp/KeyOrderTest.XXXXXX");
  auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, dataPath.path());

  // build vertex data in part 1, 2
  for (PartitionID partId = 1; partId < 3; ++partId) {
    std::vector<KV> batch;

    // Empty vid padded with '\0', tag 0.
    batch.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, "", 0),
                       folly::stringPrintf("val%d", 1));

    // Empty vid padded with '\0', largest tag.
    batch.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, "", INT_MAX),
                       folly::stringPrintf("val%d", 2));

    // "ffffff" padded with '\377', largest tag.
    batch.emplace_back(
        NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, "ffffff", INT_MAX, '\377'),
        folly::stringPrintf("val%d", 3));

    // Empty vid padded with '\377', largest tag.
    batch.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, "", INT_MAX, '\377'),
                       folly::stringPrintf("val%d", 4));

    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(batch));
  }

  // Every part must hold the four entries, visited in the order they were numbered.
  for (PartitionID partId = 1; partId < 3; ++partId) {
    checkVertexData(engine.get(), partId, 4, true);
  }
}

// Populates parts 1 and 2 with vertex, edge, index and system keys, applies to part 1
// the same removals that Part::cleanup() performs, and then verifies that
//   - part 1 has lost its vertex/edge/index data and its system commit key, while its
//     system part key is still present, and
//   - part 2 is completely untouched.
TEST(PartTest, PartCleanTest) {
  fs::TempDir dataPath("/tmp/PartCleanTest.XXXXXX");
  auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, dataPath.path());

  // build vertex, edge, index and system data in part 1, 2
  for (PartitionID partId = 1; partId < 3; ++partId) {
    std::vector<KV> data;

    // 10 vertices for each of two tags -> 20 vertex keys per part.
    TagID tagId = 1;
    for (int i = 0; i < 10; i++) {
      auto key = NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, std::to_string(i), tagId);
      data.emplace_back(key, folly::stringPrintf("val%d", i));
    }
    tagId = 2;
    for (int i = 0; i < 10; i++) {
      auto key = NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, std::to_string(i), tagId);
      data.emplace_back(key, folly::stringPrintf("val%d", i));
    }

    // 10 edge keys per part.
    EdgeType edgetype = 3;
    for (int i = 0; i < 10; i++) {
      auto key = NebulaKeyUtils::edgeKey(
          kDefaultVIdLen, partId, std::to_string(i), edgetype, 0, std::to_string(i));
      data.emplace_back(key, folly::stringPrintf("val%d", i));
    }

    // 10 index keys per part.
    IndexID indexId = 5;
    for (int i = 0; i < 10; i++) {
      auto key =
          IndexKeyUtils::vertexIndexKey(kDefaultVIdLen, partId, indexId, std::to_string(i), "123");
      data.emplace_back(key, folly::stringPrintf("val%d", i));
    }

    data.emplace_back(NebulaKeyUtils::systemCommitKey(partId), "123");
    data.emplace_back(NebulaKeyUtils::systemPartKey(partId), "");
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(data));
  }

  // Before cleaning, BOTH parts must contain everything that was written. (The loop used
  // to stop after part 1, leaving the initial state of part 2 unverified.)
  for (PartitionID partId = 1; partId < 3; ++partId) {
    checkVertexData(engine.get(), partId, 20);
    checkEdgeData(engine.get(), partId, 10);
    checkIndexData(engine.get(), partId, 10);

    std::string val1;
    auto code1 = engine->get(NebulaKeyUtils::systemCommitKey(partId), &val1);
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code1);
    ASSERT_EQ("123", val1);

    std::string val2;
    auto code2 = engine->get(NebulaKeyUtils::systemPartKey(partId), &val2);
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code2);
  }

  {
    // remove range part::clean data
    PartitionID partId = 1;

    const auto& vertexPre = NebulaKeyUtils::vertexPrefix(partId);
    auto ret = engine->removeRange(NebulaKeyUtils::firstKey(vertexPre, kDefaultVIdLen),
                                   NebulaKeyUtils::lastKey(vertexPre, kDefaultVIdLen));
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret);

    const auto& edgePre = NebulaKeyUtils::edgePrefix(partId);
    ret = engine->removeRange(NebulaKeyUtils::firstKey(edgePre, kDefaultVIdLen),
                              NebulaKeyUtils::lastKey(edgePre, kDefaultVIdLen));
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret);

    const auto& indexPre = IndexKeyUtils::indexPrefix(partId);
    ret = engine->removeRange(NebulaKeyUtils::firstKey(indexPre, sizeof(IndexID)),
                              NebulaKeyUtils::lastKey(indexPre, sizeof(IndexID)));
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret);

    const auto& operationPre = OperationKeyUtils::operationPrefix(partId);
    ret = engine->removeRange(NebulaKeyUtils::firstKey(operationPre, sizeof(int64_t)),
                              NebulaKeyUtils::lastKey(operationPre, sizeof(int64_t)));
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret);

    ret = engine->remove(NebulaKeyUtils::systemCommitKey(partId));
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret);
  }

  {
    // check data again: part 1 is cleaned, only its system part key survives
    PartitionID partId = 1;
    checkVertexData(engine.get(), partId, 0);
    checkEdgeData(engine.get(), partId, 0);
    checkIndexData(engine.get(), partId, 0);

    std::string val1;
    auto code1 = engine->get(NebulaKeyUtils::systemCommitKey(partId), &val1);
    ASSERT_NE(nebula::cpp2::ErrorCode::SUCCEEDED, code1);

    std::string val2;
    auto code2 = engine->get(NebulaKeyUtils::systemPartKey(partId), &val2);
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code2);
  }

  {
    // part 2 must not be affected by cleaning part 1
    PartitionID partId = 2;
    checkVertexData(engine.get(), partId, 20);
    checkEdgeData(engine.get(), partId, 10);
    checkIndexData(engine.get(), partId, 10);

    std::string val1;
    auto code1 = engine->get(NebulaKeyUtils::systemCommitKey(partId), &val1);
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code1);

    std::string val2;
    auto code2 = engine->get(NebulaKeyUtils::systemPartKey(partId), &val2);
    ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code2);
  }
}

} // namespace kvstore
} // namespace nebula

Expand Down

0 comments on commit 2cacdae

Please sign in to comment.