From d73f3d248a3e38bd3c88ae9ece1ef3afaae105f9 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 13 Jul 2021 11:05:36 +0800 Subject: [PATCH 1/4] feat: start compaction operation in CompactionFilter::Filter --- src/server/key_ttl_compaction_filter.h | 33 +++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h index c049e8356a..c1e438f44a 100644 --- a/src/server/key_ttl_compaction_filter.h +++ b/src/server/key_ttl_compaction_filter.h @@ -40,13 +40,15 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter bool enabled, int32_t pidx, int32_t partition_version, - bool validate_hash) + bool validate_hash, + compaction_operations compaction_ops) : _pegasus_data_version(pegasus_data_version), _default_ttl(default_ttl), _enabled(enabled), _partition_index(pidx), _partition_version(partition_version), - _validate_partition_hash(validate_hash) + _validate_partition_hash(validate_hash), + _user_specified_operations(std::move(compaction_ops)) { } @@ -60,6 +62,23 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter return false; } + // ignore empty write + if (key.size() < 2) { + return false; + } + + // user specified compaction operations + if (!_user_specified_operations.empty()) { + std::string hash_key, sort_key; + pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key); + for (const auto &op : _user_specified_operations) { + if (op->filter(hash_key, sort_key, existing_value, new_value, value_changed)) { + // return true if this data need to be deleted + return true; + } + } + } + uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value)); if (_default_ttl != 0 && expire_ts == 0) { @@ -94,6 +113,7 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter int32_t _partition_index; int32_t _partition_version; bool _validate_partition_hash; + compaction_operations _user_specified_operations; }; class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory @@ -105,13 +125,20 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor std::unique_ptr CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override { + compaction_operations tmp_filter_operations; + { + dsn::utils::auto_read_lock l(_lock); + tmp_filter_operations = _user_specified_operations; + } + return std::unique_ptr( new KeyWithTTLCompactionFilter(_pegasus_data_version.load(), _default_ttl.load(), _enabled.load(), _partition_index.load(), _partition_version.load(), - _validate_partition_hash.load())); + _validate_partition_hash.load(), + std::move(tmp_filter_operations))); } const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; } From 1fee1a1d76fff051c8123df5120dbdf637e21561 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 13 Jul 2021 11:15:20 +0800 Subject: [PATCH 2/4] fix --- src/server/compaction_operation.cpp | 2 +- src/server/compaction_operation.h | 2 +- src/server/key_ttl_compaction_filter.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp index 86b12c0500..4b81040d8b 100644 --- a/src/server/compaction_operation.cpp +++ b/src/server/compaction_operation.cpp @@ -174,7 +174,7 @@ compaction_operations create_compaction_operations(const std::string &json, uint enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version); if (operation != nullptr) { operation->set_rules(std::move(rules)); - res.emplace_back(std::unique_ptr(operation)); + res.emplace_back(std::shared_ptr(operation)); } } return res; diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h index f1cdf93350..e20b3b44c3 100644 --- a/src/server/compaction_operation.h +++ b/src/server/compaction_operation.h @@ -159,7 +159,7 @@ class update_ttl : public compaction_operation FRIEND_TEST(compaction_filter_operation_test, create_operations); }; -typedef std::vector> compaction_operations; +typedef std::vector> compaction_operations; compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version); void register_compaction_operations(); } // namespace server diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h index c1e438f44a..ffeeefc510 100644 --- a/src/server/key_ttl_compaction_filter.h +++ b/src/server/key_ttl_compaction_filter.h @@ -41,7 +41,7 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter int32_t pidx, int32_t partition_version, bool validate_hash, - compaction_operations compaction_ops) + compaction_operations &&compaction_ops) : _pegasus_data_version(pegasus_data_version), _default_ttl(default_ttl), _enabled(enabled), From 0f0cb8ec66cecfcdafe51c889af5aa7fe779f075 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 14 Jul 2021 14:42:27 +0800 Subject: [PATCH 3/4] fix --- src/server/key_ttl_compaction_filter.h | 29 +++++++++++++++++--------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h index ffeeefc510..08fae5e904 100644 --- a/src/server/key_ttl_compaction_filter.h +++ b/src/server/key_ttl_compaction_filter.h @@ -67,16 +67,9 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter return false; } - // user specified compaction operations - if (!_user_specified_operations.empty()) { - std::string hash_key, sort_key; - pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key); - for (const auto &op : _user_specified_operations) { - if (op->filter(hash_key, sort_key, existing_value, new_value, value_changed)) { - // return true if this data need to be deleted - return true; - } - } + if (!_user_specified_operations.empty() && + user_specified_operation_filter(key, existing_value, new_value, value_changed)) { + return true; } uint32_t expire_ts = @@ -92,6 +85,22 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key); } + bool user_specified_operation_filter(const rocksdb::Slice &key, + const rocksdb::Slice &existing_value, + std::string *new_value, + bool *value_changed) const + { + std::string hash_key, sort_key; + pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key); + for (const auto &op : _user_specified_operations) { + if (op->filter(hash_key, sort_key, existing_value, new_value, value_changed)) { + // return true if this data need to be deleted + return true; + } + } + return false; + } + const char *Name() const override { return "KeyWithTTLCompactionFilter"; } // Check if the record is stale after partition split, which will split the partition into two From a460b49ec2b48b95e188f026039673b5b4bd5ea3 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 14 Jul 2021 15:51:21 +0800 Subject: [PATCH 4/4] fix --- src/server/key_ttl_compaction_filter.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h index 08fae5e904..e3f94a3d88 100644 --- a/src/server/key_ttl_compaction_filter.h +++ b/src/server/key_ttl_compaction_filter.h @@ -62,7 +62,8 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter return false; } - // ignore empty write + // ignore empty write. Empty writes will deleted by the compaction of rocksdb. We don't need + // deal with it here. if (key.size() < 2) { return false; }