From dfa51272271b2ae6d577feb4cac5bf45be5263ac Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 9 Mar 2021 17:03:34 +0800 Subject: [PATCH 1/4] add filter check --- src/base/pegasus_key_schema.h | 25 +++++++++++---- src/server/key_ttl_compaction_filter.h | 43 +++++++++++++++++++++++--- src/server/pegasus_server_impl.cpp | 5 +-- 3 files changed, 60 insertions(+), 13 deletions(-) diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h index 59fecbfef1..b894c6d3da 100644 --- a/src/base/pegasus_key_schema.h +++ b/src/base/pegasus_key_schema.h @@ -146,22 +146,23 @@ pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string & } } -// calculate hash from rocksdb key. -inline uint64_t pegasus_key_hash(const ::dsn::blob &key) +// calculate hash from rocksdb key or rocksdb slice +template +inline uint64_t pegasus_key_hash(const T &key) { - dassert(key.length() >= 2, "key length must be no less than 2"); + dassert(key.size() >= 2, "key length must be no less than 2"); // hash_key_len is in big endian uint16_t hash_key_len = be16toh(*(int16_t *)(key.data())); if (hash_key_len > 0) { // hash_key_len > 0, compute hash from hash_key - dassert(key.length() >= 2 + hash_key_len, + dassert(key.size() >= 2 + hash_key_len, "key length must be no less than (2 + hash_key_len)"); - return dsn::utils::crc64_calc(key.buffer_ptr() + 2, hash_key_len, 0); + return dsn::utils::crc64_calc(key.data() + 2, hash_key_len, 0); } else { // hash_key_len == 0, compute hash from sort_key - return dsn::utils::crc64_calc(key.buffer_ptr() + 2, key.length() - 2, 0); + return dsn::utils::crc64_calc(key.data() + 2, key.size() - 2, 0); } } @@ -171,4 +172,16 @@ inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key) return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0); } +// check key should be served this partition +// Notice: partition_version should be check if is greater than 0 before calling this function +template +inline bool check_pegasus_key_hash(const T &key, int32_t pidx, int32_t partition_version) +{ + auto target_pidx = pegasus_key_hash(key) & partition_version; + if (dsn_unlikely(target_pidx != pidx)) { + return false; + } + return true; +} + } // namespace pegasus diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h index f6f410d973..e400e0385c 100644 --- a/src/server/key_ttl_compaction_filter.h +++ b/src/server/key_ttl_compaction_filter.h @@ -25,6 +25,7 @@ #include #include "base/pegasus_utils.h" +#include "base/pegasus_key_schema.h" #include "base/pegasus_value_schema.h" namespace pegasus { @@ -33,8 +34,16 @@ namespace server { class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter { public: - KeyWithTTLCompactionFilter(uint32_t pegasus_data_version, uint32_t default_ttl, bool enabled) - : _pegasus_data_version(pegasus_data_version), _default_ttl(default_ttl), _enabled(enabled) + KeyWithTTLCompactionFilter(uint32_t pegasus_data_version, + uint32_t default_ttl, + bool enabled, + int32_t pidx, + int32_t partition_version) + : _pegasus_data_version(pegasus_data_version), + _default_ttl(default_ttl), + _enabled(enabled), + _partition_index(pidx), + _partition_version(partition_version) { } @@ -58,16 +67,26 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter *value_changed = true; return false; } - return check_if_ts_expired(utils::epoch_now(), expire_ts); + return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key); } const char *Name() const override { return "KeyWithTTLCompactionFilter"; } + bool check_if_stale_split_data(const rocksdb::Slice &key) const + { + if (_partition_version < 0 || _partition_index > _partition_version) { + return false; + } + return !check_pegasus_key_hash(key, _partition_index, _partition_version); + } + private: uint32_t _pegasus_data_version; uint32_t _default_ttl; bool _enabled; // only process filtering when _enabled == true mutable pegasus_value_generator _gen; + int32_t _partition_index; + int32_t _partition_version; }; class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory @@ -79,8 +98,12 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor std::unique_ptr CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override { - return std::unique_ptr(new KeyWithTTLCompactionFilter( - _pegasus_data_version.load(), _default_ttl.load(), _enabled.load())); + return std::unique_ptr( + new KeyWithTTLCompactionFilter(_pegasus_data_version.load(), + _default_ttl.load(), + _enabled.load(), + _partition_index.load(), + _partition_version.load())); } const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; } @@ -90,11 +113,21 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor } void EnableFilter() { _enabled.store(true, std::memory_order_release); } void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl, std::memory_order_release); } + void SetPartitionIndex(int32_t pidx) + { + _partition_index.store(pidx, std::memory_order_release); + } + void SetPartitionVersion(int32_t partition_version) + { + _partition_version.store(partition_version, std::memory_order_release); + } private: std::atomic _pegasus_data_version; std::atomic _default_ttl; std::atomic_bool _enabled; // only process filtering when _enabled == true + std::atomic _partition_index{0}; + std::atomic _partition_version{-1}; }; } // namespace server diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 38b15d7f55..173235acfd 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1458,6 +1458,8 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) // only enable filter after correct pegasus_data_version set _key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version); + _key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index()); + _key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index() - 1); _key_ttl_compaction_filter_factory->EnableFilter(); parse_checkpoints(); @@ -2750,8 +2752,7 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version) int32_t old_partition_version = _partition_version.exchange(partition_version); ddebug_replica( "update partition version from {} to {}", old_partition_version, partition_version); - - // TODO(heyuchen): set filter _partition_version in further pr + _key_ttl_compaction_filter_factory->SetPartitionVersion(partition_version); } ::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait) From f9a2e63000155b5a87bfaf3bcb8f14fda3eb6dc2 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 10 Mar 2021 13:04:44 +0800 Subject: [PATCH 2/4] add validate hash and bug fix --- src/base/pegasus_const.cpp | 3 +++ src/base/pegasus_const.h | 2 ++ src/server/config.min.ini | 2 +- src/server/key_ttl_compaction_filter.h | 18 ++++++++++++++---- src/server/pegasus_server_impl.cpp | 21 +++++++++++++++++++++ src/server/pegasus_server_impl.h | 3 +++ 6 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp index c80b1e6c39..07e0187bf0 100644 --- a/src/base/pegasus_const.cpp +++ b/src/base/pegasus_const.cpp @@ -88,4 +88,7 @@ const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold /// time threshold of each rocksdb iteration const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms"); + +/// true means compaction and scan will check partition_hash, otherwise false +const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash"); } // namespace pegasus diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h index 4f4814d29a..4f131eb465 100644 --- a/src/base/pegasus_const.h +++ b/src/base/pegasus_const.h @@ -62,4 +62,6 @@ extern const std::string PEGASUS_CLUSTER_SECTION_NAME; extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD; extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS; + +extern const std::string SPLIT_VALIDATE_PARTITION_HASH; } // namespace pegasus diff --git a/src/server/config.min.ini b/src/server/config.min.ini index 5b9a7b191d..d7bd9085aa 100644 --- a/src/server/config.min.ini +++ b/src/server/config.min.ini @@ -131,7 +131,7 @@ # Options: # - falcon # - prometheus - perf_counter_sink = prometheus + perf_counter_sink = # The HTTP port exposed to Prometheus for pulling metrics from pegasus server. prometheus_port = @PROMETHEUS_PORT@ diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h index e400e0385c..234d26e10f 100644 --- a/src/server/key_ttl_compaction_filter.h +++ b/src/server/key_ttl_compaction_filter.h @@ -38,12 +38,14 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter uint32_t default_ttl, bool enabled, int32_t pidx, - int32_t partition_version) + int32_t partition_version, + bool check_hash) : _pegasus_data_version(pegasus_data_version), _default_ttl(default_ttl), _enabled(enabled), _partition_index(pidx), - _partition_version(partition_version) + _partition_version(partition_version), + _check_partition_hash(check_hash) { } @@ -74,7 +76,8 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter bool check_if_stale_split_data(const rocksdb::Slice &key) const { - if (_partition_version < 0 || _partition_index > _partition_version) { + if (!_check_partition_hash || key.size() < 2 || _partition_version < 0 || + _partition_index > _partition_version) { return false; } return !check_pegasus_key_hash(key, _partition_index, _partition_version); @@ -87,6 +90,7 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter mutable pegasus_value_generator _gen; int32_t _partition_index; int32_t _partition_version; + bool _check_partition_hash; }; class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory @@ -103,7 +107,8 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor _default_ttl.load(), _enabled.load(), _partition_index.load(), - _partition_version.load())); + _partition_version.load(), + _check_partition_hash.load())); } const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; } @@ -113,6 +118,10 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor } void EnableFilter() { _enabled.store(true, std::memory_order_release); } void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl, std::memory_order_release); } + void SetCheckPartitionHash(bool check_hash) + { + _check_partition_hash.store(check_hash, std::memory_order_release); + } void SetPartitionIndex(int32_t pidx) { _partition_index.store(pidx, std::memory_order_release); @@ -128,6 +137,7 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor std::atomic_bool _enabled; // only process filtering when _enabled == true std::atomic _partition_index{0}; std::atomic _partition_version{-1}; + std::atomic_bool _check_partition_hash{false}; }; } // namespace server diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 173235acfd..f0e2d80164 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -2296,6 +2296,7 @@ void pegasus_server_impl::update_app_envs(const std::map &envs) +{ + bool new_value = false; + auto iter = envs.find(SPLIT_VALIDATE_PARTITION_HASH); + if (iter != envs.end()) { + if (!dsn::buf2bool(iter->second, new_value)) { + derror_replica("{}={} is invalid.", iter->first, iter->second); + return; + } + } + if (new_value != _check_partition_hash) { + ddebug_replica( + "update '_check_partition_hash' from {} to {}", _check_partition_hash, new_value); + _check_partition_hash = new_value; + _key_ttl_compaction_filter_factory->SetCheckPartitionHash(_check_partition_hash); + } +} + bool pegasus_server_impl::parse_compression_types( const std::string &config, std::vector &compression_per_level) { diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 5785624a5c..a67937a78f 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -262,6 +262,8 @@ class pegasus_server_impl : public pegasus_read_service void update_rocksdb_iteration_threshold(const std::map &envs); + void update_validate_partition_hash(const std::map &envs); + // return true if parse compression types 'config' success, otherwise return false. // 'compression_per_level' will not be changed if parse failed. bool parse_compression_types(const std::string &config, @@ -402,6 +404,7 @@ class pegasus_server_impl : public pegasus_read_service pegasus_manual_compact_service _manual_compact_svc; std::atomic _partition_version; + bool _check_partition_hash{false}; dsn::replication::ingestion_status::type _ingestion_status{ dsn::replication::ingestion_status::IS_INVALID}; From ff32fccd2a4bc62c542d536b0b08d5e590c5d84f Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 22 Mar 2021 09:54:15 +0800 Subject: [PATCH 3/4] update rdsn --- rdsn | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rdsn b/rdsn index c057d35c93..290a8116d0 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit c057d35c93ff9d895d5eefef343b6c7afd74812d +Subproject commit 290a8116d047d2cb2ee3d74026d796d0e4d228df From ad79fd83c17f2e2dac2012a18054b3126c90e318 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 22 Mar 2021 18:07:48 +0800 Subject: [PATCH 4/4] fix by cr --- src/base/pegasus_const.cpp | 2 +- src/server/key_ttl_compaction_filter.h | 18 ++++++++++-------- src/server/pegasus_server_impl.cpp | 8 ++++---- src/server/pegasus_server_impl.h | 2 +- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp index 07e0187bf0..da529b8485 100644 --- a/src/base/pegasus_const.cpp +++ b/src/base/pegasus_const.cpp @@ -89,6 +89,6 @@ const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms"); -/// true means compaction and scan will check partition_hash, otherwise false +/// true means compaction and scan will validate partition_hash, otherwise false const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash"); } // namespace pegasus diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h index 234d26e10f..4f26844411 100644 --- a/src/server/key_ttl_compaction_filter.h +++ b/src/server/key_ttl_compaction_filter.h @@ -39,13 +39,13 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter bool enabled, int32_t pidx, int32_t partition_version, - bool check_hash) + bool validate_hash) : _pegasus_data_version(pegasus_data_version), _default_ttl(default_ttl), _enabled(enabled), _partition_index(pidx), _partition_version(partition_version), - _check_partition_hash(check_hash) + _validate_partition_hash(validate_hash) { } @@ -74,9 +74,11 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter const char *Name() const override { return "KeyWithTTLCompactionFilter"; } + // Check if the record is stale after partition split, which will split the partition into two + // halves. The stale record belongs to the other half. bool check_if_stale_split_data(const rocksdb::Slice &key) const { - if (!_check_partition_hash || key.size() < 2 || _partition_version < 0 || + if (!_validate_partition_hash || key.size() < 2 || _partition_version < 0 || _partition_index > _partition_version) { return false; } @@ -90,7 +92,7 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter mutable pegasus_value_generator _gen; int32_t _partition_index; int32_t _partition_version; - bool _check_partition_hash; + bool _validate_partition_hash; }; class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory @@ -108,7 +110,7 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor _enabled.load(), _partition_index.load(), _partition_version.load(), - _check_partition_hash.load())); + _validate_partition_hash.load())); } const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; } @@ -118,9 +120,9 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor } void EnableFilter() { _enabled.store(true, std::memory_order_release); } void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl, std::memory_order_release); } - void SetCheckPartitionHash(bool check_hash) + void SetValidatePartitionHash(bool validate_hash) { - _check_partition_hash.store(check_hash, std::memory_order_release); + _validate_partition_hash.store(validate_hash, std::memory_order_release); } void SetPartitionIndex(int32_t pidx) { @@ -137,7 +139,7 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor std::atomic_bool _enabled; // only process filtering when _enabled == true std::atomic _partition_index{0}; std::atomic _partition_version{-1}; - std::atomic_bool _check_partition_hash{false}; + std::atomic_bool _validate_partition_hash{false}; }; } // namespace server diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index f0e2d80164..7816bcda2c 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -2453,11 +2453,11 @@ void pegasus_server_impl::update_validate_partition_hash( return; } } - if (new_value != _check_partition_hash) { + if (new_value != _validate_partition_hash) { ddebug_replica( - "update '_check_partition_hash' from {} to {}", _check_partition_hash, new_value); - _check_partition_hash = new_value; - _key_ttl_compaction_filter_factory->SetCheckPartitionHash(_check_partition_hash); + "update '_validate_partition_hash' from {} to {}", _validate_partition_hash, new_value); + _validate_partition_hash = new_value; + _key_ttl_compaction_filter_factory->SetValidatePartitionHash(_validate_partition_hash); } } diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index a67937a78f..2b314c3916 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -404,7 +404,7 @@ class pegasus_server_impl : public pegasus_read_service pegasus_manual_compact_service _manual_compact_svc; std::atomic _partition_version; - bool _check_partition_hash{false}; + bool _validate_partition_hash{false}; dsn::replication::ingestion_status::type _ingestion_status{ dsn::replication::ingestion_status::IS_INVALID};