Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add update ttl compaction operation #760

Merged
merged 9 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/server/compaction_filter_rule.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class hashkey_pattern_rule : public compaction_filter_rule

FRIEND_TEST(hashkey_pattern_rule_test, match);
FRIEND_TEST(delete_key_test, filter);
FRIEND_TEST(update_ttl_test, filter);
FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
};

class sortkey_pattern_rule : public compaction_filter_rule
Expand All @@ -83,6 +85,7 @@ class sortkey_pattern_rule : public compaction_filter_rule
string_match_type match_type;

FRIEND_TEST(sortkey_pattern_rule_test, match);
FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
};

class ttl_range_rule : public compaction_filter_rule
Expand All @@ -101,6 +104,7 @@ class ttl_range_rule : public compaction_filter_rule
uint32_t pegasus_data_version;

FRIEND_TEST(ttl_range_rule_test, match);
FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
};
} // namespace server
} // namespace pegasus
47 changes: 47 additions & 0 deletions src/server/compaction_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* under the License.
*/

#include "base/pegasus_utils.h"
#include "base/pegasus_value_schema.h"
#include "compaction_operation.h"

namespace pegasus {
Expand Down Expand Up @@ -55,5 +57,50 @@ bool delete_key::filter(const std::string &hash_key,
}
return true;
}

update_ttl::update_ttl(filter_rules &&rules, uint32_t pegasus_data_version)
: compaction_operation(std::move(rules), pegasus_data_version)
{
}

bool update_ttl::filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
std::string *new_value,
bool *value_changed) const
{
if (!all_rules_match(hash_key, sort_key, existing_value)) {
return false;
}

uint32_t new_ts = 0;
switch (type) {
case update_ttl_op_type::UTOT_FROM_NOW:
new_ts = utils::epoch_now() + timestamp;
break;
case update_ttl_op_type::UTOT_FROM_CURRENT: {
auto ttl =
pegasus_extract_expire_ts(pegasus_data_version, utils::to_string_view(existing_value));
if (ttl == 0) {
return false;
}
new_ts = timestamp + ttl;
break;
}
case update_ttl_op_type::UTOT_TIMESTAMP:
// make it's seconds since 2016.01.01-00:00:00 GMT
new_ts = timestamp - pegasus::utils::epoch_begin;
break;
default:
ddebug("invalid update ttl operation type");
return false;
}

*new_value = existing_value.ToString();
pegasus_update_expire_ts(pegasus_data_version, *new_value, new_ts);
*value_changed = true;
return false;
}

} // namespace server
} // namespace pegasus
27 changes: 27 additions & 0 deletions src/server/compaction_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,33 @@ class delete_key : public compaction_operation

private:
FRIEND_TEST(delete_key_test, filter);
FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
};

enum update_ttl_op_type
{
UTOT_FROM_NOW,
UTOT_FROM_CURRENT,
UTOT_TIMESTAMP,
UTOT_INVALID,
};
hycdong marked this conversation as resolved.
Show resolved Hide resolved

class update_ttl : public compaction_operation
{
public:
update_ttl(filter_rules &&rules, uint32_t pegasus_data_version);

bool filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
std::string *new_value,
bool *value_changed) const;

private:
update_ttl_op_type type;
uint32_t timestamp;

FRIEND_TEST(update_ttl_test, filter);
};
} // namespace server
} // namespace pegasus
171 changes: 171 additions & 0 deletions src/server/test/compaction_operation_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,104 @@
#include <gtest/gtest.h>
#include "server/compaction_operation.h"
#include "server/compaction_filter_rule.h"
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
#include <dsn/utility/smart_pointers.h>

namespace pegasus {
namespace server {

TEST(compaction_filter_operation_test, all_rules_match)
{
struct test_case
{
bool all_match;
std::string hashkey;
std::string sortkey;
int32_t expire_ttl;
// hashkey_rule
std::string hashkey_pattern;
string_match_type hashkey_match_type;
// sortkey_rule
std::string sortkey_pattern;
string_match_type sortkey_match_type;
// ttl_range_rule
int32_t start_ttl;
int32_t stop_ttl;
} tests[] = {
{true,
"hashkey",
"sortkey",
1000,
"hashkey",
SMT_MATCH_ANYWHERE,
"sortkey",
SMT_MATCH_ANYWHERE,
100,
2000},
{false,
"hash_key",
"sortkey",
1000,
"hashkey",
SMT_MATCH_ANYWHERE,
"sortkey",
SMT_MATCH_ANYWHERE,
100,
2000},
{false,
"hashkey",
"sort_key",
1000,
"hashkey",
SMT_MATCH_ANYWHERE,
"sortkey",
SMT_MATCH_ANYWHERE,
100,
2000},
{false,
"hashkey",
"sortkey",
10000,
"hashkey",
SMT_MATCH_ANYWHERE,
"sortkey",
SMT_MATCH_ANYWHERE,
100,
2000},
};

uint32_t data_version = 1;
filter_rules rules;
rules.push_back(dsn::make_unique<hashkey_pattern_rule>());
rules.push_back(dsn::make_unique<sortkey_pattern_rule>());
rules.push_back(dsn::make_unique<ttl_range_rule>(data_version));
delete_key update_operation(std::move(rules), data_version);
pegasus_value_generator gen;
auto now_ts = utils::epoch_now();
for (const auto &test : tests) {
auto hash_rule = static_cast<hashkey_pattern_rule *>(update_operation.rules[0].get());
auto sort_rule = static_cast<sortkey_pattern_rule *>(update_operation.rules[1].get());
auto ttl_rule = static_cast<ttl_range_rule *>(update_operation.rules[2].get());

hash_rule->pattern = test.hashkey_pattern;
hash_rule->match_type = test.hashkey_match_type;
sort_rule->pattern = test.sortkey_pattern;
sort_rule->match_type = test.sortkey_match_type;
ttl_rule->start_ttl = test.start_ttl;
ttl_rule->stop_ttl = test.stop_ttl;

rocksdb::SliceParts svalue =
gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
ASSERT_EQ(update_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]),
test.all_match);
}

// all_rules_match will return false if there is no rule in this operation
update_ttl no_rule_operation({}, data_version);
ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", rocksdb::Slice()), false);
}

TEST(delete_key_test, filter)
{
struct test_case
Expand All @@ -51,5 +144,83 @@ TEST(delete_key_test, filter)
delete_operation.filter(test.hashkey, "", rocksdb::Slice(), nullptr, nullptr));
}
}

TEST(update_ttl_test, filter)
{
struct test_case
{
bool value_changed;
uint32_t expect_ts;
std::string hashkey;
uint32_t expire_ts;
// hashkey_rule
std::string hashkey_pattern;
string_match_type hashkey_match_type;
// operation
update_ttl_op_type op_type;
uint32_t timestamp;
} tests[] = {
{true, 1000, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_NOW, 1000},
{false, 0, "hashkey", 0, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_CURRENT, 1000},
{true, 1300, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, UTOT_FROM_CURRENT, 1000},
{true,
1000 + pegasus::utils::epoch_begin,
"hashkey",
300,
"hashkey",
SMT_MATCH_ANYWHERE,
UTOT_TIMESTAMP,
1000 + pegasus::utils::epoch_begin},
{false,
1000 + pegasus::utils::epoch_begin,
"hashkey",
300,
"hashkey111",
SMT_MATCH_ANYWHERE,
UTOT_TIMESTAMP,
1000 + pegasus::utils::epoch_begin},
};

uint32_t data_version = 1;
filter_rules rules;
rules.push_back(dsn::make_unique<hashkey_pattern_rule>());
update_ttl update_operation(std::move(rules), data_version);
pegasus_value_generator gen;
for (const auto &test : tests) {
auto hash_rule = static_cast<hashkey_pattern_rule *>(update_operation.rules.begin()->get());
hash_rule->pattern = test.hashkey_pattern;
hash_rule->match_type = test.hashkey_match_type;
update_operation.timestamp = test.timestamp;
update_operation.type = test.op_type;

std::string new_value;
bool value_changed = false;
rocksdb::SliceParts svalue = gen.generate_value(data_version, "", test.expire_ts, 0);
uint32_t before_ts = utils::epoch_now();
ASSERT_EQ(
false,
update_operation.filter(test.hashkey, "", svalue.parts[0], &new_value, &value_changed));
ASSERT_EQ(test.value_changed, value_changed);
if (value_changed) {
uint32_t new_ts = pegasus_extract_expire_ts(data_version, new_value);
switch (test.op_type) {
case UTOT_TIMESTAMP:
ASSERT_EQ(new_ts + pegasus::utils::epoch_begin, test.expect_ts);
break;
case UTOT_FROM_CURRENT:
ASSERT_EQ(new_ts, test.expect_ts);
break;
case UTOT_FROM_NOW: {
uint32_t after_ts = utils::epoch_now();
ASSERT_GE(new_ts, test.expect_ts + before_ts);
ASSERT_LE(new_ts, test.expect_ts + after_ts);
break;
}
default:
break;
}
}
}
}
} // namespace server
} // namespace pegasus