diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index acd740c3dd5..096f8fc53f1 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -92,7 +92,16 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId, CHECK(status.ok()) << status.ToString(); } db_.reset(db); - extractorLen_ = sizeof(PartitionID) + vIdLen; + if (options.table_factory->Name() == rocksdb::TableFactory::kBlockBasedTableName()) { + extractorLen_ = sizeof(PartitionID) + vIdLen; + } else if (options.table_factory->Name() == rocksdb::TableFactory::kPlainTableName()) { + // PlainTable only support prefix-based seek, which means if the prefix is not inserted into + // rocksdb, we can't read them from "prefix" api anymore. For simplarity, we just set the length + // of prefix extractor to the minimum length we used in "prefix" api, which is 4 when we seek by + // tagPrefix(partId) or edgePrefix(partId). + isPlainTable_ = true; + extractorLen_ = sizeof(PartitionID); + } partsNum_ = allParts().size(); LOG(INFO) << "open rocksdb on " << path; @@ -175,7 +184,11 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start, const std::string& end, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; - options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; + if (!isPlainTable_) { + options.total_order_seek = true; + } else { + options.prefix_same_as_start = true; + } rocksdb::Iterator* iter = db_->NewIterator(options); if (iter) { iter->Seek(rocksdb::Slice(start)); @@ -232,8 +245,11 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start, const std::string& prefix, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; - // prefix_same_as_start is false by default - options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; + if (!isPlainTable_) { + options.total_order_seek = true; + } else { + options.prefix_same_as_start = true; + } rocksdb::Iterator* iter = db_->NewIterator(options); if (iter) { iter->Seek(rocksdb::Slice(start)); diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 2de38333efe..d646264b04e 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -552,6 +552,7 @@ class RocksEngine : public KVEngine { std::unique_ptr backupDb_{nullptr}; int32_t partsNum_ = -1; size_t extractorLen_; + bool isPlainTable_{false}; }; } // namespace kvstore diff --git a/src/kvstore/RocksEngineConfig.cpp b/src/kvstore/RocksEngineConfig.cpp index 5dbcc40532d..e43db0bdc5f 100644 --- a/src/kvstore/RocksEngineConfig.cpp +++ b/src/kvstore/RocksEngineConfig.cpp @@ -292,7 +292,6 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, baseOpts.rate_limiter = rate_limiter; } - size_t prefixLength = sizeof(PartitionID) + vidLen; if (FLAGS_rocksdb_table_format == "BlockBasedTable") { // BlockBasedTableOptions std::unordered_map bbtOptsMap; @@ -330,6 +329,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel; } if (FLAGS_enable_rocksdb_prefix_filtering) { + size_t prefixLength = sizeof(PartitionID) + vidLen; baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength)); } bbtOpts.whole_key_filtering = FLAGS_enable_rocksdb_whole_key_filtering; @@ -346,6 +346,11 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, if (!FLAGS_enable_rocksdb_prefix_filtering) { return rocksdb::Status::InvalidArgument("PlainTable should use prefix bloom filter"); } + // PlainTable only support prefix-based seek, which means if the prefix is not inserted into + // rocksdb, we can't read them from "prefix" api anymore. For simplarity, we just set the length + // of prefix extractor to the minimum length we used in "prefix" api, which is 4 when we seek by + // tagPrefix(partId) or edgePrefix(partId). + size_t prefixLength = sizeof(PartitionID); baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength)); baseOpts.table_factory.reset(rocksdb::NewPlainTableFactory()); baseOpts.create_if_missing = true; diff --git a/src/kvstore/test/RocksEngineTest.cpp b/src/kvstore/test/RocksEngineTest.cpp index 0c45aed558f..1c2cdf37be6 100644 --- a/src/kvstore/test/RocksEngineTest.cpp +++ b/src/kvstore/test/RocksEngineTest.cpp @@ -19,16 +19,20 @@ namespace kvstore { const int32_t kDefaultVIdLen = 8; -class RocksEngineTest : public ::testing::TestWithParam> { +class RocksEngineTest : public ::testing::TestWithParam> { public: void SetUp() override { auto param = GetParam(); FLAGS_enable_rocksdb_prefix_filtering = std::get<0>(param); FLAGS_enable_rocksdb_whole_key_filtering = std::get<1>(param); FLAGS_rocksdb_table_format = std::get<2>(param); + flush_ = std::get<3>(param); } void TearDown() override {} + + protected: + bool flush_; }; TEST_P(RocksEngineTest, SimpleTest) { @@ -36,6 +40,9 @@ TEST_P(RocksEngineTest, SimpleTest) { auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->put("key", "val")); std::string val; + if (flush_) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); + } EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->get("key", &val)); EXPECT_EQ("val", val); } @@ -45,22 +52,26 @@ TEST_P(RocksEngineTest, RangeTest) { auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); std::vector data; for (int32_t i = 10; i < 20; i++) { - data.emplace_back(std::string(reinterpret_cast(&i), sizeof(int32_t)), + data.emplace_back("key_" + std::string(reinterpret_cast(&i), sizeof(int32_t)), folly::stringPrintf("val_%d", i)); } EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data))); + if (flush_) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); + } auto checkRange = [&](int32_t start, int32_t end, int32_t expectedFrom, int32_t expectedTotal) { VLOG(1) << "start " << start << ", end " << end << ", expectedFrom " << expectedFrom << ", expectedTotal " << expectedTotal; - std::string s(reinterpret_cast(&start), sizeof(int32_t)); - std::string e(reinterpret_cast(&end), sizeof(int32_t)); + std::string s = "key_" + std::string(reinterpret_cast(&start), sizeof(int32_t)); + std::string e = "key_" + std::string(reinterpret_cast(&end), sizeof(int32_t)); std::unique_ptr iter; EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->range(s, e, &iter)); int num = 0; while (iter->valid()) { num++; - auto key = *reinterpret_cast(iter->key().data()); + // remove the prefix "key_" + auto key = *reinterpret_cast(iter->key().subpiece(4).data()); auto val = iter->val(); EXPECT_EQ(expectedFrom, key); EXPECT_EQ(folly::stringPrintf("val_%d", expectedFrom), val); @@ -83,15 +94,18 @@ TEST_P(RocksEngineTest, PrefixTest) { LOG(INFO) << "Write data in batch and scan them..."; std::vector data; for (int32_t i = 0; i < 10; i++) { - data.emplace_back(folly::stringPrintf("a_%d", i), folly::stringPrintf("val_%d", i)); + data.emplace_back(folly::stringPrintf("key_a_%d", i), folly::stringPrintf("val_%d", i)); } for (int32_t i = 10; i < 15; i++) { - data.emplace_back(folly::stringPrintf("b_%d", i), folly::stringPrintf("val_%d", i)); + data.emplace_back(folly::stringPrintf("key_b_%d", i), folly::stringPrintf("val_%d", i)); } for (int32_t i = 20; i < 40; i++) { - data.emplace_back(folly::stringPrintf("c_%d", i), folly::stringPrintf("val_%d", i)); + data.emplace_back(folly::stringPrintf("key_c_%d", i), folly::stringPrintf("val_%d", i)); } EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data))); + if (flush_) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); + } auto checkPrefix = [&](const std::string& prefix, int32_t expectedFrom, int32_t expectedTotal) { VLOG(1) << "prefix " << prefix << ", expectedFrom " << expectedFrom << ", expectedTotal " @@ -111,9 +125,9 @@ TEST_P(RocksEngineTest, PrefixTest) { } EXPECT_EQ(expectedTotal, num); }; - checkPrefix("a", 0, 10); - checkPrefix("b", 10, 5); - checkPrefix("c", 20, 20); + checkPrefix("key_a", 0, 10); + checkPrefix("key_b", 10, 5); + checkPrefix("key_c", 20, 20); } TEST_P(RocksEngineTest, RemoveTest) { @@ -147,6 +161,9 @@ TEST_P(RocksEngineTest, RemoveRangeTest) { engine->removeRange(std::string(reinterpret_cast(&s), sizeof(int32_t)), std::string(reinterpret_cast(&e), sizeof(int32_t)))); } + if (flush_) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); + } { int32_t s = 0, e = 100; std::unique_ptr iter; @@ -218,6 +235,9 @@ TEST_P(RocksEngineTest, IngestTest) { auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); std::vector files = {file}; EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->ingest(files)); + if (flush_) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); + } std::string result; EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->get("key", &result)); @@ -262,6 +282,9 @@ TEST_P(RocksEngineTest, BackupRestoreTable) { fs::TempDir restoreRootPath("/tmp/rocksdb_engine_restoretable.XXXXXX"); auto restore_engine = std::make_unique(0, kDefaultVIdLen, restoreRootPath.path()); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, restore_engine->ingest(sst_files)); + if (flush_) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); + } std::unique_ptr iter; EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, restore_engine->prefix(partPrefix, &iter)); @@ -463,6 +486,9 @@ TEST_P(RocksEngineTest, PrefixBloomTest) { data.emplace_back(NebulaKeyUtils::systemCommitKey(1), "123"); data.emplace_back(NebulaKeyUtils::systemCommitKey(2), "123"); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data))); + if (flush_) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); + } { // vertexPrefix(partId) will not be included @@ -529,15 +555,21 @@ TEST_P(RocksEngineTest, PrefixBloomTest) { } } -INSTANTIATE_TEST_SUITE_P(EnablePrefixExtractor_EnableWholeKeyFilter_TableFormat, +INSTANTIATE_TEST_SUITE_P(EnablePrefixExtractor_EnableWholeKeyFilter_TableFormat_FlushOrNot, RocksEngineTest, - ::testing::Values(std::make_tuple(false, false, "BlockBasedTable"), - std::make_tuple(false, true, "BlockBasedTable"), - std::make_tuple(true, false, "BlockBasedTable"), - std::make_tuple(true, true, "BlockBasedTable"), + ::testing::Values(std::make_tuple(false, false, "BlockBasedTable", true), + std::make_tuple(false, false, "BlockBasedTable", false), + std::make_tuple(false, true, "BlockBasedTable", true), + std::make_tuple(false, true, "BlockBasedTable", false), + std::make_tuple(true, false, "BlockBasedTable", true), + std::make_tuple(true, false, "BlockBasedTable", false), + std::make_tuple(true, true, "BlockBasedTable", true), + std::make_tuple(true, true, "BlockBasedTable", false), // PlainTable will always enable prefix extractor - std::make_tuple(true, false, "PlainTable"), - std::make_tuple(true, true, "PlainTable"))); + std::make_tuple(true, false, "PlainTable", true), + std::make_tuple(true, false, "PlainTable", false), + std::make_tuple(true, true, "PlainTable", true), + std::make_tuple(true, true, "PlainTable", false))); TEST(PlainTableTest, BackupRestoreWithoutData) { fs::TempDir dataPath("/tmp/rocks_engine_test_data_path.XXXXXX"); @@ -578,19 +610,33 @@ TEST(PlainTableTest, BackupRestoreWithData) { PartitionID partId = 1; auto checkData = [&] { - std::string prefix = NebulaKeyUtils::tagPrefix(kDefaultVIdLen, partId, "vertex"); - std::unique_ptr iter; - auto code = engine->prefix(prefix, &iter); - EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - int32_t num = 0; - while (iter->valid()) { - num++; - iter->next(); + { + std::string prefix = NebulaKeyUtils::tagPrefix(kDefaultVIdLen, partId, "vertex"); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 10); + } + { + std::string prefix = NebulaKeyUtils::tagPrefix(partId); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 10); } - EXPECT_EQ(num, 10); std::string value; - code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value); + auto code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); EXPECT_EQ("123", value); };