Skip to content

Commit

Permalink
Enable prefix bloom filter by default (vesoft-inc#2860)
Browse files Browse the repository at this point in the history
* enable prefix bloom filter by default

* GetProperty

* make rate limiter dynamic configurable

Co-authored-by: Yee <[email protected]>
  • Loading branch information
critical27 and yixinglu authored Sep 25, 2021
1 parent 21c1b5c commit 5fe655a
Show file tree
Hide file tree
Showing 23 changed files with 923 additions and 229 deletions.
6 changes: 4 additions & 2 deletions conf/nebula-storaged.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@
# * kAll, Collect all stats
--rocksdb_stats_level=kExceptHistogramOrTimers

# Whether or not to enable rocksdb's prefix bloom filter, disabled by default.
--enable_rocksdb_prefix_filtering=false
# Whether or not to enable rocksdb's prefix bloom filter, enabled by default.
--enable_rocksdb_prefix_filtering=true
# Whether or not to enable rocksdb's whole key bloom filter, disabled by default.
--enable_rocksdb_whole_key_filtering=false

############## rocksdb Options ##############
# rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma
Expand Down
6 changes: 4 additions & 2 deletions conf/nebula-storaged.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@
# * kAll, Collect all stats
--rocksdb_stats_level=kExceptHistogramOrTimers

# Whether or not to enable rocksdb's prefix bloom filter, disabled by default.
--enable_rocksdb_prefix_filtering=false
# Whether or not to enable rocksdb's prefix bloom filter, enabled by default.
--enable_rocksdb_prefix_filtering=true
# Whether or not to enable rocksdb's whole key bloom filter, disabled by default.
--enable_rocksdb_whole_key_filtering=false

############### misc ####################
--snapshot_part_rate_limit=8388608
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class KVEngine {
virtual nebula::cpp2::ErrorCode setDBOption(const std::string& configKey,
const std::string& configValue) = 0;

// Get DB Property
virtual ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(
const std::string& property) = 0;

virtual nebula::cpp2::ErrorCode compact() = 0;

virtual nebula::cpp2::ErrorCode flush() = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ class KVStore {

virtual std::vector<std::string> getDataRoot() const = 0;

virtual ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(
GraphSpaceID spaceId, const std::string& property) = 0;

protected:
KVStore() = default;
};
Expand Down
13 changes: 7 additions & 6 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "kvstore/RateLimiter.h"

DEFINE_uint32(snapshot_part_rate_limit,
1024 * 1024 * 2,
1024 * 1024 * 8,
"max bytes of pulling snapshot for each partition in one second");
DEFINE_uint32(snapshot_batch_size, 1024 * 512, "batch size for snapshot, in bytes");

Expand All @@ -22,16 +22,15 @@ const int32_t kReserveNum = 1024 * 4;

NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
// Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit.
// So by default, the total send rate is limited to 4 * 2Mb = 8Mb.
// So by default, the total send rate is limited to 4 * 8Mb = 32Mb.
LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit
<< " for each part";
<< " for each part by default";
}

void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>(FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_part_rate_limit);
auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
CHECK_NOTNULL(store_);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
std::vector<std::string> data;
Expand Down Expand Up @@ -74,7 +73,9 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
size_t batchSize = 0;
while (iter && iter->valid()) {
if (batchSize >= FLAGS_snapshot_batch_size) {
rateLimiter->consume(batchSize);
rateLimiter->consume(static_cast<double>(batchSize), // toConsume
static_cast<double>(FLAGS_snapshot_part_rate_limit), // rate
static_cast<double>(FLAGS_snapshot_part_rate_limit)); // burstSize
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
Expand Down
21 changes: 21 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1168,5 +1168,26 @@ nebula::cpp2::ErrorCode NebulaStore::multiPutWithoutReplicator(GraphSpaceID spac
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

// Queries `property` on every engine of the given space and returns the results as a JSON
// string, one entry per engine keyed by "Engine <index>".
//
// Returns the error from the space lookup if the space cannot be found, or the error of the
// first engine whose getProperty call fails.
ErrorOr<nebula::cpp2::ErrorCode, std::string> NebulaStore::getProperty(
    GraphSpaceID spaceId, const std::string& property) {
  auto spaceRet = space(spaceId);
  if (!ok(spaceRet)) {
    LOG(ERROR) << "Get Space " << spaceId << " Failed";
    return error(spaceRet);
  }
  auto spaceInfo = nebula::value(spaceRet);

  folly::dynamic result = folly::dynamic::object;
  size_t engineIdx = 0;
  for (auto& engine : spaceInfo->engines_) {
    auto propRet = engine->getProperty(property);
    if (!ok(propRet)) {
      // Give up on the first failing engine instead of returning a partial result.
      return error(propRet);
    }
    result[folly::stringPrintf("Engine %zu", engineIdx)] = std::move(value(propRet));
    ++engineIdx;
  }
  return folly::toJson(result);
}

} // namespace kvstore
} // namespace nebula
3 changes: 3 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ class NebulaStore : public KVStore, public Handler {
nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId,
std::vector<KV> keyValues) override;

ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(GraphSpaceID spaceId,
const std::string& property) override;

private:
void loadPartFromDataPath();

Expand Down
15 changes: 6 additions & 9 deletions src/kvstore/RateLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,29 @@ namespace kvstore {
// For now, there are two major cases: snapshot (both for balance or catch up) and rebuild index.
class RateLimiter {
public:
RateLimiter(int32_t rate, int32_t burstSize)
: rate_(static_cast<double>(rate)), burstSize_(static_cast<double>(burstSize)) {
RateLimiter() {
// token will be available after 1 second, to prevent speed spike at the beginning
auto now = time::WallClock::fastNowInSec();
int64_t waitInSec = FLAGS_skip_wait_in_rate_limiter ? 0 : 1;
bucket_.reset(new folly::TokenBucket(rate_, burstSize_, static_cast<double>(now + waitInSec)));
bucket_.reset(new folly::DynamicTokenBucket(static_cast<double>(now + waitInSec)));
}

// Caller must make sure that **the partition has been added, and won't be removed during consume.**
// Snapshot and rebuild index follow this principle by design.
void consume(size_t toConsume) {
if (toConsume > burstSize_) {
void consume(double toConsume, double rate, double burstSize) {
if (toConsume > burstSize) {
// consumeWithBorrowAndWait does nothing when toConsume > the burst size, so we sleep 1s instead
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
// If there are enough tokens, consume and return immediately.
// If not, consume anyway, but sleep enough time before return.
auto now = time::WallClock::fastNowInSec();
bucket_->consumeWithBorrowAndWait(static_cast<double>(toConsume), static_cast<double>(now));
bucket_->consumeWithBorrowAndWait(toConsume, rate, burstSize, static_cast<double>(now));
}
}

private:
std::unique_ptr<folly::TokenBucket> bucket_;
double rate_{1 << 20};
double burstSize_{1 << 20};
std::unique_ptr<folly::DynamicTokenBucket> bucket_;
};

} // namespace kvstore
Expand Down
41 changes: 39 additions & 2 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "common/fs/FileUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/KVStore.h"
#include "kvstore/RocksEngineConfig.h"

DEFINE_bool(move_files, false, "Move the SST files instead of copy when ingest into dataset");

Expand Down Expand Up @@ -126,6 +125,7 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
CHECK(status.ok()) << status.ToString();
db_.reset(db);
partsNum_ = allParts().size();
extractorLen_ = sizeof(PartitionID) + vIdLen;
LOG(INFO) << "open rocksdb on " << path;

backup();
Expand Down Expand Up @@ -202,7 +202,7 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,
const std::string& end,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
options.total_order_seek = true;
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand All @@ -213,7 +213,32 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,

// Creates an iterator for the keys starting with `prefix`, dispatching to the extractor-based
// scan only when prefix filtering is enabled and the prefix is long enough.
nebula::cpp2::ErrorCode RocksEngine::prefix(const std::string& prefix,
                                            std::unique_ptr<KVIterator>* storageIter) {
  // Strictly speaking, checking prefix.size() >= extractorLen_ is not required: it is the
  // caller's duty to make sure the prefix bloom filter exists. That is quite error-prone though,
  // so the length is verified here as well.
  const bool useExtractor =
      FLAGS_enable_rocksdb_prefix_filtering && prefix.size() >= extractorLen_;
  return useExtractor ? prefixWithExtractor(prefix, storageIter)
                      : prefixWithoutExtractor(prefix, storageIter);
}

// Prefix scan with ReadOptions::prefix_same_as_start enabled.
// The iterator is positioned at `prefix` and handed to the caller wrapped in a RocksPrefixIter.
// Always returns SUCCEEDED.
nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& prefix,
                                                         std::unique_ptr<KVIterator>* storageIter) {
  rocksdb::ReadOptions readOpts;
  readOpts.prefix_same_as_start = true;

  rocksdb::Iterator* rawIter = db_->NewIterator(readOpts);
  if (rawIter != nullptr) {
    rawIter->Seek(rocksdb::Slice(prefix));
  }

  // The raw iterator is passed on to the RocksPrefixIter wrapper.
  storageIter->reset(new RocksPrefixIter(rawIter, prefix));
  return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor(
const std::string& prefix, std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
// prefix_same_as_start is false by default
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(prefix));
Expand All @@ -226,6 +251,8 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start,
const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
// prefix_same_as_start is false by default
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand Down Expand Up @@ -397,6 +424,16 @@ nebula::cpp2::ErrorCode RocksEngine::setDBOption(const std::string& configKey,
}
}

// Looks up a DB property by name via rocksdb's GetProperty.
// Returns the property value on success, or E_INVALID_PARM when GetProperty reports failure.
ErrorOr<nebula::cpp2::ErrorCode, std::string> RocksEngine::getProperty(
    const std::string& property) {
  std::string propValue;
  const bool found = db_->GetProperty(property, &propValue);
  if (found) {
    return propValue;
  }
  return nebula::cpp2::ErrorCode::E_INVALID_PARM;
}

nebula::cpp2::ErrorCode RocksEngine::compact() {
rocksdb::CompactRangeOptions options;
options.change_level = FLAGS_rocksdb_compact_change_level;
Expand Down
10 changes: 10 additions & 0 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/base/Base.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVIterator.h"
#include "kvstore/RocksEngineConfig.h"

namespace nebula {
namespace kvstore {
Expand Down Expand Up @@ -128,6 +129,12 @@ class RocksEngine : public KVEngine {
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) override;

nebula::cpp2::ErrorCode prefixWithExtractor(const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter);

nebula::cpp2::ErrorCode prefixWithoutExtractor(const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter);

/*********************
* Data modification
********************/
Expand Down Expand Up @@ -161,6 +168,8 @@ class RocksEngine : public KVEngine {
nebula::cpp2::ErrorCode setDBOption(const std::string& configKey,
const std::string& configValue) override;

ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(const std::string& property) override;

nebula::cpp2::ErrorCode compact() override;

nebula::cpp2::ErrorCode flush() override;
Expand Down Expand Up @@ -190,6 +199,7 @@ class RocksEngine : public KVEngine {
std::string backupPath_;
std::unique_ptr<rocksdb::BackupEngine> backupDb_{nullptr};
int32_t partsNum_ = -1;
size_t extractorLen_;
};

} // namespace kvstore
Expand Down
59 changes: 15 additions & 44 deletions src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ DEFINE_int64(rocksdb_block_cache,
1024,
"The default block cache size used in BlockBasedTable. The unit is MB");

DEFINE_int32(row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");
DEFINE_int32(rocksdb_row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");

DEFINE_int32(cache_bucket_exp, 8, "Total buckets number is 1 << cache_bucket_exp");

Expand All @@ -78,15 +78,13 @@ DEFINE_int32(rocksdb_rate_limit,
0,
"write limit in bytes per sec. The unit is MB. 0 means unlimited.");

DEFINE_bool(enable_rocksdb_prefix_filtering,
DEFINE_bool(enable_rocksdb_whole_key_filtering,
false,
"Whether or not to enable rocksdb's whole key bloom filter");

DEFINE_bool(enable_rocksdb_prefix_filtering,
true,
"Whether or not to enable rocksdb's prefix bloom filter.");
DEFINE_bool(rocksdb_prefix_bloom_filter_length_flag,
false,
"If true, prefix bloom filter will be sizeof(PartitionID) + vidLen + "
"sizeof(EdgeType). "
"If false, prefix bloom filter will be sizeof(PartitionID) + vidLen. ");
DEFINE_int32(rocksdb_plain_table_prefix_length, 4, "PlainTable prefix size");

DEFINE_bool(rocksdb_compact_change_level,
true,
Expand Down Expand Up @@ -118,34 +116,6 @@ DEFINE_int32(rocksdb_backup_interval_secs,
namespace nebula {
namespace kvstore {

class GraphPrefixTransform : public rocksdb::SliceTransform {
private:
size_t prefixLen_;
std::string name_;

public:
explicit GraphPrefixTransform(size_t prefixLen)
: prefixLen_(prefixLen), name_("nebula.GraphPrefix." + std::to_string(prefixLen_)) {}

const char* Name() const override { return name_.c_str(); }

rocksdb::Slice Transform(const rocksdb::Slice& src) const override {
return rocksdb::Slice(src.data(), prefixLen_);
}

bool InDomain(const rocksdb::Slice& key) const override {
if (key.size() < prefixLen_) {
return false;
}
// And we should not use NebulaKeyUtils::isVertex or isEdge here, because it
// will regard the prefix itself not in domain since its length does not
// satisfy
constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyType));
auto type = static_cast<NebulaKeyType>(readInt<uint32_t>(key.data(), len) & kTypeMask);
return type == NebulaKeyType::kEdge || type == NebulaKeyType::kVertex;
}
};

static rocksdb::Status initRocksdbCompression(rocksdb::Options& baseOpts) {
static std::unordered_map<std::string, rocksdb::CompressionType> m = {
{"no", rocksdb::kNoCompression},
Expand Down Expand Up @@ -256,10 +226,8 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
baseOpts.rate_limiter = rate_limiter;
}

size_t prefixLength = sizeof(PartitionID) + vidLen;
if (FLAGS_rocksdb_table_format == "BlockBasedTable") {
size_t prefixLength = FLAGS_rocksdb_prefix_bloom_filter_length_flag
? sizeof(PartitionID) + vidLen + sizeof(EdgeType)
: sizeof(PartitionID) + vidLen;
// BlockBasedTableOptions
std::unordered_map<std::string, std::string> bbtOptsMap;
if (!loadOptionsMap(bbtOptsMap, FLAGS_rocksdb_block_based_table_options)) {
Expand All @@ -279,9 +247,9 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
bbtOpts.block_cache = blockCache;
}

if (FLAGS_row_cache_num) {
if (FLAGS_rocksdb_row_cache_num) {
static std::shared_ptr<rocksdb::Cache> rowCache =
rocksdb::NewLRUCache(FLAGS_row_cache_num, FLAGS_cache_bucket_exp);
rocksdb::NewLRUCache(FLAGS_rocksdb_row_cache_num, FLAGS_cache_bucket_exp);
baseOpts.row_cache = rowCache;
}

Expand All @@ -296,8 +264,9 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel;
}
if (FLAGS_enable_rocksdb_prefix_filtering) {
baseOpts.prefix_extractor.reset(new GraphPrefixTransform(prefixLength));
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength));
}
bbtOpts.whole_key_filtering = FLAGS_enable_rocksdb_whole_key_filtering;
baseOpts.table_factory.reset(NewBlockBasedTableFactory(bbtOpts));
baseOpts.create_if_missing = true;
} else if (FLAGS_rocksdb_table_format == "PlainTable") {
Expand All @@ -308,8 +277,10 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
// by default. WAL_ttl_seconds and rocksdb_backup_interval_secs need to be
// modify together if necessary
FLAGS_rocksdb_disable_wal = false;
baseOpts.prefix_extractor.reset(
rocksdb::NewCappedPrefixTransform(FLAGS_rocksdb_plain_table_prefix_length));
if (!FLAGS_enable_rocksdb_prefix_filtering) {
return rocksdb::Status::InvalidArgument("PlainTable should use prefix bloom filter");
}
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength));
baseOpts.table_factory.reset(rocksdb::NewPlainTableFactory());
baseOpts.create_if_missing = true;
} else {
Expand Down
Loading

0 comments on commit 5fe655a

Please sign in to comment.