diff --git a/src/server/compaction_filter_rule.h b/src/server/compaction_filter_rule.h index 8927839632..bc61039e83 100644 --- a/src/server/compaction_filter_rule.h +++ b/src/server/compaction_filter_rule.h @@ -67,6 +67,8 @@ class hashkey_pattern_rule : public compaction_filter_rule FRIEND_TEST(hashkey_pattern_rule_test, match); FRIEND_TEST(delete_key_test, filter); + FRIEND_TEST(update_ttl_test, filter); + FRIEND_TEST(compaction_filter_operation_test, all_rules_match); }; class sortkey_pattern_rule : public compaction_filter_rule @@ -83,6 +85,7 @@ class sortkey_pattern_rule : public compaction_filter_rule string_match_type match_type; FRIEND_TEST(sortkey_pattern_rule_test, match); + FRIEND_TEST(compaction_filter_operation_test, all_rules_match); }; class ttl_range_rule : public compaction_filter_rule @@ -101,6 +104,7 @@ class ttl_range_rule : public compaction_filter_rule uint32_t pegasus_data_version; FRIEND_TEST(ttl_range_rule_test, match); + FRIEND_TEST(compaction_filter_operation_test, all_rules_match); }; } // namespace server } // namespace pegasus diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp index bb86e0c4da..895c75dc5a 100644 --- a/src/server/compaction_operation.cpp +++ b/src/server/compaction_operation.cpp @@ -17,6 +17,8 @@ * under the License. */ +#include "base/pegasus_utils.h" +#include "base/pegasus_value_schema.h" #include "compaction_operation.h" namespace pegasus { @@ -55,5 +57,50 @@ bool delete_key::filter(const std::string &hash_key, } return true; } + +update_ttl::update_ttl(filter_rules &&rules, uint32_t pegasus_data_version) + : compaction_operation(std::move(rules), pegasus_data_version) +{ +} + +bool update_ttl::filter(const std::string &hash_key, + const std::string &sort_key, + const rocksdb::Slice &existing_value, + std::string *new_value, + bool *value_changed) const +{ + if (!all_rules_match(hash_key, sort_key, existing_value)) { + return false; + } + + uint32_t new_ts = 0; + switch (type) { + case update_ttl_op_type::UTOT_FROM_NOW: + new_ts = utils::epoch_now() + value; + break; + case update_ttl_op_type::UTOT_FROM_CURRENT: { + auto ttl = + pegasus_extract_expire_ts(pegasus_data_version, utils::to_string_view(existing_value)); + if (ttl == 0) { + return false; + } + new_ts = value + ttl; + break; + } + case update_ttl_op_type::UTOT_TIMESTAMP: + // make it's seconds since 2016.01.01-00:00:00 GMT + new_ts = value - pegasus::utils::epoch_begin; + break; + default: + ddebug("invalid update ttl operation type"); + return false; + } + + *new_value = existing_value.ToString(); + pegasus_update_expire_ts(pegasus_data_version, *new_value, new_ts); + *value_changed = true; + return false; +} + } // namespace server } // namespace pegasus diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h index a4e630527c..d4e4285974 100644 --- a/src/server/compaction_operation.h +++ b/src/server/compaction_operation.h @@ -70,6 +70,37 @@ class delete_key : public compaction_operation private: FRIEND_TEST(delete_key_test, filter); + FRIEND_TEST(compaction_filter_operation_test, all_rules_match); +}; + +enum update_ttl_op_type +{ + // update ttl to epoch_now() + value + UTOT_FROM_NOW, + // update ttl to {current ttl in rocksdb value} + value + UTOT_FROM_CURRENT, + // update ttl to value - time(nullptr), which means this key will expire at the + // timestamp of {value} + UTOT_TIMESTAMP, + UTOT_INVALID, +}; + +class update_ttl : public compaction_operation +{ +public: + update_ttl(filter_rules &&rules, uint32_t pegasus_data_version); + + bool filter(const std::string &hash_key, + const std::string &sort_key, + const rocksdb::Slice &existing_value, + std::string *new_value, + bool *value_changed) const; + +private: + update_ttl_op_type type; + uint32_t value; + + FRIEND_TEST(update_ttl_test, filter); }; } // namespace server } // namespace pegasus diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp index b7140aa609..930387209e 100644 --- a/src/server/test/compaction_operation_test.cpp +++ b/src/server/test/compaction_operation_test.cpp @@ -20,11 +20,104 @@ #include #include "server/compaction_operation.h" #include "server/compaction_filter_rule.h" +#include "base/pegasus_value_schema.h" +#include "base/pegasus_utils.h" #include namespace pegasus { namespace server { +TEST(compaction_filter_operation_test, all_rules_match) +{ + struct test_case + { + bool all_match; + std::string hashkey; + std::string sortkey; + int32_t expire_ttl; + // hashkey_rule + std::string hashkey_pattern; + string_match_type hashkey_match_type; + // sortkey_rule + std::string sortkey_pattern; + string_match_type sortkey_match_type; + // ttl_range_rule + int32_t start_ttl; + int32_t stop_ttl; + } tests[] = { + {true, + "hashkey", + "sortkey", + 1000, + "hashkey", + SMT_MATCH_ANYWHERE, + "sortkey", + SMT_MATCH_ANYWHERE, + 100, + 2000}, + {false, + "hash_key", + "sortkey", + 1000, + "hashkey", + SMT_MATCH_ANYWHERE, + "sortkey", + SMT_MATCH_ANYWHERE, + 100, + 2000}, + {false, + "hashkey", + "sort_key", + 1000, + "hashkey", + SMT_MATCH_ANYWHERE, + "sortkey", + SMT_MATCH_ANYWHERE, + 100, + 2000}, + {false, + "hashkey", + "sortkey", + 10000, + "hashkey", + SMT_MATCH_ANYWHERE, + "sortkey", + SMT_MATCH_ANYWHERE, + 100, + 2000}, + }; + + uint32_t data_version = 1; + filter_rules rules; + rules.push_back(dsn::make_unique()); + rules.push_back(dsn::make_unique()); + rules.push_back(dsn::make_unique(data_version)); + delete_key update_operation(std::move(rules), data_version); + pegasus_value_generator gen; + auto now_ts = utils::epoch_now(); + for (const auto &test : tests) { + auto hash_rule = static_cast(update_operation.rules[0].get()); + auto sort_rule = static_cast(update_operation.rules[1].get()); + auto ttl_rule = static_cast(update_operation.rules[2].get()); + + hash_rule->pattern = test.hashkey_pattern; + hash_rule->match_type = test.hashkey_match_type; + sort_rule->pattern = test.sortkey_pattern; + sort_rule->match_type = test.sortkey_match_type; + ttl_rule->start_ttl = test.start_ttl; + ttl_rule->stop_ttl = test.stop_ttl; + + rocksdb::SliceParts svalue = + gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0); + ASSERT_EQ(update_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]), + test.all_match); + } + + // all_rules_match will return false if there is no rule in this operation + update_ttl no_rule_operation({}, data_version); + ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", rocksdb::Slice()), false); +} + TEST(delete_key_test, filter) { struct test_case @@ -51,5 +144,83 @@ TEST(delete_key_test, filter) delete_operation.filter(test.hashkey, "", rocksdb::Slice(), nullptr, nullptr)); } } + +TEST(update_ttl_test, filter) +{ + struct test_case + { + bool value_changed; + uint32_t expect_ts; + std::string hashkey; + uint32_t expire_ts; + // hashkey_rule + std::string hashkey_pattern; + string_match_type hashkey_match_type; + // operation + update_ttl_op_type op_type; + uint32_t value; + } tests[] = { + {true, 1000, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_NOW, 1000}, + {false, 0, "hashkey", 0, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_CURRENT, 1000}, + {true, 1300, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_CURRENT, 1000}, + {true, + 1000 + pegasus::utils::epoch_begin, + "hashkey", + 300, + "hashkey", + SMT_MATCH_ANYWHERE, + UTOT_TIMESTAMP, + 1000 + pegasus::utils::epoch_begin}, + {false, + 1000 + pegasus::utils::epoch_begin, + "hashkey", + 300, + "hashkey111", + SMT_MATCH_ANYWHERE, + UTOT_TIMESTAMP, + 1000 + pegasus::utils::epoch_begin}, + }; + + uint32_t data_version = 1; + filter_rules rules; + rules.push_back(dsn::make_unique()); + update_ttl update_operation(std::move(rules), data_version); + pegasus_value_generator gen; + for (const auto &test : tests) { + auto hash_rule = static_cast(update_operation.rules.begin()->get()); + hash_rule->pattern = test.hashkey_pattern; + hash_rule->match_type = test.hashkey_match_type; + update_operation.value = test.value; + update_operation.type = test.op_type; + + std::string new_value; + bool value_changed = false; + rocksdb::SliceParts svalue = gen.generate_value(data_version, "", test.expire_ts, 0); + uint32_t before_ts = utils::epoch_now(); + ASSERT_EQ( + false, + update_operation.filter(test.hashkey, "", svalue.parts[0], &new_value, &value_changed)); + ASSERT_EQ(test.value_changed, value_changed); + if (value_changed) { + uint32_t new_ts = pegasus_extract_expire_ts(data_version, new_value); + switch (test.op_type) { + case UTOT_TIMESTAMP: + ASSERT_EQ(new_ts + pegasus::utils::epoch_begin, test.expect_ts); + break; + case UTOT_FROM_CURRENT: + ASSERT_EQ(new_ts, test.expect_ts); + break; + case UTOT_FROM_NOW: { + uint32_t after_ts = utils::epoch_now(); + ASSERT_GE(new_ts, test.expect_ts + before_ts); + ASSERT_LE(new_ts, test.expect_ts + after_ts); + break; + } + default: + break; + } + } + } +} } // namespace server } // namespace pegasus