Skip to content

Commit

Permalink
Merge branch 'master' into fix_shortest_path_crash
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie-Xie authored Apr 4, 2023
2 parents 77e3f3c + 16333f6 commit 721bcea
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 24 deletions.
1 change: 0 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"clean_wal_interval_secs",
"wal_ttl",
"clean_wal_interval_secs",
"custom_filter_interval_secs",
"accept_partial_success",
"system_memory_high_watermark_ratio",
"num_rows_to_check_memory",
Expand Down
1 change: 0 additions & 1 deletion src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
{"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"accept_partial_success", {cpp2::ConfigMode::MUTABLE, false}},

{"rocksdb_db_options", {cpp2::ConfigMode::MUTABLE, true}},
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ class KVFilter {
/**
* @brief Whether remove the key during compaction
*
* @param level
* @param spaceId
* @param key
* @param val
* @return true Key will not be removed
* @return false Key will be removed
*/
virtual bool filter(GraphSpaceID spaceId,
virtual bool filter(int level,
GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const = 0;
};
Expand Down
22 changes: 7 additions & 15 deletions src/kvstore/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class KVCompactionFilter final : public rocksdb::CompactionFilter {
/**
* @brief whether remove the key during compaction
*
* @param level Levels of key in rocksdb, not used for now
* @param level Levels of key in rocksdb
* @param key Rocksdb key
* @param val Rocksdb val
* @return true Key will not be removed
Expand All @@ -45,8 +45,8 @@ class KVCompactionFilter final : public rocksdb::CompactionFilter {
const rocksdb::Slice& val,
std::string*,
bool*) const override {
UNUSED(level);
return kvFilter_->filter(spaceId_,
return kvFilter_->filter(level,
spaceId_,
folly::StringPiece(key.data(), key.size()),
folly::StringPiece(val.data(), val.size()));
}
Expand Down Expand Up @@ -77,21 +77,14 @@ class KVCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
*/
std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
const rocksdb::CompactionFilter::Context& context) override {
auto now = time::WallClock::fastNowInSec();
if (context.is_full_compaction || context.is_manual_compaction) {
LOG(INFO) << "Do full/manual compaction!";
lastRunCustomFilterTimeSec_ = now;
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
} else {
if (FLAGS_custom_filter_interval_secs >= 0 &&
now - lastRunCustomFilterTimeSec_ > FLAGS_custom_filter_interval_secs) {
LOG(INFO) << "Do custom minor compaction!";
lastRunCustomFilterTimeSec_ = now;
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
}
LOG(INFO) << "Do default minor compaction!";
return std::unique_ptr<rocksdb::CompactionFilter>(nullptr);
// No worry, by default flush will not go through the custom compaction filter.
// See CompactionFilterFactory::ShouldFilterTableFileCreation.
LOG(INFO) << "Do automatic or periodic compaction!";
}
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
}

const char* Name() const override {
Expand All @@ -102,7 +95,6 @@ class KVCompactionFilterFactory : public rocksdb::CompactionFilterFactory {

private:
GraphSpaceID spaceId_;
int32_t lastRunCustomFilterTimeSec_ = 0;
};

/**
Expand Down
4 changes: 0 additions & 4 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
#include "kvstore/listener/elasticsearch/ESListener.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs,
24 * 3600,
"interval to trigger custom compaction, < 0 means always do "
"default minor compaction");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(clean_wal_interval_secs, 600, "interval to trigger clean expired wal");
DEFINE_bool(auto_remove_invalid_space, true, "whether remove data of invalid space when restart");
Expand Down
12 changes: 11 additions & 1 deletion src/storage/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#include "storage/CommonUtils.h"
#include "storage/StorageFlags.h"

DEFINE_int32(min_level_for_custom_filter,
4,
"Minimal level compaction which will go through custom compaction filter");

namespace nebula {
namespace storage {

Expand All @@ -28,9 +32,15 @@ class StorageCompactionFilter final : public kvstore::KVFilter {
CHECK_NOTNULL(schemaMan_);
}

bool filter(GraphSpaceID spaceId,
bool filter(int level,
GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const override {
if (level < FLAGS_min_level_for_custom_filter) {
// for upper level such as L0/L1, we don't go through the custom
// validation to achieve better performance
return false;
}
if (NebulaKeyUtils::isTag(vIdLen_, key)) {
return !tagValid(spaceId, key, val);
} else if (NebulaKeyUtils::isEdge(vIdLen_, key)) {
Expand Down
8 changes: 8 additions & 0 deletions src/storage/test/CompactionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "storage/test/QueryTestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(min_level_for_custom_filter);

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -167,6 +169,7 @@ TEST(CompactionFilterTest, InvalidSchemaFilterTest) {
adhoc->removeTagSchema(spaceId, tagId);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -215,6 +218,7 @@ TEST(CompactionFilterTest, TTLFilterDataExpiredTest) {
sleep(FLAGS_mock_ttl_duration + 1);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -262,6 +266,7 @@ TEST(CompactionFilterTest, TTLFilterDataNotExpiredTest) {
checkEdgeData(spaceVidLen, spaceId, 102, parts, env, 18);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -323,6 +328,7 @@ TEST(CompactionFilterTest, DropIndexTest) {
adIndex->removeTagIndex(spaceId, indexId);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -392,6 +398,7 @@ TEST(CompactionFilterTest, TTLFilterDataIndexExpiredTest) {
sleep(FLAGS_mock_ttl_duration + 1);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -460,6 +467,7 @@ TEST(CompactionFilterTest, TTLFilterDataIndexNotExpiredTest) {
checkIndexData(spaceId, 102, 6, env, 18);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down
4 changes: 4 additions & 0 deletions src/storage/test/IndexWithTTLTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "storage/mutate/UpdateEdgeProcessor.h"
#include "storage/mutate/UpdateVertexProcessor.h"

DECLARE_int32(min_level_for_custom_filter);

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -182,6 +184,7 @@ TEST(IndexWithTTLTest, AddVerticesIndexWithTTL) {
sleep(2);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(1);

Expand Down Expand Up @@ -229,6 +232,7 @@ TEST(IndexWithTTLTest, AddEdgesIndexWithTTL) {
sleep(2);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(1);

Expand Down
1 change: 0 additions & 1 deletion tests/admin/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ def test_configs(self):
['STORAGE', 'v', 'int', 'MUTABLE', 3],
['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400],
['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0],
['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400],
['STORAGE', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1],
['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3],
['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],
Expand Down

0 comments on commit 721bcea

Please sign in to comment.