Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: start compaction operation in CompactionFilter::Filter #780

Merged
merged 9 commits into from
Jul 20, 2021
2 changes: 1 addition & 1 deletion src/server/compaction_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ compaction_operations create_compaction_operations(const std::string &json, uint
enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version);
if (operation != nullptr) {
operation->set_rules(std::move(rules));
res.emplace_back(std::unique_ptr<compaction_operation>(operation));
res.emplace_back(std::shared_ptr<compaction_operation>(operation));
}
}
return res;
Expand Down
2 changes: 1 addition & 1 deletion src/server/compaction_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class update_ttl : public compaction_operation
FRIEND_TEST(compaction_filter_operation_test, create_operations);
};

typedef std::vector<std::unique_ptr<compaction_operation>> compaction_operations;
typedef std::vector<std::shared_ptr<compaction_operation>> compaction_operations;
compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version);
void register_compaction_operations();
} // namespace server
Expand Down
43 changes: 40 additions & 3 deletions src/server/key_ttl_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
bool enabled,
int32_t pidx,
int32_t partition_version,
bool validate_hash)
bool validate_hash,
compaction_operations &&compaction_ops)
: _pegasus_data_version(pegasus_data_version),
_default_ttl(default_ttl),
_enabled(enabled),
_partition_index(pidx),
_partition_version(partition_version),
_validate_partition_hash(validate_hash)
_validate_partition_hash(validate_hash),
_user_specified_operations(std::move(compaction_ops))
{
}

Expand All @@ -60,6 +62,17 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
return false;
}

// ignore empty write. Empty writes will deleted by the compaction of rocksdb. We don't need
// deal with it here.
if (key.size() < 2) {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

if (!_user_specified_operations.empty() &&
user_specified_operation_filter(key, existing_value, new_value, value_changed)) {
return true;
}

uint32_t expire_ts =
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
if (_default_ttl != 0 && expire_ts == 0) {
Expand All @@ -73,6 +86,22 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
}

bool user_specified_operation_filter(const rocksdb::Slice &key,
const rocksdb::Slice &existing_value,
std::string *new_value,
bool *value_changed) const
{
std::string hash_key, sort_key;
pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key);
for (const auto &op : _user_specified_operations) {
if (op->filter(hash_key, sort_key, existing_value, new_value, value_changed)) {
// return true if this data need to be deleted
return true;
}
}
return false;
}

const char *Name() const override { return "KeyWithTTLCompactionFilter"; }

// Check if the record is stale after partition split, which will split the partition into two
Expand All @@ -94,6 +123,7 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
int32_t _partition_index;
int32_t _partition_version;
bool _validate_partition_hash;
compaction_operations _user_specified_operations;
};

class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
Expand All @@ -105,13 +135,20 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
{
compaction_operations tmp_filter_operations;
{
dsn::utils::auto_read_lock l(_lock);
tmp_filter_operations = _user_specified_operations;
}

return std::unique_ptr<KeyWithTTLCompactionFilter>(
new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
_default_ttl.load(),
_enabled.load(),
_partition_index.load(),
_partition_version.load(),
_validate_partition_hash.load()));
_validate_partition_hash.load(),
std::move(tmp_filter_operations)));
}
const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }

Expand Down