Skip to content

Commit

Permalink
cluster: add topic config for fast partition movement
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Oct 13, 2023
1 parent e63a165 commit 0365e8e
Show file tree
Hide file tree
Showing 26 changed files with 323 additions and 24 deletions.
10 changes: 10 additions & 0 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ std::chrono::milliseconds
metadata_cache::get_default_retention_local_target_ms() const {
return config::shard_local_cfg().retention_local_target_ms_default();
}
std::optional<size_t>
metadata_cache::get_default_initial_retention_local_target_bytes() const {
return config::shard_local_cfg()
.initial_retention_local_target_bytes_default();
}
std::chrono::milliseconds
metadata_cache::get_default_initial_retention_local_target_ms() const {
return config::shard_local_cfg()
.initial_retention_local_target_ms_default();
}

uint32_t metadata_cache::get_default_batch_max_bytes() const {
return config::shard_local_cfg().kafka_batch_max_bytes();
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ class metadata_cache {
get_default_retention_duration() const;
std::optional<size_t> get_default_retention_local_target_bytes() const;
std::chrono::milliseconds get_default_retention_local_target_ms() const;
std::optional<size_t>
get_default_initial_retention_local_target_bytes() const;
std::chrono::milliseconds
get_default_initial_retention_local_target_ms() const;
model::shadow_indexing_mode get_default_shadow_indexing_mode() const;
uint32_t get_default_batch_max_bytes() const;
std::optional<std::chrono::milliseconds> get_default_segment_ms() const;
Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/topic_recovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,20 @@ ss::future<> topic_recovery_service::reset_topic_configurations() {
= tristate<size_t>{
config::shard_local_cfg().retention_local_target_bytes_default()};

update.properties.initial_retention_local_target_ms.op
= cluster::incremental_update_operation::set;
update.properties.initial_retention_local_target_ms.value
= tristate<std::chrono::milliseconds>{
config::shard_local_cfg()
.initial_retention_local_target_ms_default()};

update.properties.initial_retention_local_target_bytes.op
= cluster::incremental_update_operation::set;
update.properties.initial_retention_local_target_bytes.value
= tristate<size_t>{
config::shard_local_cfg()
.initial_retention_local_target_bytes_default()};

vlog(
cst_log.debug,
"resetting topic properties for {} using update: {}",
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,12 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
incremental_update(
properties.record_value_subject_name_strategy_compat,
overrides.record_value_subject_name_strategy_compat);
incremental_update(
properties.initial_retention_local_target_bytes,
overrides.initial_retention_local_target_bytes);
incremental_update(
properties.initial_retention_local_target_ms,
overrides.initial_retention_local_target_ms);
// no configuration change, no need to generate delta
if (properties == properties_snapshot) {
co_return errc::success;
Expand Down
46 changes: 39 additions & 7 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ bool topic_properties::has_overrides() const {
|| record_value_schema_id_validation.has_value()
|| record_value_schema_id_validation_compat.has_value()
|| record_value_subject_name_strategy.has_value()
|| record_value_subject_name_strategy_compat.has_value();
|| record_value_subject_name_strategy_compat.has_value()
|| initial_retention_local_target_bytes.is_engaged()
|| initial_retention_local_target_ms.is_engaged();
}

bool topic_properties::requires_remote_erase() const {
Expand All @@ -219,6 +221,9 @@ topic_properties::get_ntp_cfg_overrides() const {
ret.retention_local_target_ms = retention_local_target_ms;
ret.remote_delete = remote_delete;
ret.segment_ms = segment_ms;
ret.initial_retention_local_target_bytes
= initial_retention_local_target_bytes;
ret.initial_retention_local_target_ms = initial_retention_local_target_ms;
return ret;
}

Expand Down Expand Up @@ -250,6 +255,10 @@ storage::ntp_config topic_configuration::make_ntp_config(
.retention_local_target_ms = properties.retention_local_target_ms,
.remote_delete = properties.remote_delete,
.segment_ms = properties.segment_ms,
.initial_retention_local_target_bytes
= properties.initial_retention_local_target_bytes,
.initial_retention_local_target_ms
= properties.initial_retention_local_target_ms,
});
}
return {
Expand Down Expand Up @@ -366,7 +375,9 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"record_value_schema_id_validation: {}, "
"record_value_schema_id_validation_compat: {}, "
"record_value_subject_name_strategy: {}, "
"record_value_subject_name_strategy_compat: {}}}",
"record_value_subject_name_strategy_compat: {}, "
"initial_retention_local_target_bytes: {}, "
"initial_retention_local_target_ms: {}}}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand All @@ -391,7 +402,9 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.record_value_schema_id_validation,
properties.record_value_schema_id_validation_compat,
properties.record_value_subject_name_strategy,
properties.record_value_subject_name_strategy_compat);
properties.record_value_subject_name_strategy_compat,
properties.initial_retention_local_target_bytes,
properties.initial_retention_local_target_ms);

return o;
}
Expand Down Expand Up @@ -655,7 +668,9 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
"record_value_schema_id_validation: {}"
"record_value_schema_id_validation_compat: {}"
"record_value_subject_name_strategy: {}"
"record_value_subject_name_strategy_compat: {}",
"record_value_subject_name_strategy_compat: {}, "
"initial_retention_local_target_bytes: {}, "
"initial_retention_local_target_ms: {}",
i.compression,
i.cleanup_policy_bitflags,
i.compaction_strategy,
Expand All @@ -676,7 +691,9 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
i.record_value_schema_id_validation,
i.record_value_schema_id_validation_compat,
i.record_value_subject_name_strategy,
i.record_value_subject_name_strategy_compat);
i.record_value_subject_name_strategy_compat,
i.initial_retention_local_target_bytes,
i.initial_retention_local_target_ms);
return o;
}

Expand Down Expand Up @@ -1670,7 +1687,9 @@ void adl<cluster::incremental_topic_updates>::to(
t.record_value_schema_id_validation,
t.record_value_schema_id_validation_compat,
t.record_value_subject_name_strategy,
t.record_value_subject_name_strategy_compat);
t.record_value_subject_name_strategy_compat,
t.initial_retention_local_target_bytes,
t.initial_retention_local_target_ms);
}

cluster::incremental_topic_updates
Expand Down Expand Up @@ -1782,6 +1801,16 @@ adl<cluster::incremental_topic_updates>::from(iobuf_parser& in) {
.from(in);
}

if (
version
<= cluster::incremental_topic_updates::version_with_initial_retention) {
updates.initial_retention_local_target_bytes
= adl<cluster::property_update<tristate<size_t>>>{}.from(in);
updates.initial_retention_local_target_ms
= adl<cluster::property_update<tristate<std::chrono::milliseconds>>>{}
.from(in);
}

return updates;
}

Expand Down Expand Up @@ -1972,6 +2001,7 @@ adl<cluster::remote_topic_properties>::from(iobuf_parser& parser) {
return {remote_revision, remote_partition_count};
}

// adl is no longer used for serializing new topic properties
void adl<cluster::topic_properties>::to(
iobuf& out, cluster::topic_properties&& p) {
reflection::serialize(
Expand Down Expand Up @@ -2044,7 +2074,9 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt};
std::nullopt,
tristate<size_t>{std::nullopt},
tristate<std::chrono::milliseconds>{std::nullopt}};
}

void adl<cluster::cluster_property_kv>::to(
Expand Down
32 changes: 25 additions & 7 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ struct remote_topic_properties
*/
struct topic_properties
: serde::
envelope<topic_properties, serde::version<5>, serde::compat_version<0>> {
envelope<topic_properties, serde::version<6>, serde::compat_version<0>> {
topic_properties() noexcept = default;
topic_properties(
std::optional<model::compression> compression,
Expand Down Expand Up @@ -1442,7 +1442,9 @@ struct topic_properties
std::optional<pandaproxy::schema_registry::subject_name_strategy>
record_value_subject_name_strategy,
std::optional<pandaproxy::schema_registry::subject_name_strategy>
record_value_subject_name_strategy_compat)
record_value_subject_name_strategy_compat,
tristate<size_t> initial_retention_local_target_bytes,
tristate<std::chrono::milliseconds> initial_retention_local_target_ms)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -1471,7 +1473,10 @@ struct topic_properties
record_value_schema_id_validation_compat)
, record_value_subject_name_strategy(record_value_subject_name_strategy)
, record_value_subject_name_strategy_compat(
record_value_subject_name_strategy_compat) {}
record_value_subject_name_strategy_compat)
, initial_retention_local_target_bytes(
initial_retention_local_target_bytes)
, initial_retention_local_target_ms(initial_retention_local_target_ms) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -1510,6 +1515,10 @@ struct topic_properties
std::optional<pandaproxy::schema_registry::subject_name_strategy>
record_value_subject_name_strategy_compat;

tristate<size_t> initial_retention_local_target_bytes{std::nullopt};
tristate<std::chrono::milliseconds> initial_retention_local_target_ms{
std::nullopt};

bool is_compacted() const;
bool has_overrides() const;
bool requires_remote_erase() const;
Expand Down Expand Up @@ -1543,7 +1552,9 @@ struct topic_properties
record_value_schema_id_validation,
record_value_schema_id_validation_compat,
record_value_subject_name_strategy,
record_value_subject_name_strategy_compat);
record_value_subject_name_strategy_compat,
initial_retention_local_target_bytes,
initial_retention_local_target_ms);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down Expand Up @@ -1631,21 +1642,23 @@ struct property_update<tristate<T>>
struct incremental_topic_updates
: serde::envelope<
incremental_topic_updates,
serde::version<4>,
serde::version<5>,
serde::compat_version<0>> {
static constexpr int8_t version_with_data_policy = -1;
static constexpr int8_t version_with_shadow_indexing = -3;
static constexpr int8_t version_with_batch_max_bytes_and_local_retention
= -4;
static constexpr int8_t version_with_segment_ms = -5;
static constexpr int8_t version_with_schema_id_validation = -6;
static constexpr int8_t version_with_initial_retention = -7;
// negative version indicating different format:
// -1 - topic_updates with data_policy
// -2 - topic_updates without data_policy
// -3 - topic_updates with shadow_indexing
// -4 - topic update with batch_max_bytes and retention.local.target
// -6 - topic updates with schema id validation
static constexpr int8_t version = version_with_schema_id_validation;
// -7 - topic updates with initial retention
static constexpr int8_t version = version_with_initial_retention;
property_update<std::optional<model::compression>> compression;
property_update<std::optional<model::cleanup_policy_bitflags>>
cleanup_policy_bitflags;
Expand Down Expand Up @@ -1680,6 +1693,9 @@ struct incremental_topic_updates
property_update<
std::optional<pandaproxy::schema_registry::subject_name_strategy>>
record_value_subject_name_strategy_compat;
property_update<tristate<size_t>> initial_retention_local_target_bytes;
property_update<tristate<std::chrono::milliseconds>>
initial_retention_local_target_ms;

auto serde_fields() {
return std::tie(
Expand All @@ -1703,7 +1719,9 @@ struct incremental_topic_updates
record_value_schema_id_validation,
record_value_schema_id_validation_compat,
record_value_subject_name_strategy,
record_value_subject_name_strategy_compat);
record_value_subject_name_strategy_compat,
initial_retention_local_target_bytes,
initial_retention_local_target_ms);
}

friend std::ostream&
Expand Down
22 changes: 22 additions & 0 deletions src/v/compat/cluster_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ struct compat_check<cluster::topic_properties> {
json_write(record_value_schema_id_validation_compat);
json_write(record_value_subject_name_strategy);
json_write(record_value_subject_name_strategy_compat);
json_write(initial_retention_local_target_bytes);
json_write(initial_retention_local_target_ms);
}

static cluster::topic_properties from_json(json::Value& rd) {
Expand Down Expand Up @@ -377,6 +379,8 @@ struct compat_check<cluster::topic_properties> {
json_read(record_value_schema_id_validation_compat);
json_read(record_value_subject_name_strategy);
json_read(record_value_subject_name_strategy_compat);
json_read(initial_retention_local_target_bytes);
json_read(initial_retention_local_target_ms);
return obj;
}

Expand All @@ -399,6 +403,10 @@ struct compat_check<cluster::topic_properties> {
std::nullopt};

obj.segment_ms = tristate<std::chrono::milliseconds>{std::nullopt};
obj.initial_retention_local_target_bytes = tristate<size_t>{
std::nullopt};
obj.initial_retention_local_target_ms
= tristate<std::chrono::milliseconds>{std::nullopt};

if (reply != obj) {
throw compat_error(fmt::format(
Expand Down Expand Up @@ -469,6 +477,11 @@ struct compat_check<cluster::topic_configuration> {
obj.properties.segment_ms = tristate<std::chrono::milliseconds>{
std::nullopt};

obj.properties.initial_retention_local_target_bytes = tristate<size_t>{
std::nullopt};
obj.properties.initial_retention_local_target_ms
= tristate<std::chrono::milliseconds>{std::nullopt};

if (cfg != obj) {
throw compat_error(fmt::format(
"Verify of {{cluster::topic_property}} decoding "
Expand Down Expand Up @@ -526,6 +539,11 @@ struct compat_check<cluster::create_topics_request> {

topic.properties.segment_ms = tristate<std::chrono::milliseconds>{
std::nullopt};

topic.properties.initial_retention_local_target_bytes
= tristate<size_t>{std::nullopt};
topic.properties.initial_retention_local_target_ms
= tristate<std::chrono::milliseconds>{std::nullopt};
}
if (req != obj) {
throw compat_error(fmt::format(
Expand Down Expand Up @@ -584,6 +602,10 @@ struct compat_check<cluster::create_topics_reply> {
= tristate<std::chrono::milliseconds>{std::nullopt};
topic.properties.segment_ms = tristate<std::chrono::milliseconds>{
std::nullopt};
topic.properties.initial_retention_local_target_bytes
= tristate<size_t>{std::nullopt};
topic.properties.initial_retention_local_target_ms
= tristate<std::chrono::milliseconds>{std::nullopt};
}
if (reply != obj) {
throw compat_error(fmt::format(
Expand Down
5 changes: 4 additions & 1 deletion src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,10 @@ struct instance_generator<cluster::topic_properties> {
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt};
std::nullopt,
tests::random_tristate(
[] { return random_generators::get_int<size_t>(); }),
tests::random_tristate([] { return tests::random_duration_ms(); })};
}

static std::vector<cluster::topic_properties> limits() { return {}; }
Expand Down
16 changes: 16 additions & 0 deletions src/v/compat/cluster_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,14 @@ inline void rjson_serialize(
w,
"record_value_subject_name_strategy_compat",
tps.record_value_subject_name_strategy_compat);
write_member(
w,
"initial_retention_local_target_bytes",
tps.initial_retention_local_target_bytes);
write_member(
w,
"initial_retention_local_target_ms",
tps.initial_retention_local_target_ms);
w.EndObject();
}

Expand Down Expand Up @@ -650,6 +658,14 @@ inline void read_value(json::Value const& rd, cluster::topic_properties& obj) {
rd,
"record_value_subject_name_strategy_compat",
obj.record_value_subject_name_strategy_compat);
read_member(
rd,
"initial_retention_local_target_bytes",
obj.initial_retention_local_target_bytes);
read_member(
rd,
"initial_retention_local_target_ms",
obj.initial_retention_local_target_ms);
}

inline void rjson_serialize(
Expand Down
Loading

0 comments on commit 0365e8e

Please sign in to comment.