From 12d2354646492895d29d51aed7b10c46cd943033 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 8 Jun 2021 16:59:51 +0800 Subject: [PATCH 1/4] fix --- src/server/compaction_filter_rule.h | 4 + src/server/compaction_operation.cpp | 47 +++++ src/server/compaction_operation.h | 27 +++ src/server/test/compaction_operation_test.cpp | 171 ++++++++++++++++++ 4 files changed, 249 insertions(+) diff --git a/src/server/compaction_filter_rule.h b/src/server/compaction_filter_rule.h index 8927839632..bc61039e83 100644 --- a/src/server/compaction_filter_rule.h +++ b/src/server/compaction_filter_rule.h @@ -67,6 +67,8 @@ class hashkey_pattern_rule : public compaction_filter_rule FRIEND_TEST(hashkey_pattern_rule_test, match); FRIEND_TEST(delete_key_test, filter); + FRIEND_TEST(update_ttl_test, filter); + FRIEND_TEST(compaction_filter_operation_test, all_rules_match); }; class sortkey_pattern_rule : public compaction_filter_rule @@ -83,6 +85,7 @@ class sortkey_pattern_rule : public compaction_filter_rule string_match_type match_type; FRIEND_TEST(sortkey_pattern_rule_test, match); + FRIEND_TEST(compaction_filter_operation_test, all_rules_match); }; class ttl_range_rule : public compaction_filter_rule @@ -101,6 +104,7 @@ class ttl_range_rule : public compaction_filter_rule uint32_t pegasus_data_version; FRIEND_TEST(ttl_range_rule_test, match); + FRIEND_TEST(compaction_filter_operation_test, all_rules_match); }; } // namespace server } // namespace pegasus diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp index bb86e0c4da..bfa03efc28 100644 --- a/src/server/compaction_operation.cpp +++ b/src/server/compaction_operation.cpp @@ -17,6 +17,8 @@ * under the License. */ +#include "base/pegasus_utils.h" +#include "base/pegasus_value_schema.h" #include "compaction_operation.h" namespace pegasus { @@ -55,5 +57,50 @@ bool delete_key::filter(const std::string &hash_key, } return true; } + +update_ttl::update_ttl(filter_rules &&rules, uint32_t pegasus_data_version) + : compaction_operation(std::move(rules), pegasus_data_version) +{ +} + +bool update_ttl::filter(const std::string &hash_key, + const std::string &sort_key, + const rocksdb::Slice &existing_value, + std::string *new_value, + bool *value_changed) const +{ + if (!all_rules_match(hash_key, sort_key, existing_value)) { + return false; + } + + uint32_t new_ts = 0; + switch (type) { + case update_ttl_op_type::UTOT_FROM_NOW: + new_ts = utils::epoch_now() + timestamp; + break; + case update_ttl_op_type::UTOT_FROM_CURRENT: { + auto ttl = + pegasus_extract_expire_ts(pegasus_data_version, utils::to_string_view(existing_value)); + if (ttl == 0) { + return false; + } + new_ts = timestamp + ttl; + break; + } + case update_ttl_op_type::UTOT_TIMESTAMP: + // make it's seconds since 2016.01.01-00:00:00 GMT + new_ts = timestamp - pegasus::utils::epoch_begin; + break; + default: + ddebug("invalid update ttl operation type"); + return false; + } + + *new_value = existing_value.ToString(); + pegasus_update_expire_ts(pegasus_data_version, *new_value, new_ts); + *value_changed = true; + return false; +} + } // namespace server } // namespace pegasus diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h index a4e630527c..31006ddad1 100644 --- a/src/server/compaction_operation.h +++ b/src/server/compaction_operation.h @@ -70,6 +70,33 @@ class delete_key : public compaction_operation private: FRIEND_TEST(delete_key_test, filter); + FRIEND_TEST(compaction_filter_operation_test, all_rules_match); +}; + +enum update_ttl_op_type +{ + UTOT_FROM_NOW, + UTOT_FROM_CURRENT, + UTOT_TIMESTAMP, + UTOT_INVALID, +}; + +class update_ttl : public compaction_operation +{ +public: + update_ttl(filter_rules &&rules, uint32_t pegasus_data_version); + + bool filter(const std::string &hash_key, + const std::string &sort_key, + const rocksdb::Slice &existing_value, + std::string *new_value, + bool *value_changed) const; + +private: + update_ttl_op_type type; + uint32_t timestamp; + + FRIEND_TEST(update_ttl_test, filter); }; } // namespace server } // namespace pegasus diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp index b7140aa609..5ef35e1b71 100644 --- a/src/server/test/compaction_operation_test.cpp +++ b/src/server/test/compaction_operation_test.cpp @@ -20,11 +20,104 @@ #include #include "server/compaction_operation.h" #include "server/compaction_filter_rule.h" +#include "base/pegasus_value_schema.h" +#include "base/pegasus_utils.h" #include namespace pegasus { namespace server { +TEST(compaction_filter_operation_test, all_rules_match) +{ + struct test_case + { + bool all_match; + std::string hashkey; + std::string sortkey; + int32_t expire_ttl; + // hashkey_rule + std::string hashkey_pattern; + string_match_type hashkey_match_type; + // sortkey_rule + std::string sortkey_pattern; + string_match_type sortkey_match_type; + // ttl_range_rule + int32_t start_ttl; + int32_t stop_ttl; + } tests[] = { + {true, + "hashkey", + "sortkey", + 1000, + "hashkey", + SMT_MATCH_ANYWHERE, + "sortkey", + SMT_MATCH_ANYWHERE, + 100, + 2000}, + {false, + "hash_key", + "sortkey", + 1000, + "hashkey", + SMT_MATCH_ANYWHERE, + "sortkey", + SMT_MATCH_ANYWHERE, + 100, + 2000}, + {false, + "hashkey", + "sort_key", + 1000, + "hashkey", + SMT_MATCH_ANYWHERE, + "sortkey", + SMT_MATCH_ANYWHERE, + 100, + 2000}, + {false, + "hashkey", + "sortkey", + 10000, + "hashkey", + SMT_MATCH_ANYWHERE, + "sortkey", + SMT_MATCH_ANYWHERE, + 100, + 2000}, + }; + + uint32_t data_version = 1; + filter_rules rules; + rules.push_back(dsn::make_unique()); + rules.push_back(dsn::make_unique()); + rules.push_back(dsn::make_unique(data_version)); + delete_key update_operation(std::move(rules), data_version); + pegasus_value_generator gen; + auto now_ts = utils::epoch_now(); + for (const auto &test : tests) { + auto hash_rule = static_cast(update_operation.rules[0].get()); + auto sort_rule = static_cast(update_operation.rules[1].get()); + auto ttl_rule = static_cast(update_operation.rules[2].get()); + + hash_rule->pattern = test.hashkey_pattern; + hash_rule->match_type = test.hashkey_match_type; + sort_rule->pattern = test.sortkey_pattern; + sort_rule->match_type = test.sortkey_match_type; + ttl_rule->start_ttl = test.start_ttl; + ttl_rule->stop_ttl = test.stop_ttl; + + rocksdb::SliceParts svalue = + gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0); + ASSERT_EQ(update_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]), + test.all_match); + } + + // all_rules_match will return false if there is no rule in this operation + update_ttl no_rule_operation({}, data_version); + ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", rocksdb::Slice()), false); +} + TEST(delete_key_test, filter) { struct test_case @@ -51,5 +144,83 @@ TEST(delete_key_test, filter) delete_operation.filter(test.hashkey, "", rocksdb::Slice(), nullptr, nullptr)); } } + +TEST(update_ttl_test, filter) +{ + struct test_case + { + bool value_changed; + uint32_t expect_ts; + std::string hashkey; + uint32_t expire_ts; + // hashkey_rule + std::string hashkey_pattern; + string_match_type hashkey_match_type; + // operation + update_ttl_op_type op_type; + uint32_t timestamp; + } tests[] = { + {true, 1000, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_NOW, 1000}, + {false, 0, "hashkey", 0, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_CURRENT, 1000}, + {true, 1300, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_CURRENT, 1000}, + {true, + 1000 + pegasus::utils::epoch_begin, + "hashkey", + 300, + "hashkey", + SMT_MATCH_ANYWHERE, + UTOT_TIMESTAMP, + 1000 + pegasus::utils::epoch_begin}, + {false, + 1000 + pegasus::utils::epoch_begin, + "hashkey", + 300, + "hashkey111", + SMT_MATCH_ANYWHERE, + UTOT_TIMESTAMP, + 1000 + pegasus::utils::epoch_begin}, + }; + + uint32_t data_version = 1; + filter_rules rules; + rules.push_back(dsn::make_unique()); + update_ttl update_operation(std::move(rules), data_version); + pegasus_value_generator gen; + for (const auto &test : tests) { + auto hash_rule = static_cast(update_operation.rules.begin()->get()); + hash_rule->pattern = test.hashkey_pattern; + hash_rule->match_type = test.hashkey_match_type; + update_operation.timestamp = test.timestamp; + update_operation.type = test.op_type; + + std::string new_value; + bool value_changed = false; + rocksdb::SliceParts svalue = gen.generate_value(data_version, "", test.expire_ts, 0); + uint32_t before_ts = utils::epoch_now(); + ASSERT_EQ( + false, + update_operation.filter(test.hashkey, "", svalue.parts[0], &new_value, &value_changed)); + ASSERT_EQ(test.value_changed, value_changed); + if (value_changed) { + uint32_t new_ts = pegasus_extract_expire_ts(data_version, new_value); + switch (test.op_type) { + case UTOT_TIMESTAMP: + ASSERT_EQ(new_ts + pegasus::utils::epoch_begin, test.expect_ts); + break; + case UTOT_FROM_CURRENT: + ASSERT_EQ(new_ts, test.expect_ts); + break; + case UTOT_FROM_NOW: { + uint32_t after_ts = utils::epoch_now(); + ASSERT_GE(new_ts, test.expect_ts + before_ts); + ASSERT_LE(new_ts, test.expect_ts + after_ts); + break; + } + default: + break; + } + } + } +} } // namespace server } // namespace pegasus From 6677cf4f1ae25bd622340bc3d8a5447902bb0bd9 Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 10 Jun 2021 09:45:32 +0800 Subject: [PATCH 2/4] fix --- src/server/compaction_operation.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h index 31006ddad1..713bef2c15 100644 --- a/src/server/compaction_operation.h +++ b/src/server/compaction_operation.h @@ -75,8 +75,11 @@ class delete_key : public compaction_operation enum update_ttl_op_type { + // update ttl to epoch_now() + timestamp UTOT_FROM_NOW, + // update ttl to {current ttl in rocksdb value} + timestamp UTOT_FROM_CURRENT, + // update ttl to timestamp UTOT_TIMESTAMP, UTOT_INVALID, }; From b262d308d53e9b0db5179805c830fe80854b478c Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 10 Jun 2021 18:59:48 +0800 Subject: [PATCH 3/4] fix --- src/server/compaction_operation.cpp | 6 +++--- src/server/compaction_operation.h | 9 +++++---- src/server/test/compaction_operation_test.cpp | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp index bfa03efc28..895c75dc5a 100644 --- a/src/server/compaction_operation.cpp +++ b/src/server/compaction_operation.cpp @@ -76,7 +76,7 @@ bool update_ttl::filter(const std::string &hash_key, uint32_t new_ts = 0; switch (type) { case update_ttl_op_type::UTOT_FROM_NOW: - new_ts = utils::epoch_now() + timestamp; + new_ts = utils::epoch_now() + value; break; case update_ttl_op_type::UTOT_FROM_CURRENT: { auto ttl = @@ -84,12 +84,12 @@ bool update_ttl::filter(const std::string &hash_key, if (ttl == 0) { return false; } - new_ts = timestamp + ttl; + new_ts = value + ttl; break; } case update_ttl_op_type::UTOT_TIMESTAMP: // make it's seconds since 2016.01.01-00:00:00 GMT - new_ts = timestamp - pegasus::utils::epoch_begin; + new_ts = value - pegasus::utils::epoch_begin; break; default: ddebug("invalid update ttl operation type"); diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h index 713bef2c15..e387b44ae1 100644 --- a/src/server/compaction_operation.h +++ b/src/server/compaction_operation.h @@ -75,11 +75,12 @@ class delete_key : public compaction_operation enum update_ttl_op_type { - // update ttl to epoch_now() + timestamp + // update ttl to epoch_now() + value UTOT_FROM_NOW, - // update ttl to {current ttl in rocksdb value} + timestamp + // update ttl to {current ttl in rocksdb value} + value UTOT_FROM_CURRENT, - // update ttl to timestamp + // update ttl to value - pegasus::utils::epoch_begin, which means this key will expire at the + // timestamp of {value} UTOT_TIMESTAMP, UTOT_INVALID, }; @@ -97,7 +98,7 @@ class update_ttl : public compaction_operation private: update_ttl_op_type type; - uint32_t timestamp; + uint32_t value; FRIEND_TEST(update_ttl_test, filter); }; diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp index 5ef35e1b71..930387209e 100644 --- a/src/server/test/compaction_operation_test.cpp +++ b/src/server/test/compaction_operation_test.cpp @@ -158,7 +158,7 @@ TEST(update_ttl_test, filter) string_match_type hashkey_match_type; // operation update_ttl_op_type op_type; - uint32_t timestamp; + uint32_t value; } tests[] = { {true, 1000, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_NOW, 1000}, {false, 0, "hashkey", 0, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_CURRENT, 1000}, @@ -190,7 +190,7 @@ TEST(update_ttl_test, filter) auto hash_rule = static_cast(update_operation.rules.begin()->get()); hash_rule->pattern = test.hashkey_pattern; hash_rule->match_type = test.hashkey_match_type; - update_operation.timestamp = test.timestamp; + update_operation.value = test.value; update_operation.type = test.op_type; std::string new_value; From 33880e08febb5115b95785e8e002399bb5a4f274 Mon Sep 17 00:00:00 2001 From: levy Date: Fri, 11 Jun 2021 09:10:34 +0800 Subject: [PATCH 4/4] update --- src/server/compaction_operation.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h index e387b44ae1..d4e4285974 100644 --- a/src/server/compaction_operation.h +++ b/src/server/compaction_operation.h @@ -79,7 +79,7 @@ enum update_ttl_op_type UTOT_FROM_NOW, // update ttl to {current ttl in rocksdb value} + value UTOT_FROM_CURRENT, - // update ttl to value - pegasus::utils::epoch_begin, which means this key will expire at the + // update ttl to value - time(nullptr), which means this key will expire at the // timestamp of {value} UTOT_TIMESTAMP, UTOT_INVALID,