Skip to content

Commit

Permalink
Add key value separation feature to Nebula storage engine (#3281)
Browse files Browse the repository at this point in the history
* Squash commit: enable kv separation for nebula

change comment

update comment

lint

* format

* lint

Co-authored-by: panda-sheep <[email protected]>
Co-authored-by: Yee <[email protected]>
  • Loading branch information
3 people authored Nov 23, 2021
1 parent bf0c3a1 commit ae83b89
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 13 deletions.
10 changes: 10 additions & 0 deletions conf/nebula-storaged.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@
# Whether or not to enable rocksdb's whole key bloom filter, disabled by default.
--enable_rocksdb_whole_key_filtering=false

############## Key-Value separation ##############
# Whether or not to enable BlobDB (RocksDB key-value separation support)
--rocksdb_enable_kv_separation=false
# RocksDB key value separation threshold. Values at or above this threshold will be written to blob files during flush or compaction.
--rocksdb_kv_separation_threshold=0
# Compression algorithm for blobs, options: no,snappy,lz4,lz4hc,zlib,bzip2,zstd
--rocksdb_blob_compression=lz4
# Whether to garbage collect blobs during compaction
--rocksdb_enable_blob_garbage_collection=true

############## rocksdb Options ##############
# rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma
--rocksdb_db_options={}
Expand Down
10 changes: 10 additions & 0 deletions conf/nebula-storaged.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@
# Whether or not to enable rocksdb's whole key bloom filter, disabled by default.
--enable_rocksdb_whole_key_filtering=false

############## Key-Value separation ##############
# Whether or not to enable BlobDB (RocksDB key-value separation support)
--rocksdb_enable_kv_separation=false
# RocksDB key value separation threshold. Values at or above this threshold will be written to blob files during flush or compaction.
--rocksdb_kv_separation_threshold=0
# Compression algorithm for blobs, options: no,snappy,lz4,lz4hc,zlib,bzip2,zstd
--rocksdb_blob_compression=lz4
# Whether to garbage collect blobs during compaction
--rocksdb_enable_blob_garbage_collection=true

############### misc ####################
--snapshot_part_rate_limit=10485760
--snapshot_batch_size=1048576
Expand Down
70 changes: 57 additions & 13 deletions src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,43 @@ DEFINE_int32(rocksdb_backup_interval_secs,
300,
"Rocksdb backup directory, only used in PlainTable format");

DEFINE_bool(rocksdb_enable_kv_separation,
false,
"Whether or not to enable BlobDB (RocksDB key-value separation support)");

DEFINE_uint64(rocksdb_kv_separation_threshold,
0,
"RocksDB key value separation threshold. Values at or above this threshold will be "
"written to blob files during flush or compaction."
"This value is only effective when enable_kv_separation is true.");

DEFINE_string(rocksdb_blob_compression,
"snappy",
"Compression algorithm for blobs, "
"options: no,snappy,lz4,lz4hc,zstd,zlib,bzip2");

DEFINE_bool(rocksdb_enable_blob_garbage_collection,
true,
"Set this to true to make BlobDB actively relocate valid blobs "
"from the oldest blob files as they are encountered during compaction");

namespace nebula {
namespace kvstore {

static rocksdb::Status initRocksdbCompression(rocksdb::Options& baseOpts) {
static std::unordered_map<std::string, rocksdb::CompressionType> m = {
{"no", rocksdb::kNoCompression},
{"snappy", rocksdb::kSnappyCompression},
{"lz4", rocksdb::kLZ4Compression},
{"lz4hc", rocksdb::kLZ4HCCompression},
{"zstd", rocksdb::kZSTD},
{"zlib", rocksdb::kZlibCompression},
{"bzip2", rocksdb::kBZip2Compression}};
static const std::unordered_map<std::string, rocksdb::CompressionType> kCompressionTypeMap = {
{"no", rocksdb::kNoCompression},
{"snappy", rocksdb::kSnappyCompression},
{"lz4", rocksdb::kLZ4Compression},
{"lz4hc", rocksdb::kLZ4HCCompression},
{"zstd", rocksdb::kZSTD},
{"zlib", rocksdb::kZlibCompression},
{"bzip2", rocksdb::kBZip2Compression}};

static rocksdb::Status initRocksdbCompression(rocksdb::Options& baseOpts) {
// Set the general compression algorithm
{
auto it = m.find(FLAGS_rocksdb_compression);
if (it == m.end()) {
auto it = kCompressionTypeMap.find(FLAGS_rocksdb_compression);
if (it == kCompressionTypeMap.end()) {
LOG(ERROR) << "Unsupported compression type: " << FLAGS_rocksdb_compression;
return rocksdb::Status::InvalidArgument();
}
Expand All @@ -149,8 +169,8 @@ static rocksdb::Status initRocksdbCompression(rocksdb::Options& baseOpts) {
if (compressions[i].empty()) {
compressions[i] = FLAGS_rocksdb_compression;
}
auto it = m.find(compressions[i]);
if (it == m.end()) {
auto it = kCompressionTypeMap.find(compressions[i]);
if (it == kCompressionTypeMap.end()) {
LOG(ERROR) << "Unsupported compression type: " << compressions[i];
return rocksdb::Status::InvalidArgument();
}
Expand All @@ -161,6 +181,25 @@ static rocksdb::Status initRocksdbCompression(rocksdb::Options& baseOpts) {
return rocksdb::Status::OK();
}

static rocksdb::Status initRocksdbKVSeparation(rocksdb::Options& baseOpts) {
if (FLAGS_rocksdb_enable_kv_separation) {
baseOpts.enable_blob_files = true;
baseOpts.min_blob_size = FLAGS_rocksdb_kv_separation_threshold;

// set blob compresstion algorithm
auto it = kCompressionTypeMap.find(FLAGS_rocksdb_blob_compression);
if (it == kCompressionTypeMap.end()) {
LOG(ERROR) << "Unsupported compression type: " << FLAGS_rocksdb_blob_compression;
return rocksdb::Status::InvalidArgument();
}
baseOpts.blob_compression_type = it->second;

// set blob gc
baseOpts.enable_blob_garbage_collection = FLAGS_rocksdb_enable_blob_garbage_collection;
}
return rocksdb::Status::OK();
}

rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
GraphSpaceID spaceId,
int32_t vidLen) {
Expand Down Expand Up @@ -214,6 +253,11 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
return s;
}

s = initRocksdbKVSeparation(baseOpts);
if (!s.ok()) {
return s;
}

if (FLAGS_num_compaction_threads > 0) {
static std::shared_ptr<rocksdb::ConcurrentTaskLimiter> compaction_thread_limiter{
rocksdb::NewConcurrentTaskLimiter("compaction", FLAGS_num_compaction_threads)};
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/RocksEngineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ DECLARE_string(rocksdb_wal_dir);
DECLARE_string(rocksdb_backup_dir);
DECLARE_int32(rocksdb_backup_interval_secs);

// rocksdb key value separation options
DECLARE_bool(rocksdb_enable_kv_separation);
DECLARE_uint64(rocksdb_kv_separation_threshold);

namespace nebula {
namespace kvstore {

Expand Down
25 changes: 25 additions & 0 deletions src/kvstore/test/RocksEngineConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,31 @@ TEST(RocksEngineConfigTest, CompressionConfigTest) {
}
}

TEST(RocksEngineConfigTest, KeyValueSeparationTest) {
FLAGS_rocksdb_enable_kv_separation = true;
FLAGS_rocksdb_kv_separation_threshold = 10;
rocksdb::Options options;
auto status = initRocksdbOptions(options, 1);
ASSERT_TRUE(status.ok()) << status.ToString();

rocksdb::DB* db = nullptr;
SCOPE_EXIT { delete db; };
options.create_if_missing = true;
fs::TempDir rootPath("/tmp/RocksDBCompressionConfigTest.XXXXXX");
status = rocksdb::DB::Open(options, rootPath.path(), &db);
ASSERT_TRUE(status.ok()) << status.ToString();

std::string key = "test";
std::string value = "This is a test value with value size greater than 10";
status = db->Put(rocksdb::WriteOptions(), key, value);
ASSERT_TRUE(status.ok()) << status.ToString();

std::string read_value;
status = db->Get(rocksdb::ReadOptions(), key, &read_value);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_EQ(value, read_value);
}

} // namespace kvstore
} // namespace nebula

Expand Down

0 comments on commit ae83b89

Please sign in to comment.