Skip to content

Commit

Permalink
Change the compaction filter logic to let periodic compaction go through custom compaction filter, to gc expired data (#5447)
Browse files Browse the repository at this point in the history
  • Loading branch information
luyade authored Apr 4, 2023
1 parent 6eaa621 commit 16333f6
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 24 deletions.
1 change: 0 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"clean_wal_interval_secs",
"wal_ttl",
"clean_wal_interval_secs",
"custom_filter_interval_secs",
"accept_partial_success",
"system_memory_high_watermark_ratio",
"num_rows_to_check_memory",
Expand Down
1 change: 0 additions & 1 deletion src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},

This comment has been minimized.

Copy link
@syedd13

syedd13 Dec 24, 2023

write many times instead

{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
{"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"accept_partial_success", {cpp2::ConfigMode::MUTABLE, false}},

{"rocksdb_db_options", {cpp2::ConfigMode::MUTABLE, true}},
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ class KVFilter {
/**
* @brief Whether remove the key during compaction
*
* @param level
* @param spaceId
* @param key
* @param val
* @return true Key will be removed
* @return false Key will not be removed
*/
virtual bool filter(GraphSpaceID spaceId,
virtual bool filter(int level,
GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const = 0;
};
Expand Down
22 changes: 7 additions & 15 deletions src/kvstore/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class KVCompactionFilter final : public rocksdb::CompactionFilter {
/**
* @brief whether remove the key during compaction
*
* @param level Levels of key in rocksdb, not used for now
* @param level Levels of key in rocksdb
* @param key Rocksdb key
* @param val Rocksdb val
* @return true Key will not be removed
Expand All @@ -45,8 +45,8 @@ class KVCompactionFilter final : public rocksdb::CompactionFilter {
const rocksdb::Slice& val,
std::string*,
bool*) const override {
UNUSED(level);
return kvFilter_->filter(spaceId_,
return kvFilter_->filter(level,
spaceId_,
folly::StringPiece(key.data(), key.size()),
folly::StringPiece(val.data(), val.size()));
}
Expand Down Expand Up @@ -77,21 +77,14 @@ class KVCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
*/
std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
const rocksdb::CompactionFilter::Context& context) override {
auto now = time::WallClock::fastNowInSec();
if (context.is_full_compaction || context.is_manual_compaction) {
LOG(INFO) << "Do full/manual compaction!";
lastRunCustomFilterTimeSec_ = now;
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
} else {
if (FLAGS_custom_filter_interval_secs >= 0 &&
now - lastRunCustomFilterTimeSec_ > FLAGS_custom_filter_interval_secs) {
LOG(INFO) << "Do custom minor compaction!";
lastRunCustomFilterTimeSec_ = now;
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
}
LOG(INFO) << "Do default minor compaction!";
return std::unique_ptr<rocksdb::CompactionFilter>(nullptr);
// No worry, by default flush will not go through the custom compaction filter.
// See CompactionFilterFactory::ShouldFilterTableFileCreation.
LOG(INFO) << "Do automatic or periodic compaction!";
}
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
}

const char* Name() const override {
Expand All @@ -102,7 +95,6 @@ class KVCompactionFilterFactory : public rocksdb::CompactionFilterFactory {

private:
GraphSpaceID spaceId_;
int32_t lastRunCustomFilterTimeSec_ = 0;
};

/**
Expand Down
4 changes: 0 additions & 4 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
#include "kvstore/listener/elasticsearch/ESListener.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs,
24 * 3600,
"interval to trigger custom compaction, < 0 means always do "
"default minor compaction");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(clean_wal_interval_secs, 600, "interval to trigger clean expired wal");
DEFINE_bool(auto_remove_invalid_space, true, "whether remove data of invalid space when restart");
Expand Down
12 changes: 11 additions & 1 deletion src/storage/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#include "storage/CommonUtils.h"
#include "storage/StorageFlags.h"

DEFINE_int32(min_level_for_custom_filter,
4,
"Minimal level compaction which will go through custom compaction filter");

namespace nebula {
namespace storage {

Expand All @@ -28,9 +32,15 @@ class StorageCompactionFilter final : public kvstore::KVFilter {
CHECK_NOTNULL(schemaMan_);
}

bool filter(GraphSpaceID spaceId,
bool filter(int level,
GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const override {
if (level < FLAGS_min_level_for_custom_filter) {
// for upper level such as L0/L1, we don't go through the custom
// validation to achieve better performance
return false;
}
if (NebulaKeyUtils::isTag(vIdLen_, key)) {
return !tagValid(spaceId, key, val);
} else if (NebulaKeyUtils::isEdge(vIdLen_, key)) {
Expand Down
8 changes: 8 additions & 0 deletions src/storage/test/CompactionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "storage/test/QueryTestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(min_level_for_custom_filter);

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -167,6 +169,7 @@ TEST(CompactionFilterTest, InvalidSchemaFilterTest) {
adhoc->removeTagSchema(spaceId, tagId);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -215,6 +218,7 @@ TEST(CompactionFilterTest, TTLFilterDataExpiredTest) {
sleep(FLAGS_mock_ttl_duration + 1);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -262,6 +266,7 @@ TEST(CompactionFilterTest, TTLFilterDataNotExpiredTest) {
checkEdgeData(spaceVidLen, spaceId, 102, parts, env, 18);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -323,6 +328,7 @@ TEST(CompactionFilterTest, DropIndexTest) {
adIndex->removeTagIndex(spaceId, indexId);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -392,6 +398,7 @@ TEST(CompactionFilterTest, TTLFilterDataIndexExpiredTest) {
sleep(FLAGS_mock_ttl_duration + 1);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -460,6 +467,7 @@ TEST(CompactionFilterTest, TTLFilterDataIndexNotExpiredTest) {
checkIndexData(spaceId, 102, 6, env, 18);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down
4 changes: 4 additions & 0 deletions src/storage/test/IndexWithTTLTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "storage/mutate/UpdateEdgeProcessor.h"
#include "storage/mutate/UpdateVertexProcessor.h"

DECLARE_int32(min_level_for_custom_filter);

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -182,6 +184,7 @@ TEST(IndexWithTTLTest, AddVerticesIndexWithTTL) {
sleep(2);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(1);

Expand Down Expand Up @@ -229,6 +232,7 @@ TEST(IndexWithTTLTest, AddEdgesIndexWithTTL) {
sleep(2);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(1);

Expand Down
1 change: 0 additions & 1 deletion tests/admin/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ def test_configs(self):
['STORAGE', 'v', 'int', 'MUTABLE', 3],
['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400],
['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0],
['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400],
['STORAGE', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1],
['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3],
['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],
Expand Down

0 comments on commit 16333f6

Please sign in to comment.