Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(split): gc useless data after partition split #698

Merged
merged 6 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rdsn
3 changes: 3 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,7 @@ const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold
/// time threshold of each rocksdb iteration
const std::string
ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms");

/// true means compaction and scan will validate partition_hash, otherwise false
const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
} // namespace pegasus
2 changes: 2 additions & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ extern const std::string PEGASUS_CLUSTER_SECTION_NAME;
extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD;

extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;

extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
} // namespace pegasus
25 changes: 19 additions & 6 deletions src/base/pegasus_key_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,22 +146,23 @@ pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string &
}
}

// calculate hash from rocksdb key.
inline uint64_t pegasus_key_hash(const ::dsn::blob &key)
// calculate hash from rocksdb key or rocksdb slice
template <typename T>
inline uint64_t pegasus_key_hash(const T &key)
{
dassert(key.length() >= 2, "key length must be no less than 2");
dassert(key.size() >= 2, "key length must be no less than 2");

// hash_key_len is in big endian
uint16_t hash_key_len = be16toh(*(int16_t *)(key.data()));

if (hash_key_len > 0) {
// hash_key_len > 0, compute hash from hash_key
dassert(key.length() >= 2 + hash_key_len,
dassert(key.size() >= 2 + hash_key_len,
"key length must be no less than (2 + hash_key_len)");
return dsn::utils::crc64_calc(key.buffer_ptr() + 2, hash_key_len, 0);
return dsn::utils::crc64_calc(key.data() + 2, hash_key_len, 0);
} else {
// hash_key_len == 0, compute hash from sort_key
return dsn::utils::crc64_calc(key.buffer_ptr() + 2, key.length() - 2, 0);
return dsn::utils::crc64_calc(key.data() + 2, key.size() - 2, 0);
}
}

Expand All @@ -171,4 +172,16 @@ inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key)
return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0);
}

// check key should be served this partition
// Notice: partition_version should be check if is greater than 0 before calling this function
template <class T>
inline bool check_pegasus_key_hash(const T &key, int32_t pidx, int32_t partition_version)
{
auto target_pidx = pegasus_key_hash(key) & partition_version;
if (dsn_unlikely(target_pidx != pidx)) {
return false;
}
return true;
}

} // namespace pegasus
55 changes: 50 additions & 5 deletions src/server/key_ttl_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <rocksdb/merge_operator.h>

#include "base/pegasus_utils.h"
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"

namespace pegasus {
Expand All @@ -33,8 +34,18 @@ namespace server {
class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
{
public:
KeyWithTTLCompactionFilter(uint32_t pegasus_data_version, uint32_t default_ttl, bool enabled)
: _pegasus_data_version(pegasus_data_version), _default_ttl(default_ttl), _enabled(enabled)
KeyWithTTLCompactionFilter(uint32_t pegasus_data_version,
uint32_t default_ttl,
bool enabled,
int32_t pidx,
int32_t partition_version,
bool validate_hash)
: _pegasus_data_version(pegasus_data_version),
_default_ttl(default_ttl),
_enabled(enabled),
_partition_index(pidx),
_partition_version(partition_version),
_validate_partition_hash(validate_hash)
{
}

Expand All @@ -58,16 +69,30 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
*value_changed = true;
return false;
}
return check_if_ts_expired(utils::epoch_now(), expire_ts);
return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
}

const char *Name() const override { return "KeyWithTTLCompactionFilter"; }

// Tells whether `key` is stale data left over from a partition split. A split divides the
// partition into two halves; a stale record is one that belongs to the other half.
// Returns false without hashing the key when validation is disabled, the key is shorter
// than 2 bytes, the partition version is negative, or the partition index is greater than
// the partition version.
bool check_if_stale_split_data(const rocksdb::Slice &key) const
{
    const bool can_validate = _validate_partition_hash && key.size() >= 2 &&
                              _partition_version >= 0 &&
                              _partition_index <= _partition_version;
    if (!can_validate) {
        return false;
    }
    // The record is stale when its hash does not map to this partition.
    return !check_pegasus_key_hash(key, _partition_index, _partition_version);
}

private:
uint32_t _pegasus_data_version;
uint32_t _default_ttl;
bool _enabled; // only process filtering when _enabled == true
mutable pegasus_value_generator _gen;
int32_t _partition_index;
int32_t _partition_version;
bool _validate_partition_hash;
};

class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
Expand All @@ -79,8 +104,13 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
{
return std::unique_ptr<KeyWithTTLCompactionFilter>(new KeyWithTTLCompactionFilter(
_pegasus_data_version.load(), _default_ttl.load(), _enabled.load()));
return std::unique_ptr<KeyWithTTLCompactionFilter>(
new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
_default_ttl.load(),
_enabled.load(),
_partition_index.load(),
_partition_version.load(),
_validate_partition_hash.load()));
}
const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }

Expand All @@ -90,11 +120,26 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
}
void EnableFilter() { _enabled.store(true, std::memory_order_release); }
void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl, std::memory_order_release); }
// Sets the flag that is handed to each compaction filter created afterwards; the value is
// stored atomically with release ordering.
void SetValidatePartitionHash(bool validate_hash)
{
_validate_partition_hash.store(validate_hash, std::memory_order_release);
}
// Sets the partition index that is handed to each compaction filter created afterwards;
// the value is stored atomically with release ordering.
void SetPartitionIndex(int32_t pidx)
{
_partition_index.store(pidx, std::memory_order_release);
}
// Sets the partition version that is handed to each compaction filter created afterwards;
// the value is stored atomically with release ordering.
void SetPartitionVersion(int32_t partition_version)
{
_partition_version.store(partition_version, std::memory_order_release);
}

private:
std::atomic<uint32_t> _pegasus_data_version;
std::atomic<uint32_t> _default_ttl;
std::atomic_bool _enabled; // only process filtering when _enabled == true
std::atomic<int32_t> _partition_index{0};
std::atomic<int32_t> _partition_version{-1};
std::atomic_bool _validate_partition_hash{false};
};

} // namespace server
Expand Down
26 changes: 24 additions & 2 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,8 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)

// only enable filter after correct pegasus_data_version set
_key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
_key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index());
_key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index() - 1);
_key_ttl_compaction_filter_factory->EnableFilter();

parse_checkpoints();
Expand Down Expand Up @@ -2294,6 +2296,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
update_checkpoint_reserve(envs);
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
update_validate_partition_hash(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}

Expand All @@ -2310,6 +2313,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
update_checkpoint_reserve(envs);
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
update_validate_partition_hash(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}

Expand Down Expand Up @@ -2438,6 +2442,25 @@ void pegasus_server_impl::update_rocksdb_iteration_threshold(
}
}

void pegasus_server_impl::update_validate_partition_hash(
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
const std::map<std::string, std::string> &envs)
{
bool new_value = false;
auto iter = envs.find(SPLIT_VALIDATE_PARTITION_HASH);
if (iter != envs.end()) {
if (!dsn::buf2bool(iter->second, new_value)) {
derror_replica("{}={} is invalid.", iter->first, iter->second);
return;
}
}
if (new_value != _validate_partition_hash) {
ddebug_replica(
"update '_validate_partition_hash' from {} to {}", _validate_partition_hash, new_value);
_validate_partition_hash = new_value;
_key_ttl_compaction_filter_factory->SetValidatePartitionHash(_validate_partition_hash);
}
}

bool pegasus_server_impl::parse_compression_types(
const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
{
Expand Down Expand Up @@ -2750,8 +2773,7 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
int32_t old_partition_version = _partition_version.exchange(partition_version);
ddebug_replica(
"update partition version from {} to {}", old_partition_version, partition_version);

// TODO(heyuchen): set filter _partition_version in further pr
_key_ttl_compaction_filter_factory->SetPartitionVersion(partition_version);
}

::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
Expand Down
3 changes: 3 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ class pegasus_server_impl : public pegasus_read_service

void update_rocksdb_iteration_threshold(const std::map<std::string, std::string> &envs);

void update_validate_partition_hash(const std::map<std::string, std::string> &envs);

// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
Expand Down Expand Up @@ -402,6 +404,7 @@ class pegasus_server_impl : public pegasus_read_service
pegasus_manual_compact_service _manual_compact_svc;

std::atomic<int32_t> _partition_version;
bool _validate_partition_hash{false};

dsn::replication::ingestion_status::type _ingestion_status{
dsn::replication::ingestion_status::IS_INVALID};
Expand Down