Skip to content

Commit

Permalink
feat(split): gc useless data after partition split (#698)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Mar 23, 2021
1 parent 83e18ae commit adcc62b
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 14 deletions.
2 changes: 1 addition & 1 deletion rdsn
3 changes: 3 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,7 @@ const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold
/// time threshold of each rocksdb iteration
const std::string
ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms");

/// true means compaction and scan will validate partition_hash, otherwise false
const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
} // namespace pegasus
2 changes: 2 additions & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ extern const std::string PEGASUS_CLUSTER_SECTION_NAME;
extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD;

extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;

extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
} // namespace pegasus
25 changes: 19 additions & 6 deletions src/base/pegasus_key_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,22 +146,23 @@ pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string &
}
}

// calculate hash from rocksdb key.
inline uint64_t pegasus_key_hash(const ::dsn::blob &key)
// calculate hash from rocksdb key or rocksdb slice
template <typename T>
inline uint64_t pegasus_key_hash(const T &key)
{
dassert(key.length() >= 2, "key length must be no less than 2");
dassert(key.size() >= 2, "key length must be no less than 2");

// hash_key_len is in big endian
uint16_t hash_key_len = be16toh(*(int16_t *)(key.data()));

if (hash_key_len > 0) {
// hash_key_len > 0, compute hash from hash_key
dassert(key.length() >= 2 + hash_key_len,
dassert(key.size() >= 2 + hash_key_len,
"key length must be no less than (2 + hash_key_len)");
return dsn::utils::crc64_calc(key.buffer_ptr() + 2, hash_key_len, 0);
return dsn::utils::crc64_calc(key.data() + 2, hash_key_len, 0);
} else {
// hash_key_len == 0, compute hash from sort_key
return dsn::utils::crc64_calc(key.buffer_ptr() + 2, key.length() - 2, 0);
return dsn::utils::crc64_calc(key.data() + 2, key.size() - 2, 0);
}
}

Expand All @@ -171,4 +172,16 @@ inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key)
return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0);
}

// Checks whether `key` should be served by partition `pidx`.
// The owning partition index is computed as the bitwise AND of the key hash (see
// pegasus_key_hash) and `partition_version`; the key belongs here only if that
// index equals `pidx`.
// Notice: the caller should verify that partition_version is greater than 0
// before calling this function.
template <class T>
inline bool check_pegasus_key_hash(const T &key, int32_t pidx, int32_t partition_version)
{
    const auto owner_pidx = pegasus_key_hash(key) & partition_version;
    // A mismatch is the rare case, hence the branch hint.
    return !dsn_unlikely(owner_pidx != pidx);
}

} // namespace pegasus
55 changes: 50 additions & 5 deletions src/server/key_ttl_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <rocksdb/merge_operator.h>

#include "base/pegasus_utils.h"
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"

namespace pegasus {
Expand All @@ -33,8 +34,18 @@ namespace server {
class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
{
public:
KeyWithTTLCompactionFilter(uint32_t pegasus_data_version, uint32_t default_ttl, bool enabled)
: _pegasus_data_version(pegasus_data_version), _default_ttl(default_ttl), _enabled(enabled)
KeyWithTTLCompactionFilter(uint32_t pegasus_data_version,
uint32_t default_ttl,
bool enabled,
int32_t pidx,
int32_t partition_version,
bool validate_hash)
: _pegasus_data_version(pegasus_data_version),
_default_ttl(default_ttl),
_enabled(enabled),
_partition_index(pidx),
_partition_version(partition_version),
_validate_partition_hash(validate_hash)
{
}

Expand All @@ -58,16 +69,30 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
*value_changed = true;
return false;
}
return check_if_ts_expired(utils::epoch_now(), expire_ts);
return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
}

const char *Name() const override { return "KeyWithTTLCompactionFilter"; }

// Tells whether `key` is a leftover from before a partition split: splitting
// divides a partition into two halves, and a record is stale here when its hash
// maps to the other half.
// Returns false (keep the record) whenever validation is disabled, the key is
// shorter than the 2-byte length prefix, or the partition index/version pair
// is not usable for the check.
bool check_if_stale_split_data(const rocksdb::Slice &key) const
{
    if (!_validate_partition_hash) {
        return false;
    }
    if (key.size() < 2) {
        return false;
    }
    if (_partition_version < 0 || _partition_index > _partition_version) {
        return false;
    }
    const bool served_here = check_pegasus_key_hash(key, _partition_index, _partition_version);
    return !served_here;
}

private:
uint32_t _pegasus_data_version;
uint32_t _default_ttl;
bool _enabled; // only process filtering when _enabled == true
mutable pegasus_value_generator _gen;
int32_t _partition_index;
int32_t _partition_version;
bool _validate_partition_hash;
};

class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
Expand All @@ -79,8 +104,13 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
{
return std::unique_ptr<KeyWithTTLCompactionFilter>(new KeyWithTTLCompactionFilter(
_pegasus_data_version.load(), _default_ttl.load(), _enabled.load()));
return std::unique_ptr<KeyWithTTLCompactionFilter>(
new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
_default_ttl.load(),
_enabled.load(),
_partition_index.load(),
_partition_version.load(),
_validate_partition_hash.load()));
}
const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }

Expand All @@ -90,11 +120,26 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
}
void EnableFilter() { _enabled.store(true, std::memory_order_release); }
void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl, std::memory_order_release); }
// Sets the flag that is handed to each newly created compaction filter to tell it
// whether to validate the partition hash of keys. The value is read with load()
// in CreateCompactionFilter, so filters that already exist keep their own copy.
void SetValidatePartitionHash(bool validate_hash)
{
_validate_partition_hash.store(validate_hash, std::memory_order_release);
}
// Sets the partition index that is passed to compaction filters created by
// CreateCompactionFilter from now on.
void SetPartitionIndex(int32_t pidx)
{
_partition_index.store(pidx, std::memory_order_release);
}
// Sets the partition version that is passed to compaction filters created by
// CreateCompactionFilter from now on. The factory's initial value is -1, and a
// filter skips the stale-split-data check while its partition version is negative.
void SetPartitionVersion(int32_t partition_version)
{
_partition_version.store(partition_version, std::memory_order_release);
}

private:
std::atomic<uint32_t> _pegasus_data_version;
std::atomic<uint32_t> _default_ttl;
std::atomic_bool _enabled; // only process filtering when _enabled == true
std::atomic<int32_t> _partition_index{0};
std::atomic<int32_t> _partition_version{-1};
std::atomic_bool _validate_partition_hash{false};
};

} // namespace server
Expand Down
26 changes: 24 additions & 2 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,8 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)

// only enable filter after correct pegasus_data_version set
_key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
_key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index());
_key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index() - 1);
_key_ttl_compaction_filter_factory->EnableFilter();

parse_checkpoints();
Expand Down Expand Up @@ -2294,6 +2296,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
update_checkpoint_reserve(envs);
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
update_validate_partition_hash(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}

Expand All @@ -2310,6 +2313,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
update_checkpoint_reserve(envs);
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
update_validate_partition_hash(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}

Expand Down Expand Up @@ -2438,6 +2442,25 @@ void pegasus_server_impl::update_rocksdb_iteration_threshold(
}
}

// Refreshes `_validate_partition_hash` from the app envs.
// - SPLIT_VALIDATE_PARTITION_HASH absent: the setting is treated as false.
// - Present but not parseable as a bool: an error is logged and the current
//   setting is left untouched.
// When the resulting value differs from the current one, the change is logged
// and forwarded to the compaction filter factory.
void pegasus_server_impl::update_validate_partition_hash(
    const std::map<std::string, std::string> &envs)
{
    bool validate = false;
    const auto found = envs.find(SPLIT_VALIDATE_PARTITION_HASH);
    if (found != envs.end() && !dsn::buf2bool(found->second, validate)) {
        derror_replica("{}={} is invalid.", found->first, found->second);
        return;
    }

    if (validate == _validate_partition_hash) {
        // Nothing changed, so there is nothing to log or propagate.
        return;
    }

    ddebug_replica(
        "update '_validate_partition_hash' from {} to {}", _validate_partition_hash, validate);
    _validate_partition_hash = validate;
    _key_ttl_compaction_filter_factory->SetValidatePartitionHash(_validate_partition_hash);
}

bool pegasus_server_impl::parse_compression_types(
const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
{
Expand Down Expand Up @@ -2750,8 +2773,7 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
int32_t old_partition_version = _partition_version.exchange(partition_version);
ddebug_replica(
"update partition version from {} to {}", old_partition_version, partition_version);

// TODO(heyuchen): set filter _partition_version in further pr
_key_ttl_compaction_filter_factory->SetPartitionVersion(partition_version);
}

::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
Expand Down
3 changes: 3 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ class pegasus_server_impl : public pegasus_read_service

void update_rocksdb_iteration_threshold(const std::map<std::string, std::string> &envs);

void update_validate_partition_hash(const std::map<std::string, std::string> &envs);

// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
Expand Down Expand Up @@ -402,6 +404,7 @@ class pegasus_server_impl : public pegasus_read_service
pegasus_manual_compact_service _manual_compact_svc;

std::atomic<int32_t> _partition_version;
bool _validate_partition_hash{false};

dsn::replication::ingestion_status::type _ingestion_status{
dsn::replication::ingestion_status::IS_INVALID};
Expand Down

0 comments on commit adcc62b

Please sign in to comment.