Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix some rocksdb api will fail in PlainTable #4254

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,16 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
CHECK(status.ok()) << status.ToString();
}
db_.reset(db);
extractorLen_ = sizeof(PartitionID) + vIdLen;
if (options.table_factory->Name() == rocksdb::TableFactory::kBlockBasedTableName()) {
extractorLen_ = sizeof(PartitionID) + vIdLen;
} else if (options.table_factory->Name() == rocksdb::TableFactory::kPlainTableName()) {
// PlainTable only support prefix-based seek, which means if the prefix is not inserted into
// rocksdb, we can't read them from "prefix" api anymore. For simplarity, we just set the length
// of prefix extractor to the minimum length we used in "prefix" api, which is 4 when we seek by
// tagPrefix(partId) or edgePrefix(partId).
isPlainTable_ = true;
extractorLen_ = sizeof(PartitionID);
}
partsNum_ = allParts().size();
LOG(INFO) << "open rocksdb on " << path;

Expand Down Expand Up @@ -175,7 +184,11 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,
const std::string& end,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
if (!isPlainTable_) {
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
} else {
options.prefix_same_as_start = true;
}
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand Down Expand Up @@ -232,8 +245,11 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start,
const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
// prefix_same_as_start is false by default
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
if (!isPlainTable_) {
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
} else {
options.prefix_same_as_start = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So prefix_same_as_start & extractorLen_ together decide the behavior? And only in plaintable?

Copy link
Contributor Author

@critical27 critical27 May 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PlainTable must use prefix-based Seek, so we must set prefix_same_as_start = true;
And for BlockBasedTable, if we have prefix extractor, we just use total_order_seek for simplicity, otherwise we need to judge whether the parameter is longer than extractor length (the way we use in prefix function)

}
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ class RocksEngine : public KVEngine {
std::unique_ptr<rocksdb::BackupEngine> backupDb_{nullptr};
int32_t partsNum_ = -1;
size_t extractorLen_;
bool isPlainTable_{false};
};

} // namespace kvstore
Expand Down
7 changes: 6 additions & 1 deletion src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
baseOpts.rate_limiter = rate_limiter;
}

size_t prefixLength = sizeof(PartitionID) + vidLen;
if (FLAGS_rocksdb_table_format == "BlockBasedTable") {
// BlockBasedTableOptions
std::unordered_map<std::string, std::string> bbtOptsMap;
Expand Down Expand Up @@ -330,6 +329,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel;
}
if (FLAGS_enable_rocksdb_prefix_filtering) {
size_t prefixLength = sizeof(PartitionID) + vidLen;
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength));
}
bbtOpts.whole_key_filtering = FLAGS_enable_rocksdb_whole_key_filtering;
Expand All @@ -346,6 +346,11 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
if (!FLAGS_enable_rocksdb_prefix_filtering) {
return rocksdb::Status::InvalidArgument("PlainTable should use prefix bloom filter");
}
// PlainTable only support prefix-based seek, which means if the prefix is not inserted into
// rocksdb, we can't read them from "prefix" api anymore. For simplarity, we just set the length
// of prefix extractor to the minimum length we used in "prefix" api, which is 4 when we seek by
// tagPrefix(partId) or edgePrefix(partId).
size_t prefixLength = sizeof(PartitionID);
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength));
baseOpts.table_factory.reset(rocksdb::NewPlainTableFactory());
baseOpts.create_if_missing = true;
Expand Down
102 changes: 74 additions & 28 deletions src/kvstore/test/RocksEngineTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,30 @@ namespace kvstore {

const int32_t kDefaultVIdLen = 8;

class RocksEngineTest : public ::testing::TestWithParam<std::tuple<bool, bool, std::string>> {
class RocksEngineTest : public ::testing::TestWithParam<std::tuple<bool, bool, std::string, bool>> {
public:
void SetUp() override {
auto param = GetParam();
FLAGS_enable_rocksdb_prefix_filtering = std::get<0>(param);
FLAGS_enable_rocksdb_whole_key_filtering = std::get<1>(param);
FLAGS_rocksdb_table_format = std::get<2>(param);
flush_ = std::get<3>(param);
}

void TearDown() override {}

protected:
bool flush_;
};

TEST_P(RocksEngineTest, SimpleTest) {
fs::TempDir rootPath("/tmp/rocksdb_engine_SimpleTest.XXXXXX");
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->put("key", "val"));
std::string val;
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->get("key", &val));
EXPECT_EQ("val", val);
}
Expand All @@ -45,22 +52,26 @@ TEST_P(RocksEngineTest, RangeTest) {
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
std::vector<KV> data;
for (int32_t i = 10; i < 20; i++) {
data.emplace_back(std::string(reinterpret_cast<const char*>(&i), sizeof(int32_t)),
data.emplace_back("key_" + std::string(reinterpret_cast<const char*>(&i), sizeof(int32_t)),
folly::stringPrintf("val_%d", i));
}
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

auto checkRange = [&](int32_t start, int32_t end, int32_t expectedFrom, int32_t expectedTotal) {
VLOG(1) << "start " << start << ", end " << end << ", expectedFrom " << expectedFrom
<< ", expectedTotal " << expectedTotal;
std::string s(reinterpret_cast<const char*>(&start), sizeof(int32_t));
std::string e(reinterpret_cast<const char*>(&end), sizeof(int32_t));
std::string s = "key_" + std::string(reinterpret_cast<const char*>(&start), sizeof(int32_t));
std::string e = "key_" + std::string(reinterpret_cast<const char*>(&end), sizeof(int32_t));
std::unique_ptr<KVIterator> iter;
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->range(s, e, &iter));
int num = 0;
while (iter->valid()) {
num++;
auto key = *reinterpret_cast<const int32_t*>(iter->key().data());
// remove the prefix "key_"
auto key = *reinterpret_cast<const int32_t*>(iter->key().subpiece(4).data());
auto val = iter->val();
EXPECT_EQ(expectedFrom, key);
EXPECT_EQ(folly::stringPrintf("val_%d", expectedFrom), val);
Expand All @@ -83,15 +94,18 @@ TEST_P(RocksEngineTest, PrefixTest) {
LOG(INFO) << "Write data in batch and scan them...";
std::vector<KV> data;
for (int32_t i = 0; i < 10; i++) {
data.emplace_back(folly::stringPrintf("a_%d", i), folly::stringPrintf("val_%d", i));
data.emplace_back(folly::stringPrintf("key_a_%d", i), folly::stringPrintf("val_%d", i));
}
for (int32_t i = 10; i < 15; i++) {
data.emplace_back(folly::stringPrintf("b_%d", i), folly::stringPrintf("val_%d", i));
data.emplace_back(folly::stringPrintf("key_b_%d", i), folly::stringPrintf("val_%d", i));
}
for (int32_t i = 20; i < 40; i++) {
data.emplace_back(folly::stringPrintf("c_%d", i), folly::stringPrintf("val_%d", i));
data.emplace_back(folly::stringPrintf("key_c_%d", i), folly::stringPrintf("val_%d", i));
}
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

auto checkPrefix = [&](const std::string& prefix, int32_t expectedFrom, int32_t expectedTotal) {
VLOG(1) << "prefix " << prefix << ", expectedFrom " << expectedFrom << ", expectedTotal "
Expand All @@ -111,9 +125,9 @@ TEST_P(RocksEngineTest, PrefixTest) {
}
EXPECT_EQ(expectedTotal, num);
};
checkPrefix("a", 0, 10);
checkPrefix("b", 10, 5);
checkPrefix("c", 20, 20);
checkPrefix("key_a", 0, 10);
checkPrefix("key_b", 10, 5);
checkPrefix("key_c", 20, 20);
}

TEST_P(RocksEngineTest, RemoveTest) {
Expand Down Expand Up @@ -147,6 +161,9 @@ TEST_P(RocksEngineTest, RemoveRangeTest) {
engine->removeRange(std::string(reinterpret_cast<const char*>(&s), sizeof(int32_t)),
std::string(reinterpret_cast<const char*>(&e), sizeof(int32_t))));
}
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}
{
int32_t s = 0, e = 100;
std::unique_ptr<KVIterator> iter;
Expand Down Expand Up @@ -218,6 +235,9 @@ TEST_P(RocksEngineTest, IngestTest) {
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
std::vector<std::string> files = {file};
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->ingest(files));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

std::string result;
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->get("key", &result));
Expand Down Expand Up @@ -262,6 +282,9 @@ TEST_P(RocksEngineTest, BackupRestoreTable) {
fs::TempDir restoreRootPath("/tmp/rocksdb_engine_restoretable.XXXXXX");
auto restore_engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, restoreRootPath.path());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, restore_engine->ingest(sst_files));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

std::unique_ptr<KVIterator> iter;
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, restore_engine->prefix(partPrefix, &iter));
Expand Down Expand Up @@ -463,6 +486,9 @@ TEST_P(RocksEngineTest, PrefixBloomTest) {
data.emplace_back(NebulaKeyUtils::systemCommitKey(1), "123");
data.emplace_back(NebulaKeyUtils::systemCommitKey(2), "123");
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

{
// vertexPrefix(partId) will not be included
Expand Down Expand Up @@ -529,15 +555,21 @@ TEST_P(RocksEngineTest, PrefixBloomTest) {
}
}

INSTANTIATE_TEST_SUITE_P(EnablePrefixExtractor_EnableWholeKeyFilter_TableFormat,
INSTANTIATE_TEST_SUITE_P(EnablePrefixExtractor_EnableWholeKeyFilter_TableFormat_FlushOrNot,
RocksEngineTest,
::testing::Values(std::make_tuple(false, false, "BlockBasedTable"),
std::make_tuple(false, true, "BlockBasedTable"),
std::make_tuple(true, false, "BlockBasedTable"),
std::make_tuple(true, true, "BlockBasedTable"),
::testing::Values(std::make_tuple(false, false, "BlockBasedTable", true),
std::make_tuple(false, false, "BlockBasedTable", false),
std::make_tuple(false, true, "BlockBasedTable", true),
std::make_tuple(false, true, "BlockBasedTable", false),
std::make_tuple(true, false, "BlockBasedTable", true),
std::make_tuple(true, false, "BlockBasedTable", false),
std::make_tuple(true, true, "BlockBasedTable", true),
std::make_tuple(true, true, "BlockBasedTable", false),
// PlainTable will always enable prefix extractor
std::make_tuple(true, false, "PlainTable"),
std::make_tuple(true, true, "PlainTable")));
std::make_tuple(true, false, "PlainTable", true),
std::make_tuple(true, false, "PlainTable", false),
std::make_tuple(true, true, "PlainTable", true),
std::make_tuple(true, true, "PlainTable", false)));

TEST(PlainTableTest, BackupRestoreWithoutData) {
fs::TempDir dataPath("/tmp/rocks_engine_test_data_path.XXXXXX");
Expand Down Expand Up @@ -578,19 +610,33 @@ TEST(PlainTableTest, BackupRestoreWithData) {
PartitionID partId = 1;

auto checkData = [&] {
std::string prefix = NebulaKeyUtils::tagPrefix(kDefaultVIdLen, partId, "vertex");
std::unique_ptr<KVIterator> iter;
auto code = engine->prefix(prefix, &iter);
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
int32_t num = 0;
while (iter->valid()) {
num++;
iter->next();
{
std::string prefix = NebulaKeyUtils::tagPrefix(kDefaultVIdLen, partId, "vertex");
std::unique_ptr<KVIterator> iter;
auto code = engine->prefix(prefix, &iter);
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
int32_t num = 0;
while (iter->valid()) {
num++;
iter->next();
}
EXPECT_EQ(num, 10);
}
{
std::string prefix = NebulaKeyUtils::tagPrefix(partId);
std::unique_ptr<KVIterator> iter;
auto code = engine->prefix(prefix, &iter);
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
int32_t num = 0;
while (iter->valid()) {
num++;
iter->next();
}
EXPECT_EQ(num, 10);
}
EXPECT_EQ(num, 10);

std::string value;
code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value);
auto code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value);
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
EXPECT_EQ("123", value);
};
Expand Down