diff --git a/conf/nebula-storaged.conf.default b/conf/nebula-storaged.conf.default index 79c1e8cd48b..4aead99ba67 100644 --- a/conf/nebula-storaged.conf.default +++ b/conf/nebula-storaged.conf.default @@ -90,8 +90,10 @@ # * kAll, Collect all stats --rocksdb_stats_level=kExceptHistogramOrTimers -# Whether or not to enable rocksdb's prefix bloom filter, disabled by default. ---enable_rocksdb_prefix_filtering=false +# Whether or not to enable rocksdb's prefix bloom filter, enabled by default. +--enable_rocksdb_prefix_filtering=true +# Whether or not to enable rocksdb's whole key bloom filter, disabled by default. +--enable_rocksdb_whole_key_filtering=false ############## rocksdb Options ############## # rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma diff --git a/conf/nebula-storaged.conf.production b/conf/nebula-storaged.conf.production index e0911a27a3a..f41a16b7d21 100644 --- a/conf/nebula-storaged.conf.production +++ b/conf/nebula-storaged.conf.production @@ -96,8 +96,10 @@ # * kAll, Collect all stats --rocksdb_stats_level=kExceptHistogramOrTimers -# Whether or not to enable rocksdb's prefix bloom filter, disabled by default. ---enable_rocksdb_prefix_filtering=false +# Whether or not to enable rocksdb's prefix bloom filter, enabled by default. +--enable_rocksdb_prefix_filtering=true +# Whether or not to enable rocksdb's whole key bloom filter, disabled by default. +--enable_rocksdb_whole_key_filtering=false ############### misc #################### --snapshot_part_rate_limit=8388608 diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index dd23f8ccd00..03d12528910 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -111,6 +111,10 @@ class KVEngine { virtual nebula::cpp2::ErrorCode setDBOption(const std::string& configKey, const std::string& configValue) = 0; + // Get DB Property + virtual ErrorOr getProperty( + const std::string& property) = 0; + virtual nebula::cpp2::ErrorCode compact() = 0; virtual nebula::cpp2::ErrorCode flush() = 0; diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index c4ac838a45a..dcef7e1a9f9 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -216,6 +216,9 @@ class KVStore { virtual std::vector getDataRoot() const = 0; + virtual ErrorOr getProperty( + GraphSpaceID spaceId, const std::string& property) = 0; + protected: KVStore() = default; }; diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp index 4cbc6dadc32..aff31de2925 100644 --- a/src/kvstore/NebulaSnapshotManager.cpp +++ b/src/kvstore/NebulaSnapshotManager.cpp @@ -11,7 +11,7 @@ #include "kvstore/RateLimiter.h" DEFINE_uint32(snapshot_part_rate_limit, - 1024 * 1024 * 2, + 1024 * 1024 * 8, "max bytes of pulling snapshot for each partition in one second"); DEFINE_uint32(snapshot_batch_size, 1024 * 512, "batch size for snapshot, in bytes"); @@ -22,16 +22,15 @@ const int32_t kReserveNum = 1024 * 4; NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) { // Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit. - // So by default, the total send rate is limited to 4 * 2Mb = 8Mb. + // So by default, the total send rate is limited to 4 * 8Mb = 32Mb. LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit - << " for each part"; + << " for each part by default"; } void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, PartitionID partId, raftex::SnapshotCallback cb) { - auto rateLimiter = std::make_unique(FLAGS_snapshot_part_rate_limit, - FLAGS_snapshot_part_rate_limit); + auto rateLimiter = std::make_unique(); CHECK_NOTNULL(store_); auto tables = NebulaKeyUtils::snapshotPrefix(partId); std::vector data; @@ -74,7 +73,9 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, size_t batchSize = 0; while (iter && iter->valid()) { if (batchSize >= FLAGS_snapshot_batch_size) { - rateLimiter->consume(batchSize); + rateLimiter->consume(static_cast(batchSize), // toConsume + static_cast(FLAGS_snapshot_part_rate_limit), // rate + static_cast(FLAGS_snapshot_part_rate_limit)); // burstSize if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) { data.clear(); batchSize = 0; diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index aa7bfc767fc..0727677f876 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -1168,5 +1168,26 @@ nebula::cpp2::ErrorCode NebulaStore::multiPutWithoutReplicator(GraphSpaceID spac return nebula::cpp2::ErrorCode::SUCCEEDED; } +ErrorOr NebulaStore::getProperty( + GraphSpaceID spaceId, const std::string& property) { + auto spaceRet = space(spaceId); + if (!ok(spaceRet)) { + LOG(ERROR) << "Get Space " << spaceId << " Failed"; + return error(spaceRet); + } + auto space = nebula::value(spaceRet); + + folly::dynamic obj = folly::dynamic::object; + for (size_t i = 0; i < space->engines_.size(); i++) { + auto val = space->engines_[i]->getProperty(property); + if (!ok(val)) { + return error(val); + } + auto eng = folly::stringPrintf("Engine %zu", i); + obj[eng] = std::move(value(val)); + } + return folly::toJson(obj); +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 562226c171a..e8529f18010 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -275,6 +275,9 @@ class NebulaStore : public KVStore, public Handler { nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId, std::vector keyValues) override; + ErrorOr getProperty(GraphSpaceID spaceId, + const std::string& property) override; + private: void loadPartFromDataPath(); diff --git a/src/kvstore/RateLimiter.h b/src/kvstore/RateLimiter.h index 7b91ad4b189..97b4ef196fa 100644 --- a/src/kvstore/RateLimiter.h +++ b/src/kvstore/RateLimiter.h @@ -21,32 +21,29 @@ namespace kvstore { // For now, there are two major cases: snapshot (both for balance or catch up) and rebuild index. class RateLimiter { public: - RateLimiter(int32_t rate, int32_t burstSize) - : rate_(static_cast(rate)), burstSize_(static_cast(burstSize)) { + RateLimiter() { // token will be available after 1 second, to prevent speed spike at the beginning auto now = time::WallClock::fastNowInSec(); int64_t waitInSec = FLAGS_skip_wait_in_rate_limiter ? 0 : 1; - bucket_.reset(new folly::TokenBucket(rate_, burstSize_, static_cast(now + waitInSec))); + bucket_.reset(new folly::DynamicTokenBucket(static_cast(now + waitInSec))); } // Caller must make sure the **the parition has been add, and won't be removed during consume.** // Snaphot and rebuild index follow this principle by design. - void consume(size_t toConsume) { - if (toConsume > burstSize_) { + void consume(double toConsume, double rate, double burstSize) { + if (toConsume > burstSize) { // consumeWithBorrowAndWait do nothing when toConsume > burstSize_, we sleep 1s instead std::this_thread::sleep_for(std::chrono::seconds(1)); } else { // If there are enouth tokens, consume and return immediately. // If not, cosume anyway, but sleep enough time before return. auto now = time::WallClock::fastNowInSec(); - bucket_->consumeWithBorrowAndWait(static_cast(toConsume), static_cast(now)); + bucket_->consumeWithBorrowAndWait(toConsume, rate, burstSize, static_cast(now)); } } private: - std::unique_ptr bucket_; - double rate_{1 << 20}; - double burstSize_{1 << 20}; + std::unique_ptr bucket_; }; } // namespace kvstore diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 6b536b9cecc..66002f0640a 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -13,7 +13,6 @@ #include "common/fs/FileUtils.h" #include "common/utils/NebulaKeyUtils.h" #include "kvstore/KVStore.h" -#include "kvstore/RocksEngineConfig.h" DEFINE_bool(move_files, false, "Move the SST files instead of copy when ingest into dataset"); @@ -126,6 +125,7 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId, CHECK(status.ok()) << status.ToString(); db_.reset(db); partsNum_ = allParts().size(); + extractorLen_ = sizeof(PartitionID) + vIdLen; LOG(INFO) << "open rocksdb on " << path; backup(); @@ -202,7 +202,7 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start, const std::string& end, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; - options.total_order_seek = true; + options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; rocksdb::Iterator* iter = db_->NewIterator(options); if (iter) { iter->Seek(rocksdb::Slice(start)); @@ -213,7 +213,32 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start, nebula::cpp2::ErrorCode RocksEngine::prefix(const std::string& prefix, std::unique_ptr* storageIter) { + // In fact, we don't need to check prefix.size() >= extractorLen_, which is caller's duty to make + // sure the prefix bloom filter exists. But this is quite error-proning, so we do a check here. + if (FLAGS_enable_rocksdb_prefix_filtering && prefix.size() >= extractorLen_) { + return prefixWithExtractor(prefix, storageIter); + } else { + return prefixWithoutExtractor(prefix, storageIter); + } +} + +nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& prefix, + std::unique_ptr* storageIter) { + rocksdb::ReadOptions options; + options.prefix_same_as_start = true; + rocksdb::Iterator* iter = db_->NewIterator(options); + if (iter) { + iter->Seek(rocksdb::Slice(prefix)); + } + storageIter->reset(new RocksPrefixIter(iter, prefix)); + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + +nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor( + const std::string& prefix, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; + // prefix_same_as_start is false by default + options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; rocksdb::Iterator* iter = db_->NewIterator(options); if (iter) { iter->Seek(rocksdb::Slice(prefix)); @@ -226,6 +251,8 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start, const std::string& prefix, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; + // prefix_same_as_start is false by default + options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; rocksdb::Iterator* iter = db_->NewIterator(options); if (iter) { iter->Seek(rocksdb::Slice(start)); @@ -397,6 +424,16 @@ nebula::cpp2::ErrorCode RocksEngine::setDBOption(const std::string& configKey, } } +ErrorOr RocksEngine::getProperty( + const std::string& property) { + std::string value; + if (!db_->GetProperty(property, &value)) { + return nebula::cpp2::ErrorCode::E_INVALID_PARM; + } else { + return value; + } +} + nebula::cpp2::ErrorCode RocksEngine::compact() { rocksdb::CompactRangeOptions options; options.change_level = FLAGS_rocksdb_compact_change_level; diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index c781b24cab5..da75eaac096 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -15,6 +15,7 @@ #include "common/base/Base.h" #include "kvstore/KVEngine.h" #include "kvstore/KVIterator.h" +#include "kvstore/RocksEngineConfig.h" namespace nebula { namespace kvstore { @@ -128,6 +129,12 @@ class RocksEngine : public KVEngine { const std::string& prefix, std::unique_ptr* iter) override; + nebula::cpp2::ErrorCode prefixWithExtractor(const std::string& prefix, + std::unique_ptr* storageIter); + + nebula::cpp2::ErrorCode prefixWithoutExtractor(const std::string& prefix, + std::unique_ptr* storageIter); + /********************* * Data modification ********************/ @@ -161,6 +168,8 @@ class RocksEngine : public KVEngine { nebula::cpp2::ErrorCode setDBOption(const std::string& configKey, const std::string& configValue) override; + ErrorOr getProperty(const std::string& property) override; + nebula::cpp2::ErrorCode compact() override; nebula::cpp2::ErrorCode flush() override; @@ -190,6 +199,7 @@ class RocksEngine : public KVEngine { std::string backupPath_; std::unique_ptr backupDb_{nullptr}; int32_t partsNum_ = -1; + size_t extractorLen_; }; } // namespace kvstore diff --git a/src/kvstore/RocksEngineConfig.cpp b/src/kvstore/RocksEngineConfig.cpp index 81f32e37a30..47eee99aaa3 100644 --- a/src/kvstore/RocksEngineConfig.cpp +++ b/src/kvstore/RocksEngineConfig.cpp @@ -51,7 +51,7 @@ DEFINE_int64(rocksdb_block_cache, 1024, "The default block cache size used in BlockBasedTable. The unit is MB"); -DEFINE_int32(row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache"); +DEFINE_int32(rocksdb_row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache"); DEFINE_int32(cache_bucket_exp, 8, "Total buckets number is 1 << cache_bucket_exp"); @@ -78,15 +78,13 @@ DEFINE_int32(rocksdb_rate_limit, 0, "write limit in bytes per sec. The unit is MB. 0 means unlimited."); -DEFINE_bool(enable_rocksdb_prefix_filtering, +DEFINE_bool(enable_rocksdb_whole_key_filtering, false, + "Whether or not to enable rocksdb's whole key bloom filter"); + +DEFINE_bool(enable_rocksdb_prefix_filtering, + true, "Whether or not to enable rocksdb's prefix bloom filter."); -DEFINE_bool(rocksdb_prefix_bloom_filter_length_flag, - false, - "If true, prefix bloom filter will be sizeof(PartitionID) + vidLen + " - "sizeof(EdgeType). " - "If false, prefix bloom filter will be sizeof(PartitionID) + vidLen. "); -DEFINE_int32(rocksdb_plain_table_prefix_length, 4, "PlainTable prefix size"); DEFINE_bool(rocksdb_compact_change_level, true, @@ -118,34 +116,6 @@ DEFINE_int32(rocksdb_backup_interval_secs, namespace nebula { namespace kvstore { -class GraphPrefixTransform : public rocksdb::SliceTransform { - private: - size_t prefixLen_; - std::string name_; - - public: - explicit GraphPrefixTransform(size_t prefixLen) - : prefixLen_(prefixLen), name_("nebula.GraphPrefix." + std::to_string(prefixLen_)) {} - - const char* Name() const override { return name_.c_str(); } - - rocksdb::Slice Transform(const rocksdb::Slice& src) const override { - return rocksdb::Slice(src.data(), prefixLen_); - } - - bool InDomain(const rocksdb::Slice& key) const override { - if (key.size() < prefixLen_) { - return false; - } - // And we should not use NebulaKeyUtils::isVertex or isEdge here, because it - // will regard the prefix itself not in domain since its length does not - // satisfy - constexpr int32_t len = static_cast(sizeof(NebulaKeyType)); - auto type = static_cast(readInt(key.data(), len) & kTypeMask); - return type == NebulaKeyType::kEdge || type == NebulaKeyType::kVertex; - } -}; - static rocksdb::Status initRocksdbCompression(rocksdb::Options& baseOpts) { static std::unordered_map m = { {"no", rocksdb::kNoCompression}, @@ -256,10 +226,8 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, baseOpts.rate_limiter = rate_limiter; } + size_t prefixLength = sizeof(PartitionID) + vidLen; if (FLAGS_rocksdb_table_format == "BlockBasedTable") { - size_t prefixLength = FLAGS_rocksdb_prefix_bloom_filter_length_flag - ? sizeof(PartitionID) + vidLen + sizeof(EdgeType) - : sizeof(PartitionID) + vidLen; // BlockBasedTableOptions std::unordered_map bbtOptsMap; if (!loadOptionsMap(bbtOptsMap, FLAGS_rocksdb_block_based_table_options)) { @@ -279,9 +247,9 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, bbtOpts.block_cache = blockCache; } - if (FLAGS_row_cache_num) { + if (FLAGS_rocksdb_row_cache_num) { static std::shared_ptr rowCache = - rocksdb::NewLRUCache(FLAGS_row_cache_num, FLAGS_cache_bucket_exp); + rocksdb::NewLRUCache(FLAGS_rocksdb_row_cache_num, FLAGS_cache_bucket_exp); baseOpts.row_cache = rowCache; } @@ -296,8 +264,9 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel; } if (FLAGS_enable_rocksdb_prefix_filtering) { - baseOpts.prefix_extractor.reset(new GraphPrefixTransform(prefixLength)); + baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength)); } + bbtOpts.whole_key_filtering = FLAGS_enable_rocksdb_whole_key_filtering; baseOpts.table_factory.reset(NewBlockBasedTableFactory(bbtOpts)); baseOpts.create_if_missing = true; } else if (FLAGS_rocksdb_table_format == "PlainTable") { @@ -308,8 +277,10 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, // by default. WAL_ttl_seconds and rocksdb_backup_interval_secs need to be // modify together if necessary FLAGS_rocksdb_disable_wal = false; - baseOpts.prefix_extractor.reset( - rocksdb::NewCappedPrefixTransform(FLAGS_rocksdb_plain_table_prefix_length)); + if (!FLAGS_enable_rocksdb_prefix_filtering) { + return rocksdb::Status::InvalidArgument("PlainTable should use prefix bloom filter"); + } + baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength)); baseOpts.table_factory.reset(rocksdb::NewPlainTableFactory()); baseOpts.create_if_missing = true; } else { diff --git a/src/kvstore/RocksEngineConfig.h b/src/kvstore/RocksEngineConfig.h index 7fd174cebec..018fae42dbc 100644 --- a/src/kvstore/RocksEngineConfig.h +++ b/src/kvstore/RocksEngineConfig.h @@ -37,7 +37,7 @@ DECLARE_int64(rocksdb_block_cache); DECLARE_int32(rocksdb_batch_size); -DECLARE_int32(row_cache_num); +DECLARE_int32(rocksdb_row_cache_num); DECLARE_int32(cache_bucket_exp); @@ -53,8 +53,7 @@ DECLARE_bool(enable_rocksdb_statistics); DECLARE_string(rocksdb_stats_level); DECLARE_bool(enable_rocksdb_prefix_filtering); -DECLARE_bool(rocksdb_prefix_bloom_filter_length_flag); -DECLARE_int32(rocksdb_plain_table_prefix_length); +DECLARE_bool(enable_rocksdb_whole_key_filtering); // rocksdb compact RangeOptions DECLARE_bool(rocksdb_compact_change_level); diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp index ea20c0a18bb..90a78931351 100644 --- a/src/kvstore/raftex/SnapshotManager.cpp +++ b/src/kvstore/raftex/SnapshotManager.cpp @@ -19,8 +19,12 @@ namespace nebula { namespace raftex { SnapshotManager::SnapshotManager() { - executor_.reset(new folly::IOThreadPoolExecutor(FLAGS_snapshot_worker_threads)); - ioThreadPool_.reset(new folly::IOThreadPoolExecutor(FLAGS_snapshot_io_threads)); + executor_.reset(new folly::IOThreadPoolExecutor( + FLAGS_snapshot_worker_threads, + std::make_shared("snapshot-worker"))); + ioThreadPool_.reset(new folly::IOThreadPoolExecutor( + FLAGS_snapshot_io_threads, + std::make_shared("snapshot-ioexecutor"))); } folly::Future SnapshotManager::sendSnapshot(std::shared_ptr part, diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index 022df3f3a65..98da491681a 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -917,6 +917,7 @@ TEST(NebulaStoreTest, BackupRestoreTest) { FLAGS_rocksdb_table_format = "PlainTable"; FLAGS_rocksdb_wal_dir = rocksdbWalPath.path(); FLAGS_rocksdb_backup_dir = backupPath.path(); + FLAGS_enable_rocksdb_prefix_filtering = true; auto waitLeader = [](const std::unique_ptr& store) { while (true) { diff --git a/src/kvstore/test/RateLimiterTest.cpp b/src/kvstore/test/RateLimiterTest.cpp index f01fc70d071..f263c72d803 100644 --- a/src/kvstore/test/RateLimiterTest.cpp +++ b/src/kvstore/test/RateLimiterTest.cpp @@ -17,32 +17,38 @@ namespace nebula { namespace kvstore { TEST(RateLimter, ConsumeLessEqualThanBurst) { - RateLimiter limiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit); + RateLimiter limiter; auto now = time::WallClock::fastNowInSec(); int64_t count = 0; while (count++ < 50) { - limiter.consume(FLAGS_snapshot_part_rate_limit / 10); + limiter.consume(FLAGS_snapshot_part_rate_limit / 10, // toConsume + FLAGS_snapshot_part_rate_limit, // rate + FLAGS_snapshot_part_rate_limit); // burstSize } EXPECT_GE(time::WallClock::fastNowInSec() - now, 5); } TEST(RateLimter, ConsumeGreaterThanBurst) { - RateLimiter limiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit / 10); + RateLimiter limiter; auto now = time::WallClock::fastNowInSec(); int64_t count = 0; while (count++ < 5) { // greater than burst size, will sleep 1 second instead - limiter.consume(FLAGS_snapshot_part_rate_limit); + limiter.consume(FLAGS_snapshot_part_rate_limit, // toConsume + FLAGS_snapshot_part_rate_limit, // rate + FLAGS_snapshot_part_rate_limit / 10); // burstSize } EXPECT_GE(time::WallClock::fastNowInSec() - now, 5); } TEST(RateLimter, RateLessThanBurst) { - RateLimiter limiter(FLAGS_snapshot_part_rate_limit, 2 * FLAGS_snapshot_part_rate_limit); + RateLimiter limiter; auto now = time::WallClock::fastNowInSec(); int64_t count = 0; while (count++ < 5) { - limiter.consume(FLAGS_snapshot_part_rate_limit); + limiter.consume(FLAGS_snapshot_part_rate_limit, // toConsume + FLAGS_snapshot_part_rate_limit, // rate + 2 * FLAGS_snapshot_part_rate_limit); // burstSize } EXPECT_GE(time::WallClock::fastNowInSec() - now, 5); } diff --git a/src/kvstore/test/RocksEngineTest.cpp b/src/kvstore/test/RocksEngineTest.cpp index a800463e4c7..66eddedff2c 100644 --- a/src/kvstore/test/RocksEngineTest.cpp +++ b/src/kvstore/test/RocksEngineTest.cpp @@ -20,7 +20,19 @@ namespace kvstore { const int32_t kDefaultVIdLen = 8; -TEST(RocksEngineTest, SimpleTest) { +class RocksEngineTest : public ::testing::TestWithParam> { + public: + void SetUp() override { + auto param = GetParam(); + FLAGS_enable_rocksdb_prefix_filtering = std::get<0>(param); + FLAGS_enable_rocksdb_whole_key_filtering = std::get<1>(param); + FLAGS_rocksdb_table_format = std::get<2>(param); + } + + void TearDown() override {} +}; + +TEST_P(RocksEngineTest, SimpleTest) { fs::TempDir rootPath("/tmp/rocksdb_engine_SimpleTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->put("key", "val")); @@ -29,7 +41,7 @@ TEST(RocksEngineTest, SimpleTest) { EXPECT_EQ("val", val); } -TEST(RocksEngineTest, RangeTest) { +TEST_P(RocksEngineTest, RangeTest) { fs::TempDir rootPath("/tmp/rocksdb_engine_RangeTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); std::vector data; @@ -66,7 +78,7 @@ TEST(RocksEngineTest, RangeTest) { checkRange(1, 15, 10, 5); } -TEST(RocksEngineTest, PrefixTest) { +TEST_P(RocksEngineTest, PrefixTest) { fs::TempDir rootPath("/tmp/rocksdb_engine_PrefixTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); LOG(INFO) << "Write data in batch and scan them..."; @@ -105,7 +117,7 @@ TEST(RocksEngineTest, PrefixTest) { checkPrefix("c", 20, 20); } -TEST(RocksEngineTest, RemoveTest) { +TEST_P(RocksEngineTest, RemoveTest) { fs::TempDir rootPath("/tmp/rocksdb_engine_RemoveTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->put("key", "val")); @@ -116,7 +128,10 @@ TEST(RocksEngineTest, RemoveTest) { EXPECT_EQ(nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND, engine->get("key", &val)); } -TEST(RocksEngineTest, RemoveRangeTest) { +TEST_P(RocksEngineTest, RemoveRangeTest) { + if (FLAGS_rocksdb_table_format == "PlainTable") { + return; + } fs::TempDir rootPath("/tmp/rocksdb_engine_RemoveRangeTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); for (int32_t i = 0; i < 100; i++) { @@ -154,7 +169,7 @@ TEST(RocksEngineTest, RemoveRangeTest) { } } -TEST(RocksEngineTest, OptionTest) { +TEST_P(RocksEngineTest, OptionTest) { fs::TempDir rootPath("/tmp/rocksdb_engine_OptionTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, @@ -173,7 +188,7 @@ TEST(RocksEngineTest, OptionTest) { engine->setDBOption("max_background_compactions", "bad_value")); } -TEST(RocksEngineTest, CompactTest) { +TEST_P(RocksEngineTest, CompactTest) { fs::TempDir rootPath("/tmp/rocksdb_engine_CompactTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); std::vector data; @@ -184,7 +199,10 @@ TEST(RocksEngineTest, CompactTest) { EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->compact()); } -TEST(RocksEngineTest, IngestTest) { +TEST_P(RocksEngineTest, IngestTest) { + if (FLAGS_rocksdb_table_format == "PlainTable") { + return; + } rocksdb::Options options; rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options); fs::TempDir rootPath("/tmp/rocksdb_engine_IngestTest.XXXXXX"); @@ -210,7 +228,10 @@ TEST(RocksEngineTest, IngestTest) { EXPECT_EQ(nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND, engine->get("key_not_exist", &result)); } -TEST(RocksEngineTest, BackupRestoreTable) { +TEST_P(RocksEngineTest, BackupRestoreTable) { + if (FLAGS_rocksdb_table_format == "PlainTable") { + return; + } rocksdb::Options options; rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options); fs::TempDir rootPath("/tmp/rocksdb_engine_backuptable.XXXXXX"); @@ -271,90 +292,16 @@ TEST(RocksEngineTest, BackupRestoreTable) { EXPECT_EQ(num, 5); } -TEST(RocksEngineTest, BackupRestoreWithoutData) { - fs::TempDir dataPath("/tmp/rocks_engine_test_data_path.XXXXXX"); - fs::TempDir rocksdbWalPath("/tmp/rocks_engine_test_rocksdb_wal_path.XXXXXX"); - fs::TempDir backupPath("/tmp/rocks_engine_test_backup_path.XXXXXX"); - FLAGS_rocksdb_table_format = "PlainTable"; - FLAGS_rocksdb_wal_dir = rocksdbWalPath.path(); - FLAGS_rocksdb_backup_dir = backupPath.path(); - - auto engine = std::make_unique(0, kDefaultVIdLen, dataPath.path()); - - LOG(INFO) << "Stop the engine and remove data"; - // release the engine and mock machine reboot by removing the data - engine.reset(); - CHECK(fs::FileUtils::remove(dataPath.path(), true)); - - LOG(INFO) << "Start recover"; - // reopen the engine, and it will try to restore from the previous backup - engine = std::make_unique(0, kDefaultVIdLen, dataPath.path()); - - FLAGS_rocksdb_table_format = "BlockBasedTable"; - FLAGS_rocksdb_wal_dir = ""; - FLAGS_rocksdb_backup_dir = ""; -} - -TEST(RocksEngineTest, BackupRestoreWithData) { - fs::TempDir dataPath("/tmp/rocks_engine_test_data_path.XXXXXX"); - fs::TempDir rocksdbWalPath("/tmp/rocks_engine_test_rocksdb_wal_path.XXXXXX"); - fs::TempDir backupPath("/tmp/rocks_engine_test_backup_path.XXXXXX"); - FLAGS_rocksdb_table_format = "PlainTable"; - FLAGS_rocksdb_wal_dir = rocksdbWalPath.path(); - FLAGS_rocksdb_backup_dir = backupPath.path(); - - auto engine = std::make_unique(0, kDefaultVIdLen, dataPath.path()); - PartitionID partId = 1; - - auto checkData = [&] { - std::string prefix = NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, partId, "vertex"); - std::unique_ptr iter; - auto code = engine->prefix(prefix, &iter); - EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - int32_t num = 0; - while (iter->valid()) { - num++; - iter->next(); - } - EXPECT_EQ(num, 10); - - std::string value; - code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value); - EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - EXPECT_EQ("123", value); - }; - - LOG(INFO) << "Write some data"; - std::vector data; - for (auto tagId = 0; tagId < 10; tagId++) { - data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, "vertex", tagId), - folly::stringPrintf("val_%d", tagId)); +TEST_P(RocksEngineTest, VertexWholeKeyBloomFilterTest) { + if (FLAGS_rocksdb_table_format == "PlainTable") { + return; } - data.emplace_back(NebulaKeyUtils::systemCommitKey(partId), "123"); - engine->multiPut(std::move(data)); - - checkData(); - LOG(INFO) << "Stop the engine and remove data"; - // release the engine and mock machine reboot by removing the data - engine.reset(); - CHECK(fs::FileUtils::remove(dataPath.path(), true)); - - LOG(INFO) << "Start recover"; - // reopen the engine, and it will try to restore from the previous backup - engine = std::make_unique(0, kDefaultVIdLen, dataPath.path()); - checkData(); - - FLAGS_rocksdb_table_format = "BlockBasedTable"; - FLAGS_rocksdb_wal_dir = ""; - FLAGS_rocksdb_backup_dir = ""; -} - -TEST(RocksEngineTest, VertexBloomFilterTest) { FLAGS_enable_rocksdb_statistics = true; fs::TempDir rootPath("/tmp/rocksdb_engine_VertexBloomFilterTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); PartitionID partId = 1; VertexID vId = "vertex"; + VertexID notExisted = "notexist"; auto writeVertex = [&](TagID tagId) { std::vector data; @@ -374,34 +321,65 @@ TEST(RocksEngineTest, VertexBloomFilterTest) { } }; + auto scanVertex = [&](VertexID id) { + auto prefix = NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, partId, id); + std::unique_ptr iter; + auto ret = engine->prefix(prefix, &iter); + EXPECT_EQ(ret, nebula::cpp2::ErrorCode::SUCCEEDED); + }; + auto statistics = kvstore::getDBStatistics(); + statistics->getAndResetTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL); + statistics->getAndResetTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL); // write initial vertex writeVertex(0); // read data while in memtable - readVertex(0); - EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); - readVertex(1); - EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + if (FLAGS_enable_rocksdb_whole_key_filtering) { + readVertex(0); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + readVertex(1); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + } + if (FLAGS_enable_rocksdb_prefix_filtering) { + scanVertex(vId); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0); + scanVertex(notExisted); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0); + } // flush to sst, read again EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); - readVertex(0); - EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); - // read not exists data, whole key bloom filter will be useful - readVertex(1); - EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + if (FLAGS_enable_rocksdb_whole_key_filtering) { + readVertex(0); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + // read not exists data, whole key bloom filter will be useful + readVertex(1); + EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + } + if (FLAGS_enable_rocksdb_prefix_filtering) { + scanVertex(vId); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0); + // read not exists data, prefix key bloom filter will be useful + scanVertex(notExisted); + EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0); + } FLAGS_enable_rocksdb_statistics = false; } -TEST(RocksEngineTest, EdgeBloomFilterTest) { +TEST_P(RocksEngineTest, EdgeWholeKeyBloomFilterTest) { + if (FLAGS_rocksdb_table_format == "PlainTable") { + return; + } FLAGS_enable_rocksdb_statistics = true; fs::TempDir rootPath("/tmp/rocksdb_engine_EdgeBloomFilterTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); PartitionID partId = 1; VertexID vId = "vertex"; + VertexID notExisted = "notexist"; + auto writeEdge = [&](EdgeType edgeType) { std::vector data; data.emplace_back(NebulaKeyUtils::edgeKey(kDefaultVIdLen, partId, vId, edgeType, 0, vId), @@ -420,72 +398,188 @@ TEST(RocksEngineTest, EdgeBloomFilterTest) { } }; + auto scanEdge = [&](VertexID id) { + auto prefix = NebulaKeyUtils::edgePrefix(kDefaultVIdLen, partId, id); + std::unique_ptr iter; + auto ret = engine->prefix(prefix, &iter); + EXPECT_EQ(ret, nebula::cpp2::ErrorCode::SUCCEEDED); + }; + auto statistics = kvstore::getDBStatistics(); statistics->getAndResetTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL); + statistics->getAndResetTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL); // write initial vertex writeEdge(0); // read data while in memtable - readEdge(0); - EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); - readEdge(1); - EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + if (FLAGS_enable_rocksdb_whole_key_filtering) { + readEdge(0); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + readEdge(1); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + } + if (FLAGS_enable_rocksdb_prefix_filtering) { + scanEdge(vId); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0); + scanEdge(notExisted); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0); + } // flush to sst, read again EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush()); - readEdge(0); - EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); - // read not exists data, whole key bloom filter will be useful - readEdge(1); - EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + if (FLAGS_enable_rocksdb_whole_key_filtering) { + readEdge(0); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + // read not exists data, whole key bloom filter will be useful + readEdge(1); + EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0); + } + if (FLAGS_enable_rocksdb_prefix_filtering) { + scanEdge(vId); + EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0); + // read not exists data, prefix key bloom filter will be useful + scanEdge(notExisted); + EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0); + } FLAGS_enable_rocksdb_statistics = false; } -class RocksEnginePrefixTest - : public ::testing::TestWithParam> { - public: - void SetUp() override { - auto param = GetParam(); - FLAGS_enable_rocksdb_prefix_filtering = std::get<0>(param); - FLAGS_rocksdb_table_format = std::get<1>(param); - if (FLAGS_rocksdb_table_format == "PlainTable") { - FLAGS_rocksdb_plain_table_prefix_length = std::get<2>(param); - } - } - - void TearDown() override {} -}; - -TEST_P(RocksEnginePrefixTest, PrefixTest) { +TEST_P(RocksEngineTest, PrefixBloomTest) { fs::TempDir rootPath("/tmp/rocksdb_engine_PrefixExtractorTest.XXXXXX"); auto engine = std::make_unique(0, kDefaultVIdLen, rootPath.path()); - PartitionID partId = 1; - std::vector data; for (auto tagId = 0; tagId < 10; tagId++) { - data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, "vertex", tagId), + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 1, "1", tagId), + folly::stringPrintf("val_%d", tagId)); + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 1, "2", tagId), + folly::stringPrintf("val_%d", tagId)); + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 2, "3", tagId), + folly::stringPrintf("val_%d", tagId)); + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 2, "4", tagId), folly::stringPrintf("val_%d", tagId)); } - data.emplace_back(NebulaKeyUtils::systemCommitKey(partId), "123"); + data.emplace_back(NebulaKeyUtils::systemCommitKey(1), "123"); + data.emplace_back(NebulaKeyUtils::systemCommitKey(2), "123"); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data))); { - std::string prefix = NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, partId, "vertex"); - std::unique_ptr iter; - auto code = engine->prefix(prefix, &iter); - EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - int32_t num = 0; - while (iter->valid()) { - num++; - iter->next(); - } - EXPECT_EQ(num, 10); + // vertexPrefix(partId) will not be included + auto checkVertexPrefix = [&](PartitionID partId, const VertexID& vId) { + std::string prefix = NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, partId, vId); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 10); + }; + checkVertexPrefix(1, "1"); + checkVertexPrefix(1, "2"); + checkVertexPrefix(2, "3"); + checkVertexPrefix(2, "4"); + } + { + // vertexPrefix(partId) will be included + auto checkPartPrefix = [&](PartitionID partId) { + std::string prefix = NebulaKeyUtils::vertexPrefix(partId); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 20); + }; + checkPartPrefix(1); + checkPartPrefix(2); + } + { + // vertexPrefix(partId) will be included + auto checkRangeWithPartPrefix = [&](PartitionID partId) { + std::string prefix = NebulaKeyUtils::vertexPrefix(partId); + std::unique_ptr iter; + auto code = engine->rangeWithPrefix(prefix, prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 20); + }; + checkRangeWithPartPrefix(1); + checkRangeWithPartPrefix(2); } { - std::string prefix = NebulaKeyUtils::vertexPrefix(partId); + auto checkSystemCommit = [&](PartitionID partId) { + std::string value; + auto code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + EXPECT_EQ("123", value); + }; + checkSystemCommit(1); + checkSystemCommit(2); + } +} + +INSTANTIATE_TEST_CASE_P(EnablePrefixExtractor_EnableWholeKeyFilter_TableFormat, + RocksEngineTest, + ::testing::Values(std::make_tuple(false, false, "BlockBasedTable"), + std::make_tuple(false, true, "BlockBasedTable"), + std::make_tuple(true, false, "BlockBasedTable"), + std::make_tuple(true, true, "BlockBasedTable"), + // PlainTable will always enable prefix extractor + std::make_tuple(true, false, "PlainTable"), + std::make_tuple(true, true, "PlainTable"))); + +TEST(PlainTableTest, BackupRestoreWithoutData) { + fs::TempDir dataPath("/tmp/rocks_engine_test_data_path.XXXXXX"); + fs::TempDir rocksdbWalPath("/tmp/rocks_engine_test_rocksdb_wal_path.XXXXXX"); + fs::TempDir backupPath("/tmp/rocks_engine_test_backup_path.XXXXXX"); + FLAGS_rocksdb_table_format = "PlainTable"; + FLAGS_rocksdb_wal_dir = rocksdbWalPath.path(); + FLAGS_rocksdb_backup_dir = backupPath.path(); + FLAGS_enable_rocksdb_prefix_filtering = true; + + auto engine = std::make_unique(0, kDefaultVIdLen, dataPath.path()); + + LOG(INFO) << "Stop the engine and remove data"; + // release the engine and mock machine reboot by removing the data + engine.reset(); + CHECK(fs::FileUtils::remove(dataPath.path(), true)); + + LOG(INFO) << "Start recover"; + // reopen the engine, and it will try to restore from the previous backup + engine = std::make_unique(0, kDefaultVIdLen, dataPath.path()); + + FLAGS_rocksdb_table_format = "BlockBasedTable"; + FLAGS_rocksdb_wal_dir = ""; + FLAGS_rocksdb_backup_dir = ""; + FLAGS_enable_rocksdb_prefix_filtering = false; +} + +TEST(PlainTableTest, BackupRestoreWithData) { + fs::TempDir dataPath("/tmp/rocks_engine_test_data_path.XXXXXX"); + fs::TempDir rocksdbWalPath("/tmp/rocks_engine_test_rocksdb_wal_path.XXXXXX"); + fs::TempDir backupPath("/tmp/rocks_engine_test_backup_path.XXXXXX"); + FLAGS_rocksdb_table_format = "PlainTable"; + FLAGS_rocksdb_wal_dir = rocksdbWalPath.path(); + FLAGS_rocksdb_backup_dir = backupPath.path(); + FLAGS_enable_rocksdb_prefix_filtering = true; + + auto engine = std::make_unique(0, kDefaultVIdLen, dataPath.path()); + PartitionID partId = 1; + + auto checkData = [&] { + std::string prefix = NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, partId, "vertex"); std::unique_ptr iter; auto code = engine->prefix(prefix, &iter); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); @@ -495,23 +589,270 @@ TEST_P(RocksEnginePrefixTest, PrefixTest) { iter->next(); } EXPECT_EQ(num, 10); - } - { + std::string value; - auto code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value); + code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value); EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); EXPECT_EQ("123", value); + }; + + LOG(INFO) << "Write some data"; + std::vector data; + for (auto tagId = 0; tagId < 10; tagId++) { + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, "vertex", tagId), + folly::stringPrintf("val_%d", tagId)); } + data.emplace_back(NebulaKeyUtils::systemCommitKey(partId), "123"); + engine->multiPut(std::move(data)); + + checkData(); + LOG(INFO) << "Stop the engine and remove data"; + // release the engine and mock machine reboot by removing the data + engine.reset(); + CHECK(fs::FileUtils::remove(dataPath.path(), true)); + + LOG(INFO) << "Start recover"; + // reopen the engine, and it will try to restore from the previous backup + engine = std::make_unique(0, kDefaultVIdLen, dataPath.path()); + checkData(); + + FLAGS_rocksdb_table_format = "BlockBasedTable"; + FLAGS_rocksdb_wal_dir = ""; + FLAGS_rocksdb_backup_dir = ""; + FLAGS_enable_rocksdb_prefix_filtering = false; } -INSTANTIATE_TEST_CASE_P( - PrefixExtractor_TableFormat_PlainTablePrefixSize, - RocksEnginePrefixTest, - ::testing::Values(std::make_tuple(false, "BlockBasedTable", 0), - std::make_tuple(true, "BlockBasedTable", 0), - // PlainTable will always enable prefix extractor - std::make_tuple(true, "PlainTable", sizeof(PartitionID)), - std::make_tuple(true, "PlainTable", sizeof(PartitionID) + kDefaultVIdLen))); +TEST(RebuildPrefixBloomFilter, RebuildPrefixBloomFilter) { + GraphSpaceID spaceId = 1; + // previously default config (prefix off whole on) + FLAGS_rocksdb_table_format = "BlockBasedTable"; + FLAGS_enable_rocksdb_prefix_filtering = false; + FLAGS_enable_rocksdb_whole_key_filtering = true; + + fs::TempDir dataPath("/tmp/rocksdb_engine_rebuild_prefix_bloom_filter.XXXXXX"); + LOG(INFO) << "start the engine with prefix bloom filter disabled"; + auto engine = std::make_unique(spaceId, kDefaultVIdLen, dataPath.path()); + + auto checkData = [&] { + auto checkVertexPrefix = [&](PartitionID partId, VertexID vId) { + { + std::string prefix = NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, partId, vId); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 10); + } + for (TagID tagId = 0; tagId < 10; tagId++) { + std::string prefix = NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, partId, vId, tagId); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 1); + } + }; + + auto checkEdgePrefix = [&](PartitionID partId, VertexID vId) { + { + std::string prefix = NebulaKeyUtils::edgePrefix(kDefaultVIdLen, partId, vId); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 10); + } + for (EdgeType edgeType = 0; edgeType < 10; edgeType++) { + std::string prefix = NebulaKeyUtils::edgePrefix(kDefaultVIdLen, partId, vId, edgeType); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 1); + } + }; + + auto checkVertexPartPrefix = [&](PartitionID partId) { + std::string prefix = NebulaKeyUtils::vertexPrefix(partId); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 20); + }; + + auto checkEdgePartPrefix = [&](PartitionID partId) { + std::string prefix = NebulaKeyUtils::edgePrefix(partId); + std::unique_ptr iter; + auto code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 20); + }; + + auto checkRangeWithPartPrefix = [&](PartitionID partId) { + std::string prefix = NebulaKeyUtils::vertexPrefix(partId); + std::unique_ptr iter; + auto code = engine->rangeWithPrefix(prefix, prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 20); + }; + + auto checkSystemCommit = [&](PartitionID partId) { + std::string value; + auto code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + EXPECT_EQ("123", value); + }; + + checkVertexPrefix(1, "1"); + checkVertexPrefix(1, "2"); + checkVertexPrefix(2, "3"); + checkVertexPrefix(2, "4"); + checkEdgePrefix(1, "1"); + checkEdgePrefix(1, "2"); + checkEdgePrefix(2, "3"); + checkEdgePrefix(2, "4"); + checkVertexPartPrefix(1); + checkVertexPartPrefix(2); + checkEdgePartPrefix(1); + checkEdgePartPrefix(2); + checkRangeWithPartPrefix(1); + checkRangeWithPartPrefix(2); + checkSystemCommit(1); + checkSystemCommit(2); + }; + + auto writeData = [&engine] { + LOG(INFO) << "Write some data"; + std::vector data; + for (TagID tagId = 0; tagId < 10; tagId++) { + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 1, "1", tagId), + folly::stringPrintf("val_%d", tagId)); + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 1, "2", tagId), + folly::stringPrintf("val_%d", tagId)); + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 2, "3", tagId), + folly::stringPrintf("val_%d", tagId)); + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 2, "4", tagId), + folly::stringPrintf("val_%d", tagId)); + } + EdgeRanking rank = 0; + for (EdgeType edgeType = 0; edgeType < 10; edgeType++) { + data.emplace_back(NebulaKeyUtils::edgeKey(kDefaultVIdLen, 1, "1", edgeType, rank, "1"), + folly::stringPrintf("val_%d", edgeType)); + data.emplace_back(NebulaKeyUtils::edgeKey(kDefaultVIdLen, 1, "2", edgeType, rank, "2"), + folly::stringPrintf("val_%d", edgeType)); + data.emplace_back(NebulaKeyUtils::edgeKey(kDefaultVIdLen, 2, "3", edgeType, rank, "3"), + folly::stringPrintf("val_%d", edgeType)); + data.emplace_back(NebulaKeyUtils::edgeKey(kDefaultVIdLen, 2, "4", edgeType, rank, "4"), + folly::stringPrintf("val_%d", edgeType)); + } + data.emplace_back(NebulaKeyUtils::systemCommitKey(1), "123"); + data.emplace_back(NebulaKeyUtils::systemCommitKey(2), "123"); + auto code = engine->multiPut(std::move(data)); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + }; + + auto writeNewData = [&engine] { + std::vector data; + data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 3, "5", 0), + "vertex_data_after_enable_prefix_bloom_filter"); + data.emplace_back(NebulaKeyUtils::edgeKey(kDefaultVIdLen, 3, "5", 0, 0, "5"), + "edge_data_after_enable_prefix_bloom_filter"); + auto code = engine->multiPut(std::move(data)); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + }; + + auto checkNewData = [&engine] { + std::string value; + auto code = engine->get(NebulaKeyUtils::vertexKey(kDefaultVIdLen, 3, "5", 0), &value); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + EXPECT_EQ("vertex_data_after_enable_prefix_bloom_filter", value); + code = engine->get(NebulaKeyUtils::edgeKey(kDefaultVIdLen, 3, "5", 0, 0, "5"), &value); + EXPECT_EQ("edge_data_after_enable_prefix_bloom_filter", value); + + auto checkPrefix = [&](const std::string& prefix) { + std::unique_ptr iter; + code = engine->prefix(prefix, &iter); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + int32_t num = 0; + while (iter->valid()) { + num++; + iter->next(); + } + EXPECT_EQ(num, 1); + }; + + checkPrefix(NebulaKeyUtils::vertexPrefix(3)); + checkPrefix(NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, 3, "5")); + checkPrefix(NebulaKeyUtils::vertexPrefix(kDefaultVIdLen, 3, "5", 0)); + checkPrefix(NebulaKeyUtils::edgePrefix(3)); + checkPrefix(NebulaKeyUtils::edgePrefix(kDefaultVIdLen, 3, "5")); + checkPrefix(NebulaKeyUtils::edgePrefix(kDefaultVIdLen, 3, "5", 0)); + checkPrefix(NebulaKeyUtils::edgePrefix(kDefaultVIdLen, 3, "5", 0, 0, "5")); + }; + + writeData(); + checkData(); + + LOG(INFO) << "release the engine and restart with prefix bloom filter enabled"; + engine.reset(); + // new default config (prefix on whole off) + FLAGS_enable_rocksdb_prefix_filtering = true; + FLAGS_enable_rocksdb_whole_key_filtering = false; + engine = std::make_unique(spaceId, kDefaultVIdLen, dataPath.path()); + checkData(); + + writeNewData(); + checkNewData(); + + LOG(INFO) << "compact to rebuild prefix bloom filter"; + engine->compact(); + checkData(); + checkNewData(); + + LOG(INFO) << "release the engine and restart with prefix bloom filter disabled again"; + engine.reset(); + FLAGS_enable_rocksdb_prefix_filtering = false; + FLAGS_enable_rocksdb_whole_key_filtering = true; + engine = std::make_unique(spaceId, kDefaultVIdLen, dataPath.path()); + checkData(); + checkNewData(); + + LOG(INFO) << "compact to rebuild whole key bloom filter"; + engine->compact(); + checkData(); + checkNewData(); +} } // namespace kvstore } // namespace nebula diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 8589e5e6a83..78782fa3d40 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -62,6 +62,7 @@ nebula_add_library( http/StorageHttpDownloadHandler.cpp http/StorageHttpAdminHandler.cpp http/StorageHttpStatsHandler.cpp + http/StorageHttpPropertyHandler.cpp ) nebula_add_library( diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index ca5c15f9fa9..07356186de0 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -26,6 +26,7 @@ #include "storage/http/StorageHttpAdminHandler.h" #include "storage/http/StorageHttpDownloadHandler.h" #include "storage/http/StorageHttpIngestHandler.h" +#include "storage/http/StorageHttpPropertyHandler.h" #include "storage/http/StorageHttpStatsHandler.h" #include "storage/transaction/TransactionManager.h" #include "version/Version.h" @@ -105,6 +106,9 @@ bool StorageServer::initWebService() { router.get("/rocksdb_stats").handler([](web::PathParams&&) { return new storage::StorageHttpStatsHandler(); }); + router.get("/rocksdb_property").handler([this](web::PathParams&&) { + return new storage::StorageHttpPropertyHandler(schemaMan_.get(), kvstore_.get()); + }); auto status = webSvc_->start(); return status.ok(); diff --git a/src/storage/admin/RebuildIndexTask.cpp b/src/storage/admin/RebuildIndexTask.cpp index aee855a8cad..1f394c316d8 100644 --- a/src/storage/admin/RebuildIndexTask.cpp +++ b/src/storage/admin/RebuildIndexTask.cpp @@ -23,7 +23,7 @@ RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx) // 1Mb (512 * 2 peers). Muliplied by the subtasks concurrency, the total send/recv traffic will be // 10Mb, which is non-trival. LOG(INFO) << "Rebuild index task is rate limited to " << FLAGS_rebuild_index_part_rate_limit - << " for each subtask"; + << " for each subtask by default"; } ErrorOr> RebuildIndexTask::genSubTasks() { @@ -78,8 +78,7 @@ ErrorOr> RebuildIndexTask::ge nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space, PartitionID part, const IndexItems& items) { - auto rateLimiter = std::make_unique(FLAGS_rebuild_index_part_rate_limit, - FLAGS_rebuild_index_part_rate_limit); + auto rateLimiter = std::make_unique(); // TaskMananger will make sure that there won't be cocurrent invoke of a given part auto result = removeLegacyLogs(space, part); if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -221,7 +220,9 @@ nebula::cpp2::ErrorCode RebuildIndexTask::writeData(GraphSpaceID space, kvstore::RateLimiter* rateLimiter) { folly::Baton baton; auto result = nebula::cpp2::ErrorCode::SUCCEEDED; - rateLimiter->consume(batchSize); + rateLimiter->consume(static_cast(batchSize), // toConsume + static_cast(FLAGS_rebuild_index_part_rate_limit), // rate + static_cast(FLAGS_rebuild_index_part_rate_limit)); // burstSize env_->kvstore_->asyncMultiPut( space, part, std::move(data), [&result, &baton](nebula::cpp2::ErrorCode code) { if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -240,7 +241,9 @@ nebula::cpp2::ErrorCode RebuildIndexTask::writeOperation(GraphSpaceID space, folly::Baton baton; auto result = nebula::cpp2::ErrorCode::SUCCEEDED; auto encoded = encodeBatchValue(batchHolder->getBatch()); - rateLimiter->consume(batchHolder->size()); + rateLimiter->consume(static_cast(batchHolder->size()), // toConsume + static_cast(FLAGS_rebuild_index_part_rate_limit), // rate + static_cast(FLAGS_rebuild_index_part_rate_limit)); // burstSize env_->kvstore_->asyncAppendBatch( space, part, std::move(encoded), [&result, &baton](nebula::cpp2::ErrorCode code) { if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { diff --git a/src/storage/http/StorageHttpPropertyHandler.cpp b/src/storage/http/StorageHttpPropertyHandler.cpp new file mode 100644 index 00000000000..838a0efa90b --- /dev/null +++ b/src/storage/http/StorageHttpPropertyHandler.cpp @@ -0,0 +1,105 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "storage/http/StorageHttpPropertyHandler.h" + +#include +#include +#include + +#include "common/base/Base.h" + +namespace nebula { +namespace storage { + +using proxygen::HTTPMessage; +using proxygen::HTTPMethod; +using proxygen::ProxygenError; +using proxygen::ResponseBuilder; +using proxygen::UpgradeProtocol; + +void StorageHttpPropertyHandler::onRequest(std::unique_ptr headers) noexcept { + if (headers->getMethod().value() != HTTPMethod::GET) { + // Unsupported method + resp_ = "Not supported"; + err_ = HttpCode::E_UNSUPPORTED_METHOD; + return; + } + + do { + if (headers->hasQueryParam("space")) { + auto spaceName = headers->getQueryParam("space"); + auto ret = schemaMan_->toGraphSpaceID(spaceName); + if (!ret.ok()) { + resp_ = "Space not found: " + spaceName; + err_ = HttpCode::E_ILLEGAL_ARGUMENT; + break; + } + spaceId_ = ret.value(); + } else { + resp_ = + "Space should not be empty. " + "Usage: http:://ip:port/rocksdb_property?space=xxx&property=yyy"; + err_ = HttpCode::E_ILLEGAL_ARGUMENT; + break; + } + + if (headers->hasQueryParam("property")) { + folly::split(",", headers->getQueryParam("property"), properties_, true); + } else { + resp_ = + "Property should not be empty. " + "Usage: http:://ip:port/rocksdb_property?space=xxx&property=yyy"; + err_ = HttpCode::E_ILLEGAL_ARGUMENT; + break; + } + + auto result = folly::dynamic::array(); + for (const auto& property : properties_) { + auto ret = kv_->getProperty(spaceId_, property); + if (!ok(ret)) { + resp_ = "Property not found: " + property; + err_ = HttpCode::E_ILLEGAL_ARGUMENT; + return; + } else { + result.push_back(folly::parseJson(value(ret))); + } + } + resp_ = folly::toPrettyJson(result); + } while (false); +} + +void StorageHttpPropertyHandler::onBody(std::unique_ptr) noexcept { + // Do nothing, we only support GET +} + +void StorageHttpPropertyHandler::onEOM() noexcept { + switch (err_) { + case HttpCode::E_UNSUPPORTED_METHOD: + ResponseBuilder(downstream_).status(405, "Method not allowed").body(resp_).sendWithEOM(); + return; + case HttpCode::E_ILLEGAL_ARGUMENT: + ResponseBuilder(downstream_).status(400, "Illegal argument").body(resp_).sendWithEOM(); + return; + default: + break; + } + + ResponseBuilder(downstream_).status(200, "OK").body(resp_).sendWithEOM(); +} + +void StorageHttpPropertyHandler::onUpgrade(UpgradeProtocol) noexcept { + // Do nothing +} + +void StorageHttpPropertyHandler::requestComplete() noexcept { delete this; } + +void StorageHttpPropertyHandler::onError(ProxygenError error) noexcept { + LOG(ERROR) << "Web service StorageHttpHandler got error: " << proxygen::getErrorString(error); +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/http/StorageHttpPropertyHandler.h b/src/storage/http/StorageHttpPropertyHandler.h new file mode 100644 index 00000000000..2e112e82f69 --- /dev/null +++ b/src/storage/http/StorageHttpPropertyHandler.h @@ -0,0 +1,45 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#pragma once + +#include + +#include "common/base/Base.h" +#include "kvstore/KVStore.h" +#include "webservice/Common.h" + +namespace nebula { +namespace storage { + +class StorageHttpPropertyHandler : public proxygen::RequestHandler { + public: + StorageHttpPropertyHandler(meta::SchemaManager* schemaMan, kvstore::KVStore* kv) + : schemaMan_(schemaMan), kv_(kv) {} + + void onRequest(std::unique_ptr headers) noexcept override; + + void onBody(std::unique_ptr body) noexcept override; + + void onEOM() noexcept override; + + void onUpgrade(proxygen::UpgradeProtocol proto) noexcept override; + + void requestComplete() noexcept override; + + void onError(proxygen::ProxygenError err) noexcept override; + + private: + meta::SchemaManager* schemaMan_ = nullptr; + kvstore::KVStore* kv_ = nullptr; + HttpCode err_{HttpCode::SUCCEEDED}; + std::string resp_; + GraphSpaceID spaceId_; + std::vector properties_; +}; + +} // namespace storage +} // namespace nebula diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 8b63074315b..45dc4c0e662 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -512,6 +512,23 @@ nebula_add_test( gtest ) +nebula_add_test( + NAME + storage_http_property_test + SOURCES + StorageHttpPropertyHandlerTest.cpp + OBJECTS + $ + $ + ${storage_test_deps} + LIBRARIES + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + ${PROXYGEN_LIBRARIES} + wangle + gtest +) + nebula_add_test( NAME scan_edge_test diff --git a/src/storage/test/StorageHttpPropertyHandlerTest.cpp b/src/storage/test/StorageHttpPropertyHandlerTest.cpp new file mode 100644 index 00000000000..3285ab35174 --- /dev/null +++ b/src/storage/test/StorageHttpPropertyHandlerTest.cpp @@ -0,0 +1,117 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include +#include + +#include "common/base/Base.h" +#include "common/fs/TempDir.h" +#include "kvstore/RocksEngineConfig.h" +#include "mock/MockCluster.h" +#include "storage/http/StorageHttpPropertyHandler.h" +#include "storage/test/TestUtils.h" +#include "webservice/Router.h" +#include "webservice/WebService.h" +#include "webservice/test/TestUtils.h" + +namespace nebula { +namespace storage { + +class StorageHttpStatsHandlerTestEnv : public ::testing::Environment { + public: + void SetUp() override { + FLAGS_ws_ip = "127.0.0.1"; + FLAGS_ws_http_port = 0; + FLAGS_ws_h2_port = 0; + FLAGS_enable_rocksdb_statistics = true; + rootPath_ = std::make_unique("/tmp/StorageHttpPropertyHandler.XXXXXX"); + cluster_ = std::make_unique(); + cluster_->initStorageKV(rootPath_->path()); + + VLOG(1) << "Starting web service..."; + webSvc_ = std::make_unique(); + auto& router = webSvc_->router(); + router.get("/rocksdb_property").handler([this](nebula::web::PathParams&&) { + return new storage::StorageHttpPropertyHandler(cluster_->storageEnv_->schemaMan_, + cluster_->storageEnv_->kvstore_); + }); + auto status = webSvc_->start(); + ASSERT_TRUE(status.ok()) << status; + } + + void TearDown() override { + cluster_.reset(); + webSvc_.reset(); + rootPath_.reset(); + VLOG(1) << "Web service stopped"; + } + + protected: + std::unique_ptr cluster_; + std::unique_ptr webSvc_; + std::unique_ptr rootPath_; +}; + +static std::string request(const std::string& url) { + auto request = + folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), FLAGS_ws_http_port, url.c_str()); + auto resp = http::HttpClient::get(request); + EXPECT_TRUE(resp.ok()); + return resp.value(); +} + +static void checkInvalidRequest(const std::string& url, const std::string& errMsg) { + ASSERT_EQ(0, request(url).find(errMsg)); +} + +TEST(StorageHttpPropertyHandlerTest, InvalidRequest) { + checkInvalidRequest("/rocksdb_property", "Space should not be empty."); + checkInvalidRequest("/rocksdb_property?space=xxx", "Space not found: xxx"); + checkInvalidRequest("/rocksdb_property?space=1", "Property should not be empty."); + checkInvalidRequest("/rocksdb_property?space=1&property=yyy", "Property not found: yyy"); +} + +TEST(StorageHttpPropertyHandlerTest, ValidRequest) { + { + std::string expect = + R"([ + { + "Engine 0": "0", + "Engine 1": "0" + } +])"; + EXPECT_EQ(expect, request("/rocksdb_property?space=1&property=rocksdb.block-cache-usage")); + } + { + std::string expect = + R"([ + { + "Engine 0": "0", + "Engine 1": "0" + }, + { + "Engine 0": "0", + "Engine 1": "0" + } +])"; + EXPECT_EQ(expect, + request("/rocksdb_property?space=1&property=" + "rocksdb.block-cache-usage,rocksdb.is-write-stopped")); + } +} + +} // namespace storage +} // namespace nebula + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + ::testing::AddGlobalTestEnvironment(new nebula::storage::StorageHttpStatsHandlerTestEnv()); + + return RUN_ALL_TESTS(); +}