This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Replace vertex lru cache with rocksdb row cache (#532)
* remove vertex cache

* use get instead of prefix

* add test
critical27 authored Jul 28, 2021
1 parent 98a4645 commit 0b54cac
Showing 38 changed files with 328 additions and 952 deletions.
14 changes: 12 additions & 2 deletions src/kvstore/RocksEngineConfig.cpp
@@ -55,6 +55,10 @@ DEFINE_int32(rocksdb_batch_size,
DEFINE_int64(rocksdb_block_cache, 1024,
"The default block cache size used in BlockBasedTable. The unit is MB");

+DEFINE_int32(row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");
+
+DEFINE_int32(cache_bucket_exp, 8, "Total buckets number is 1 << cache_bucket_exp");
+
DEFINE_bool(enable_partitioned_index_filter, false, "True for partitioned index filters");

DEFINE_string(rocksdb_compression, "snappy", "Compression algorithm used by RocksDB, "
@@ -260,11 +264,17 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
if (FLAGS_rocksdb_block_cache <= 0) {
bbtOpts.no_block_cache = true;
} else {
-static std::shared_ptr<rocksdb::Cache> blockCache
-= rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024, 8/*shard bits*/);
+static std::shared_ptr<rocksdb::Cache> blockCache = rocksdb::NewLRUCache(
+FLAGS_rocksdb_block_cache * 1024 * 1024, FLAGS_cache_bucket_exp);
bbtOpts.block_cache = blockCache;
}

+if (FLAGS_row_cache_num) {
+static std::shared_ptr<rocksdb::Cache> rowCache
+= rocksdb::NewLRUCache(FLAGS_row_cache_num, FLAGS_cache_bucket_exp);
+baseOpts.row_cache = rowCache;
+}
+
bbtOpts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
if (FLAGS_enable_partitioned_index_filter) {
bbtOpts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
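For reference, here is a minimal standalone sketch of how a RocksDB row cache is plugged into rocksdb::Options. The database path, cache sizes, shard bits, and key names below are illustrative placeholders, not values taken from this commit.

#include <rocksdb/cache.h>
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>
#include <string>

int main() {
    rocksdb::Options options;
    options.create_if_missing = true;

    // Block cache for data blocks read through BlockBasedTable.
    rocksdb::BlockBasedTableOptions bbtOpts;
    bbtOpts.block_cache = rocksdb::NewLRUCache(256 * 1024 * 1024 /* capacity */, 8 /* shard bits */);
    options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbtOpts));

    // Row cache: caches whole key/value entries and is consulted by Get()
    // before the block cache and SST files are touched.
    options.row_cache = rocksdb::NewLRUCache(16 * 1000 * 1000, 8 /* shard bits */);

    rocksdb::DB* db = nullptr;
    if (!rocksdb::DB::Open(options, "/tmp/row_cache_demo", &db).ok()) {
        return 1;
    }
    db->Put(rocksdb::WriteOptions(), "key", "value");
    db->Flush(rocksdb::FlushOptions());              // flush so reads go through the SST path, where the row cache sits

    std::string val;
    db->Get(rocksdb::ReadOptions(), "key", &val);    // populates the row cache
    db->Get(rocksdb::ReadOptions(), "key", &val);    // served from the row cache
    delete db;
    return 0;
}

Note that the row cache is only consulted by point Get() calls, not by iterators, which lines up with the commit's "use get instead of prefix" change for vertex reads.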
4 changes: 4 additions & 0 deletions src/kvstore/RocksEngineConfig.h
@@ -36,6 +36,10 @@ DECLARE_int64(rocksdb_block_cache);

DECLARE_int32(rocksdb_batch_size);

+DECLARE_int32(row_cache_num);
+
+DECLARE_int32(cache_bucket_exp);
+
// rocksdb table format
DECLARE_string(rocksdb_table_format);

95 changes: 95 additions & 0 deletions src/kvstore/test/RocksEngineTest.cpp
@@ -370,6 +370,101 @@ TEST(RocksEngineTest, BackupRestoreWithData) {
FLAGS_rocksdb_backup_dir = "";
}

TEST(RocksEngineTest, VertexBloomFilterTest) {
FLAGS_enable_rocksdb_statistics = true;
fs::TempDir rootPath("/tmp/rocksdb_engine_VertexBloomFilterTest.XXXXXX");
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
PartitionID partId = 1;
VertexID vId = "vertex";

auto writeVertex = [&](TagID tagId) {
std::vector<KV> data;
data.emplace_back(NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, vId, tagId),
folly::stringPrintf("val_%d", tagId));
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
};

auto readVertex = [&](TagID tagId) {
auto key = NebulaKeyUtils::vertexKey(kDefaultVIdLen, partId, vId, tagId);
std::string val;
auto ret = engine->get(key, &val);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
EXPECT_EQ(folly::stringPrintf("val_%d", tagId), val);
} else {
EXPECT_EQ(nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND, ret);
}
};

auto statistics = kvstore::getDBStatistics();

// write initial vertex
writeVertex(0);

// read data while in memtable
readVertex(0);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
readVertex(1);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

// flush to sst, read again
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
readVertex(0);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
// read non-existent data, the whole-key bloom filter will be useful
readVertex(1);
EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

FLAGS_enable_rocksdb_statistics = false;
}


TEST(RocksEngineTest, EdgeBloomFilterTest) {
FLAGS_enable_rocksdb_statistics = true;
fs::TempDir rootPath("/tmp/rocksdb_engine_EdgeBloomFilterTest.XXXXXX");
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
PartitionID partId = 1;
VertexID vId = "vertex";
auto writeEdge = [&](EdgeType edgeType) {
std::vector<KV> data;
data.emplace_back(NebulaKeyUtils::edgeKey(kDefaultVIdLen, partId, vId, edgeType, 0, vId),
folly::stringPrintf("val_%d", edgeType));
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
};

auto readEdge = [&](EdgeType edgeType) {
auto key = NebulaKeyUtils::edgeKey(kDefaultVIdLen, partId, vId, edgeType, 0, vId);
std::string val;
auto ret = engine->get(key, &val);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
EXPECT_EQ(folly::stringPrintf("val_%d", edgeType), val);
} else {
EXPECT_EQ(nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND, ret);
}
};

auto statistics = kvstore::getDBStatistics();
statistics->getAndResetTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL);

// write initial edge
writeEdge(0);

// read data while in memtable
readEdge(0);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
readEdge(1);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

// flush to sst, read again
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
readEdge(0);
EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
// read non-existent data, the whole-key bloom filter will be useful
readEdge(1);
EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

FLAGS_enable_rocksdb_statistics = false;
}

class RocksEnginePrefixTest
: public ::testing::TestWithParam<std::tuple<bool, std::string, int32_t>> {
public:
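The two tests above rely on RocksDB's statistics to observe whole-key bloom filter hits. Here is a rough standalone illustration of the same check; the database path, filter parameters, and key names are assumptions for the sketch, not taken from the engine code.

#include <rocksdb/db.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/options.h>
#include <rocksdb/statistics.h>
#include <rocksdb/table.h>
#include <string>

int main() {
    rocksdb::Options options;
    options.create_if_missing = true;
    options.statistics = rocksdb::CreateDBStatistics();

    // Whole-key bloom filter per SST file: Get() can skip files that cannot contain the key.
    rocksdb::BlockBasedTableOptions bbtOpts;
    bbtOpts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
    bbtOpts.whole_key_filtering = true;   // the default, shown here for clarity
    options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbtOpts));

    rocksdb::DB* db = nullptr;
    if (!rocksdb::DB::Open(options, "/tmp/bloom_filter_demo", &db).ok()) {
        return 1;
    }
    db->Put(rocksdb::WriteOptions(), "existing_key", "value");
    db->Flush(rocksdb::FlushOptions());   // write the memtable to an SST so the filter is built

    std::string val;
    db->Get(rocksdb::ReadOptions(), "missing_key", &val);   // rejected by the bloom filter

    // BLOOM_FILTER_USEFUL counts lookups the filter was able to short-circuit,
    // which is what the tests assert after engine->flush().
    auto useful = options.statistics->getTickerCount(rocksdb::BLOOM_FILTER_USEFUL);
    delete db;
    return useful > 0 ? 0 : 1;
}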
19 changes: 7 additions & 12 deletions src/storage/GraphStorageServiceHandler.cpp
@@ -28,8 +28,7 @@ namespace nebula {
namespace storage {

GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env)
-: env_(env)
-, vertexCache_(FLAGS_vertex_cache_num, FLAGS_vertex_cache_bucket_exp) {
+: env_(env) {
if (FLAGS_reader_handlers_type == "io") {
auto tf = std::make_shared<folly::NamedThreadFactory>("reader-pool");
readerPool_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_reader_handlers,
@@ -64,14 +63,14 @@ GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env)
// Vertice section
folly::Future<cpp2::ExecResponse>
GraphStorageServiceHandler::future_addVertices(const cpp2::AddVerticesRequest& req) {
-auto* processor = AddVerticesProcessor::instance(env_, &kAddVerticesCounters, &vertexCache_);
+auto* processor = AddVerticesProcessor::instance(env_, &kAddVerticesCounters);
RETURN_FUTURE(processor);
}


folly::Future<cpp2::ExecResponse>
GraphStorageServiceHandler::future_deleteVertices(const cpp2::DeleteVerticesRequest& req) {
-auto* processor = DeleteVerticesProcessor::instance(env_, &kDelVerticesCounters, &vertexCache_);
+auto* processor = DeleteVerticesProcessor::instance(env_, &kDelVerticesCounters);
RETURN_FUTURE(processor);
}

@@ -80,8 +79,7 @@ folly::Future<cpp2::UpdateResponse>
GraphStorageServiceHandler::future_updateVertex(const cpp2::UpdateVertexRequest& req) {
auto* processor = UpdateVertexProcessor::instance(env_,
&kUpdateVertexCounters,
-readerPool_.get(),
-&vertexCache_);
+readerPool_.get());
RETURN_FUTURE(processor);
}

@@ -112,8 +110,7 @@ folly::Future<cpp2::GetNeighborsResponse>
GraphStorageServiceHandler::future_getNeighbors(const cpp2::GetNeighborsRequest& req) {
auto* processor = GetNeighborsProcessor::instance(env_,
&kGetNeighborsCounters,
-readerPool_.get(),
-&vertexCache_);
+readerPool_.get());
RETURN_FUTURE(processor);
}

@@ -122,8 +119,7 @@ folly::Future<cpp2::GetPropResponse>
GraphStorageServiceHandler::future_getProps(const cpp2::GetPropRequest& req) {
auto* processor = GetPropProcessor::instance(env_,
&kGetPropCounters,
-readerPool_.get(),
-&vertexCache_);
+readerPool_.get());
RETURN_FUTURE(processor);
}

@@ -132,8 +128,7 @@ folly::Future<cpp2::LookupIndexResp>
GraphStorageServiceHandler::future_lookupIndex(const cpp2::LookupIndexRequest& req) {
auto* processor = LookupProcessor::instance(env_,
&kLookupCounters,
-readerPool_.get(),
-&vertexCache_);
+readerPool_.get());
RETURN_FUTURE(processor);
}

1 change: 0 additions & 1 deletion src/storage/GraphStorageServiceHandler.h
@@ -65,7 +65,6 @@ class GraphStorageServiceHandler final : public cpp2::GraphStorageServiceSvIf {

private:
StorageEnv* env_{nullptr};
-VertexCache vertexCache_;
std::shared_ptr<folly::Executor> readerPool_;
};

4 changes: 0 additions & 4 deletions src/storage/StorageFlags.cpp
@@ -26,10 +26,6 @@ DEFINE_int32(rebuild_index_batch_num, 1024,
DEFINE_int32(rebuild_index_locked_threshold, 1024,
"The locked threshold will refuse writing.");

-DEFINE_int32(vertex_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");
-
-DEFINE_int32(vertex_cache_bucket_exp, 4, "Total buckets number is 1 << cache_bucket_exp");
-
DEFINE_bool(enable_vertex_cache, true, "Enable vertex cache");

DEFINE_int32(reader_handlers, 32, "Total reader handlers");
4 changes: 0 additions & 4 deletions src/storage/StorageFlags.h
@@ -23,10 +23,6 @@ DECLARE_int32(rebuild_index_batch_num);

DECLARE_int32(rebuild_index_locked_threshold);

-DECLARE_int32(vertex_cache_num);
-
-DECLARE_int32(vertex_cache_bucket_exp);
-
DECLARE_bool(enable_vertex_cache);

DECLARE_int32(reader_handlers);