diff --git a/rdsn b/rdsn index 140548388e..290a8116d0 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 140548388e62700b3f018c9f6ccaf2f0b396df35 +Subproject commit 290a8116d047d2cb2ee3d74026d796d0e4d228df diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp index c80b1e6c39..da529b8485 100644 --- a/src/base/pegasus_const.cpp +++ b/src/base/pegasus_const.cpp @@ -88,4 +88,7 @@ const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold /// time threshold of each rocksdb iteration const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms"); + +/// true means compaction and scan will validate partition_hash, otherwise false +const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash"); } // namespace pegasus diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h index 4f4814d29a..4f131eb465 100644 --- a/src/base/pegasus_const.h +++ b/src/base/pegasus_const.h @@ -62,4 +62,6 @@ extern const std::string PEGASUS_CLUSTER_SECTION_NAME; extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD; extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS; + +extern const std::string SPLIT_VALIDATE_PARTITION_HASH; } // namespace pegasus diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h index 59fecbfef1..b894c6d3da 100644 --- a/src/base/pegasus_key_schema.h +++ b/src/base/pegasus_key_schema.h @@ -146,22 +146,23 @@ pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string & } } -// calculate hash from rocksdb key. 
-inline uint64_t pegasus_key_hash(const ::dsn::blob &key) +// calculate hash from rocksdb key or rocksdb slice +template +inline uint64_t pegasus_key_hash(const T &key) { - dassert(key.length() >= 2, "key length must be no less than 2"); + dassert(key.size() >= 2, "key length must be no less than 2"); // hash_key_len is in big endian uint16_t hash_key_len = be16toh(*(int16_t *)(key.data())); if (hash_key_len > 0) { // hash_key_len > 0, compute hash from hash_key - dassert(key.length() >= 2 + hash_key_len, + dassert(key.size() >= 2 + hash_key_len, "key length must be no less than (2 + hash_key_len)"); - return dsn::utils::crc64_calc(key.buffer_ptr() + 2, hash_key_len, 0); + return dsn::utils::crc64_calc(key.data() + 2, hash_key_len, 0); } else { // hash_key_len == 0, compute hash from sort_key - return dsn::utils::crc64_calc(key.buffer_ptr() + 2, key.length() - 2, 0); + return dsn::utils::crc64_calc(key.data() + 2, key.size() - 2, 0); } } @@ -171,4 +172,16 @@ inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key) return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0); } +// check whether the key should be served by this partition +// Notice: partition_version must be checked to be non-negative before calling this function +template +inline bool check_pegasus_key_hash(const T &key, int32_t pidx, int32_t partition_version) +{ + auto target_pidx = pegasus_key_hash(key) & partition_version; + if (dsn_unlikely(target_pidx != pidx)) { + return false; + } + return true; +} + } // namespace pegasus diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h index f6f410d973..4f26844411 100644 --- a/src/server/key_ttl_compaction_filter.h +++ b/src/server/key_ttl_compaction_filter.h @@ -25,6 +25,7 @@ #include #include "base/pegasus_utils.h" +#include "base/pegasus_key_schema.h" #include "base/pegasus_value_schema.h" namespace pegasus { @@ -33,8 +34,18 @@ namespace server { class KeyWithTTLCompactionFilter : public 
rocksdb::CompactionFilter { public: - KeyWithTTLCompactionFilter(uint32_t pegasus_data_version, uint32_t default_ttl, bool enabled) - : _pegasus_data_version(pegasus_data_version), _default_ttl(default_ttl), _enabled(enabled) + KeyWithTTLCompactionFilter(uint32_t pegasus_data_version, + uint32_t default_ttl, + bool enabled, + int32_t pidx, + int32_t partition_version, + bool validate_hash) + : _pegasus_data_version(pegasus_data_version), + _default_ttl(default_ttl), + _enabled(enabled), + _partition_index(pidx), + _partition_version(partition_version), + _validate_partition_hash(validate_hash) { } @@ -58,16 +69,30 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter *value_changed = true; return false; } - return check_if_ts_expired(utils::epoch_now(), expire_ts); + return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key); } const char *Name() const override { return "KeyWithTTLCompactionFilter"; } + // Check if the record is stale after partition split, which will split the partition into two + // halves. The stale record belongs to the other half. 
+ bool check_if_stale_split_data(const rocksdb::Slice &key) const + { + if (!_validate_partition_hash || key.size() < 2 || _partition_version < 0 || + _partition_index > _partition_version) { + return false; + } + return !check_pegasus_key_hash(key, _partition_index, _partition_version); + } + private: uint32_t _pegasus_data_version; uint32_t _default_ttl; bool _enabled; // only process filtering when _enabled == true mutable pegasus_value_generator _gen; + int32_t _partition_index; + int32_t _partition_version; + bool _validate_partition_hash; }; class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory @@ -79,8 +104,13 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor std::unique_ptr CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override { - return std::unique_ptr(new KeyWithTTLCompactionFilter( - _pegasus_data_version.load(), _default_ttl.load(), _enabled.load())); + return std::unique_ptr( + new KeyWithTTLCompactionFilter(_pegasus_data_version.load(), + _default_ttl.load(), + _enabled.load(), + _partition_index.load(), + _partition_version.load(), + _validate_partition_hash.load())); } const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; } @@ -90,11 +120,26 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor } void EnableFilter() { _enabled.store(true, std::memory_order_release); } void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl, std::memory_order_release); } + void SetValidatePartitionHash(bool validate_hash) + { + _validate_partition_hash.store(validate_hash, std::memory_order_release); + } + void SetPartitionIndex(int32_t pidx) + { + _partition_index.store(pidx, std::memory_order_release); + } + void SetPartitionVersion(int32_t partition_version) + { + _partition_version.store(partition_version, std::memory_order_release); + } private: std::atomic _pegasus_data_version; std::atomic _default_ttl; 
std::atomic_bool _enabled; // only process filtering when _enabled == true + std::atomic _partition_index{0}; + std::atomic _partition_version{-1}; + std::atomic_bool _validate_partition_hash{false}; }; } // namespace server diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 38b15d7f55..7816bcda2c 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1458,6 +1458,8 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) // only enable filter after correct pegasus_data_version set _key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version); + _key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index()); + _key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index() - 1); _key_ttl_compaction_filter_factory->EnableFilter(); parse_checkpoints(); @@ -2294,6 +2296,7 @@ void pegasus_server_impl::update_app_envs(const std::map &envs) +{ + bool new_value = false; + auto iter = envs.find(SPLIT_VALIDATE_PARTITION_HASH); + if (iter != envs.end()) { + if (!dsn::buf2bool(iter->second, new_value)) { + derror_replica("{}={} is invalid.", iter->first, iter->second); + return; + } + } + if (new_value != _validate_partition_hash) { + ddebug_replica( + "update '_validate_partition_hash' from {} to {}", _validate_partition_hash, new_value); + _validate_partition_hash = new_value; + _key_ttl_compaction_filter_factory->SetValidatePartitionHash(_validate_partition_hash); + } +} + bool pegasus_server_impl::parse_compression_types( const std::string &config, std::vector &compression_per_level) { @@ -2750,8 +2773,7 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version) int32_t old_partition_version = _partition_version.exchange(partition_version); ddebug_replica( "update partition version from {} to {}", old_partition_version, partition_version); - - // TODO(heyuchen): set filter _partition_version in further pr 
+ _key_ttl_compaction_filter_factory->SetPartitionVersion(partition_version); } ::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait) diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 5785624a5c..2b314c3916 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -262,6 +262,8 @@ class pegasus_server_impl : public pegasus_read_service void update_rocksdb_iteration_threshold(const std::map &envs); + void update_validate_partition_hash(const std::map &envs); + // return true if parse compression types 'config' success, otherwise return false. // 'compression_per_level' will not be changed if parse failed. bool parse_compression_types(const std::string &config, @@ -402,6 +404,7 @@ class pegasus_server_impl : public pegasus_read_service pegasus_manual_compact_service _manual_compact_svc; std::atomic _partition_version; + bool _validate_partition_hash{false}; dsn::replication::ingestion_status::type _ingestion_status{ dsn::replication::ingestion_status::IS_INVALID};