Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Replace vertex lru cache with rocksdb row cache #532

Merged
merged 5 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ DEFINE_int32(rocksdb_batch_size,
DEFINE_int64(rocksdb_block_cache, 1024,
"The default block cache size used in BlockBasedTable. The unit is MB");

DEFINE_int32(row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");

DEFINE_int32(cache_bucket_exp, 8, "Total buckets number is 1 << cache_bucket_exp");

DEFINE_bool(enable_partitioned_index_filter, false, "True for partitioned index filters");

DEFINE_string(rocksdb_compression, "snappy", "Compression algorithm used by RocksDB, "
Expand Down Expand Up @@ -260,11 +264,17 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
if (FLAGS_rocksdb_block_cache <= 0) {
bbtOpts.no_block_cache = true;
} else {
static std::shared_ptr<rocksdb::Cache> blockCache
= rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024, 8/*shard bits*/);
static std::shared_ptr<rocksdb::Cache> blockCache = rocksdb::NewLRUCache(
FLAGS_rocksdb_block_cache * 1024 * 1024, FLAGS_cache_bucket_exp);
bbtOpts.block_cache = blockCache;
}

if (FLAGS_row_cache_num) {
static std::shared_ptr<rocksdb::Cache> rowCache
= rocksdb::NewLRUCache(FLAGS_row_cache_num, FLAGS_cache_bucket_exp);
baseOpts.row_cache = rowCache;
}

bbtOpts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
if (FLAGS_enable_partitioned_index_filter) {
bbtOpts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/RocksEngineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ DECLARE_int64(rocksdb_block_cache);

DECLARE_int32(rocksdb_batch_size);

DECLARE_int32(row_cache_num);

DECLARE_int32(cache_bucket_exp);

// rocksdb table format
DECLARE_string(rocksdb_table_format);

Expand Down
95 changes: 95 additions & 0 deletions src/kvstore/test/RocksEngineTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,101 @@ TEST(RocksEngineTest, BackupRestoreWithData) {
FLAGS_rocksdb_backup_dir = "";
}

TEST(RocksEngineTest, VertexBloomFilterTest) {
FLAGS_enable_rocksdb_statistics = true;
fs::TempDir rootPath("/tmp/rocksdb_engine_VertexBloomFilterTest.XXXXXX");
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
PartitionID partId = 1;
VertexID vId = "vertex";

auto writeVertex = [&](TagID tagId) {
std::vector<KV> data;
data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, vId, tagId),
folly::stringPrintf("val_%d", tagId));
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
};

auto readVertex = [&](TagID tagId) {
auto key = NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, vId, tagId);
std::string val;
auto ret = engine->get(key, &val);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
EXPECT_EQ(folly::stringPrintf("val_%d", tagId), val);
} else {
EXPECT_EQ(nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND, ret);
}
};

auto statistics = kvstore::getDBStatistics();

// write initial vertex
writeVertex(0);

// read data while in memtable
readVertex(0);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
readVertex(1);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

// flush to sst, read again
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
readVertex(0);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
// read not exists data, whole key bloom filter will be useful
readVertex(1);
EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

FLAGS_enable_rocksdb_statistics = false;
}


TEST(RocksEngineTest, EdgeBloomFilterTest) {
FLAGS_enable_rocksdb_statistics = true;
fs::TempDir rootPath("/tmp/rocksdb_engine_EdgeBloomFilterTest.XXXXXX");
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
PartitionID partId = 1;
VertexID vId = "vertex";
auto writeEdge = [&](EdgeType edgeType) {
std::vector<KV> data;
data.emplace_back(NebulaKeyUtils::edgeKey(kDefaultVIdLen, partId, vId, edgeType, 0, vId),
folly::stringPrintf("val_%d", edgeType));
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
};

auto readEdge = [&](EdgeType edgeType) {
auto key = NebulaKeyUtils::edgeKey(kDefaultVIdLen, partId, vId, edgeType, 0, vId);
std::string val;
auto ret = engine->get(key, &val);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
EXPECT_EQ(folly::stringPrintf("val_%d", edgeType), val);
} else {
EXPECT_EQ(nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND, ret);
}
};

auto statistics = kvstore::getDBStatistics();
statistics->getAndResetTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL);

// write initial vertex
writeEdge(0);

// read data while in memtable
readEdge(0);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
readEdge(1);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

// flush to sst, read again
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
readEdge(0);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
// read not exists data, whole key bloom filter will be useful
readEdge(1);
EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

FLAGS_enable_rocksdb_statistics = false;
}

class RocksEnginePrefixTest
: public ::testing::TestWithParam<std::tuple<bool, std::string, int32_t>> {
public:
Expand Down
19 changes: 7 additions & 12 deletions src/storage/GraphStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ namespace nebula {
namespace storage {

GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env)
: env_(env)
, vertexCache_(FLAGS_vertex_cache_num, FLAGS_vertex_cache_bucket_exp) {
: env_(env) {
if (FLAGS_reader_handlers_type == "io") {
auto tf = std::make_shared<folly::NamedThreadFactory>("reader-pool");
readerPool_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_reader_handlers,
Expand Down Expand Up @@ -64,14 +63,14 @@ GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env)
// Vertice section
folly::Future<cpp2::ExecResponse>
GraphStorageServiceHandler::future_addVertices(const cpp2::AddVerticesRequest& req) {
auto* processor = AddVerticesProcessor::instance(env_, &kAddVerticesCounters, &vertexCache_);
auto* processor = AddVerticesProcessor::instance(env_, &kAddVerticesCounters);
RETURN_FUTURE(processor);
}


folly::Future<cpp2::ExecResponse>
GraphStorageServiceHandler::future_deleteVertices(const cpp2::DeleteVerticesRequest& req) {
auto* processor = DeleteVerticesProcessor::instance(env_, &kDelVerticesCounters, &vertexCache_);
auto* processor = DeleteVerticesProcessor::instance(env_, &kDelVerticesCounters);
RETURN_FUTURE(processor);
}

Expand All @@ -80,8 +79,7 @@ folly::Future<cpp2::UpdateResponse>
GraphStorageServiceHandler::future_updateVertex(const cpp2::UpdateVertexRequest& req) {
auto* processor = UpdateVertexProcessor::instance(env_,
&kUpdateVertexCounters,
readerPool_.get(),
&vertexCache_);
readerPool_.get());
RETURN_FUTURE(processor);
}

Expand Down Expand Up @@ -112,8 +110,7 @@ folly::Future<cpp2::GetNeighborsResponse>
GraphStorageServiceHandler::future_getNeighbors(const cpp2::GetNeighborsRequest& req) {
auto* processor = GetNeighborsProcessor::instance(env_,
&kGetNeighborsCounters,
readerPool_.get(),
&vertexCache_);
readerPool_.get());
RETURN_FUTURE(processor);
}

Expand All @@ -122,8 +119,7 @@ folly::Future<cpp2::GetPropResponse>
GraphStorageServiceHandler::future_getProps(const cpp2::GetPropRequest& req) {
auto* processor = GetPropProcessor::instance(env_,
&kGetPropCounters,
readerPool_.get(),
&vertexCache_);
readerPool_.get());
RETURN_FUTURE(processor);
}

Expand All @@ -132,8 +128,7 @@ folly::Future<cpp2::LookupIndexResp>
GraphStorageServiceHandler::future_lookupIndex(const cpp2::LookupIndexRequest& req) {
auto* processor = LookupProcessor::instance(env_,
&kLookupCounters,
readerPool_.get(),
&vertexCache_);
readerPool_.get());
RETURN_FUTURE(processor);
}

Expand Down
1 change: 0 additions & 1 deletion src/storage/GraphStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class GraphStorageServiceHandler final : public cpp2::GraphStorageServiceSvIf {

private:
StorageEnv* env_{nullptr};
VertexCache vertexCache_;
std::shared_ptr<folly::Executor> readerPool_;
};

Expand Down
4 changes: 0 additions & 4 deletions src/storage/StorageFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ DEFINE_int32(rebuild_index_batch_num, 1024,
DEFINE_int32(rebuild_index_locked_threshold, 1024,
"The locked threshold will refuse writing.");

DEFINE_int32(vertex_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");

DEFINE_int32(vertex_cache_bucket_exp, 4, "Total buckets number is 1 << cache_bucket_exp");

DEFINE_bool(enable_vertex_cache, true, "Enable vertex cache");

DEFINE_int32(reader_handlers, 32, "Total reader handlers");
Expand Down
4 changes: 0 additions & 4 deletions src/storage/StorageFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ DECLARE_int32(rebuild_index_batch_num);

DECLARE_int32(rebuild_index_locked_threshold);

DECLARE_int32(vertex_cache_num);

DECLARE_int32(vertex_cache_bucket_exp);

DECLARE_bool(enable_vertex_cache);

DECLARE_int32(reader_handlers);
Expand Down
Loading