diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 64dec29348ec..e6f270237f26 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -272,6 +272,16 @@ std::chrono::milliseconds metadata_cache::get_default_retention_local_target_ms() const { return config::shard_local_cfg().retention_local_target_ms_default(); } +std::optional +metadata_cache::get_default_initial_retention_local_target_bytes() const { + return config::shard_local_cfg() + .initial_retention_local_target_bytes_default(); +} +std::chrono::milliseconds +metadata_cache::get_default_initial_retention_local_target_ms() const { + return config::shard_local_cfg() + .initial_retention_local_target_ms_default(); +} uint32_t metadata_cache::get_default_batch_max_bytes() const { return config::shard_local_cfg().kafka_batch_max_bytes(); diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index 135e90624657..a2fc2727e55a 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -178,6 +178,10 @@ class metadata_cache { get_default_retention_duration() const; std::optional get_default_retention_local_target_bytes() const; std::chrono::milliseconds get_default_retention_local_target_ms() const; + std::optional + get_default_initial_retention_local_target_bytes() const; + std::chrono::milliseconds + get_default_initial_retention_local_target_ms() const; model::shadow_indexing_mode get_default_shadow_indexing_mode() const; uint32_t get_default_batch_max_bytes() const; std::optional get_default_segment_ms() const; diff --git a/src/v/cluster/topic_recovery_service.cc b/src/v/cluster/topic_recovery_service.cc index 704f38b0a945..224ad8538d64 100644 --- a/src/v/cluster/topic_recovery_service.cc +++ b/src/v/cluster/topic_recovery_service.cc @@ -553,6 +553,20 @@ ss::future<> topic_recovery_service::reset_topic_configurations() { = tristate{ config::shard_local_cfg().retention_local_target_bytes_default()}; + update.properties.initial_retention_local_target_ms.op + = cluster::incremental_update_operation::set; + update.properties.initial_retention_local_target_ms.value + = tristate{ + config::shard_local_cfg() + .initial_retention_local_target_ms_default()}; + + update.properties.initial_retention_local_target_bytes.op + = cluster::incremental_update_operation::set; + update.properties.initial_retention_local_target_bytes.value + = tristate{ + config::shard_local_cfg() + .initial_retention_local_target_bytes_default()}; + vlog( cst_log.debug, "resetting topic properties for {} using update: {}", diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 1a1f9798daaf..0114a76c7b1f 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -741,6 +741,12 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { incremental_update( properties.record_value_subject_name_strategy_compat, overrides.record_value_subject_name_strategy_compat); + incremental_update( + properties.initial_retention_local_target_bytes, + overrides.initial_retention_local_target_bytes); + incremental_update( + properties.initial_retention_local_target_ms, + overrides.initial_retention_local_target_ms); // no configuration change, no need to generate delta if (properties == properties_snapshot) { co_return errc::success; diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 300a4c5e0152..d91c4e465b22 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -192,7 +192,9 @@ bool topic_properties::has_overrides() const { || record_value_schema_id_validation.has_value() || record_value_schema_id_validation_compat.has_value() || record_value_subject_name_strategy.has_value() - || record_value_subject_name_strategy_compat.has_value(); + || record_value_subject_name_strategy_compat.has_value() + || initial_retention_local_target_bytes.is_engaged() + || initial_retention_local_target_ms.is_engaged(); } bool topic_properties::requires_remote_erase() const { @@ -219,6 +221,9 @@ topic_properties::get_ntp_cfg_overrides() const { ret.retention_local_target_ms = retention_local_target_ms; ret.remote_delete = remote_delete; ret.segment_ms = segment_ms; + ret.initial_retention_local_target_bytes + = initial_retention_local_target_bytes; + ret.initial_retention_local_target_ms = initial_retention_local_target_ms; return ret; } @@ -250,6 +255,10 @@ storage::ntp_config topic_configuration::make_ntp_config( .retention_local_target_ms = properties.retention_local_target_ms, .remote_delete = properties.remote_delete, .segment_ms = properties.segment_ms, + .initial_retention_local_target_bytes + = properties.initial_retention_local_target_bytes, + .initial_retention_local_target_ms + = properties.initial_retention_local_target_ms, }); } return { @@ -366,7 +375,9 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "record_value_schema_id_validation: {}, " "record_value_schema_id_validation_compat: {}, " "record_value_subject_name_strategy: {}, " - "record_value_subject_name_strategy_compat: {}}}", + "record_value_subject_name_strategy_compat: {}, " + "initial_retention_local_target_bytes: {}, " + "initial_retention_local_target_ms: {}}}", properties.compression, properties.cleanup_policy_bitflags, properties.compaction_strategy, @@ -391,7 +402,9 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.record_value_schema_id_validation, properties.record_value_schema_id_validation_compat, properties.record_value_subject_name_strategy, - properties.record_value_subject_name_strategy_compat); + properties.record_value_subject_name_strategy_compat, + properties.initial_retention_local_target_bytes, + properties.initial_retention_local_target_ms); return o; } @@ -655,7 +668,9 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { "record_value_schema_id_validation: {}" "record_value_schema_id_validation_compat: {}" "record_value_subject_name_strategy: {}" - "record_value_subject_name_strategy_compat: {}", + "record_value_subject_name_strategy_compat: {}, " + "initial_retention_local_target_bytes: {}, " + "initial_retention_local_target_ms: {}", i.compression, i.cleanup_policy_bitflags, i.compaction_strategy, @@ -676,7 +691,9 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.record_value_schema_id_validation, i.record_value_schema_id_validation_compat, i.record_value_subject_name_strategy, - i.record_value_subject_name_strategy_compat); + i.record_value_subject_name_strategy_compat, + i.initial_retention_local_target_bytes, + i.initial_retention_local_target_ms); return o; } @@ -1670,7 +1687,9 @@ void adl::to( t.record_value_schema_id_validation, t.record_value_schema_id_validation_compat, t.record_value_subject_name_strategy, - t.record_value_subject_name_strategy_compat); + t.record_value_subject_name_strategy_compat, + t.initial_retention_local_target_bytes, + t.initial_retention_local_target_ms); } cluster::incremental_topic_updates @@ -1782,6 +1801,16 @@ adl::from(iobuf_parser& in) { .from(in); } + if ( + version + <= cluster::incremental_topic_updates::version_with_initial_retention) { + updates.initial_retention_local_target_bytes + = adl>>{}.from(in); + updates.initial_retention_local_target_ms + = adl>>{} + .from(in); + } + return updates; } @@ -1972,6 +2001,7 @@ adl::from(iobuf_parser& parser) { return {remote_revision, remote_partition_count}; } +// adl is no longer used for serializing new topic properties void adl::to( iobuf& out, cluster::topic_properties&& p) { reflection::serialize( @@ -2044,7 +2074,9 @@ adl::from(iobuf_parser& parser) { std::nullopt, std::nullopt, std::nullopt, - std::nullopt}; + std::nullopt, + tristate{std::nullopt}, + tristate{std::nullopt}}; } void adl::to( diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 81379785799d..7fb99b9780a7 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -1411,7 +1411,7 @@ struct remote_topic_properties */ struct topic_properties : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_properties() noexcept = default; topic_properties( std::optional compression, @@ -1442,7 +1442,9 @@ struct topic_properties std::optional record_value_subject_name_strategy, std::optional - record_value_subject_name_strategy_compat) + record_value_subject_name_strategy_compat, + tristate initial_retention_local_target_bytes, + tristate initial_retention_local_target_ms) : compression(compression) , cleanup_policy_bitflags(cleanup_policy_bitflags) , compaction_strategy(compaction_strategy) @@ -1471,7 +1473,10 @@ struct topic_properties record_value_schema_id_validation_compat) , record_value_subject_name_strategy(record_value_subject_name_strategy) , record_value_subject_name_strategy_compat( - record_value_subject_name_strategy_compat) {} + record_value_subject_name_strategy_compat) + , initial_retention_local_target_bytes( + initial_retention_local_target_bytes) + , initial_retention_local_target_ms(initial_retention_local_target_ms) {} std::optional compression; std::optional cleanup_policy_bitflags; @@ -1510,6 +1515,10 @@ struct topic_properties std::optional record_value_subject_name_strategy_compat; + tristate initial_retention_local_target_bytes{std::nullopt}; + tristate initial_retention_local_target_ms{ + std::nullopt}; + bool is_compacted() const; bool has_overrides() const; bool requires_remote_erase() const; @@ -1543,7 +1552,9 @@ struct topic_properties record_value_schema_id_validation, record_value_schema_id_validation_compat, record_value_subject_name_strategy, - record_value_subject_name_strategy_compat); + record_value_subject_name_strategy_compat, + initial_retention_local_target_bytes, + initial_retention_local_target_ms); } friend bool operator==(const topic_properties&, const topic_properties&) @@ -1631,7 +1642,7 @@ struct property_update> struct incremental_topic_updates : serde::envelope< incremental_topic_updates, - serde::version<4>, + serde::version<5>, serde::compat_version<0>> { static constexpr int8_t version_with_data_policy = -1; static constexpr int8_t version_with_shadow_indexing = -3; @@ -1639,13 +1650,15 @@ struct incremental_topic_updates = -4; static constexpr int8_t version_with_segment_ms = -5; static constexpr int8_t version_with_schema_id_validation = -6; + static constexpr int8_t version_with_initial_retention = -7; // negative version indicating different format: // -1 - topic_updates with data_policy // -2 - topic_updates without data_policy // -3 - topic_updates with shadow_indexing // -4 - topic update with batch_max_bytes and retention.local.target // -6 - topic updates with schema id validation - static constexpr int8_t version = version_with_schema_id_validation; + // -7 - topic updates with initial retention + static constexpr int8_t version = version_with_initial_retention; property_update> compression; property_update> cleanup_policy_bitflags; @@ -1680,6 +1693,9 @@ struct incremental_topic_updates property_update< std::optional> record_value_subject_name_strategy_compat; + property_update> initial_retention_local_target_bytes; + property_update> + initial_retention_local_target_ms; auto serde_fields() { return std::tie( @@ -1703,7 +1719,9 @@ struct incremental_topic_updates record_value_schema_id_validation, record_value_schema_id_validation_compat, record_value_subject_name_strategy, - record_value_subject_name_strategy_compat); + record_value_subject_name_strategy_compat, + initial_retention_local_target_bytes, + initial_retention_local_target_ms); } friend std::ostream& diff --git a/src/v/compat/cluster_compat.h b/src/v/compat/cluster_compat.h index 20f0d665b09d..25b37c3cc849 100644 --- a/src/v/compat/cluster_compat.h +++ b/src/v/compat/cluster_compat.h @@ -348,6 +348,8 @@ struct compat_check { json_write(record_value_schema_id_validation_compat); json_write(record_value_subject_name_strategy); json_write(record_value_subject_name_strategy_compat); + json_write(initial_retention_local_target_bytes); + json_write(initial_retention_local_target_ms); } static cluster::topic_properties from_json(json::Value& rd) { @@ -377,6 +379,8 @@ struct compat_check { json_read(record_value_schema_id_validation_compat); json_read(record_value_subject_name_strategy); json_read(record_value_subject_name_strategy_compat); + json_read(initial_retention_local_target_bytes); + json_read(initial_retention_local_target_ms); return obj; } @@ -399,6 +403,10 @@ struct compat_check { std::nullopt}; obj.segment_ms = tristate{std::nullopt}; + obj.initial_retention_local_target_bytes = tristate{ + std::nullopt}; + obj.initial_retention_local_target_ms + = tristate{std::nullopt}; if (reply != obj) { throw compat_error(fmt::format( @@ -469,6 +477,11 @@ struct compat_check { obj.properties.segment_ms = tristate{ std::nullopt}; + obj.properties.initial_retention_local_target_bytes = tristate{ + std::nullopt}; + obj.properties.initial_retention_local_target_ms + = tristate{std::nullopt}; + if (cfg != obj) { throw compat_error(fmt::format( "Verify of {{cluster::topic_property}} decoding " @@ -526,6 +539,11 @@ struct compat_check { topic.properties.segment_ms = tristate{ std::nullopt}; + + topic.properties.initial_retention_local_target_bytes + = tristate{std::nullopt}; + topic.properties.initial_retention_local_target_ms + = tristate{std::nullopt}; } if (req != obj) { throw compat_error(fmt::format( @@ -584,6 +602,10 @@ struct compat_check { = tristate{std::nullopt}; topic.properties.segment_ms = tristate{ std::nullopt}; + topic.properties.initial_retention_local_target_bytes + = tristate{std::nullopt}; + topic.properties.initial_retention_local_target_ms + = tristate{std::nullopt}; } if (reply != obj) { throw compat_error(fmt::format( diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index 79e2bb06eb72..b4614b814999 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -622,7 +622,10 @@ struct instance_generator { std::nullopt, std::nullopt, std::nullopt, - std::nullopt}; + std::nullopt, + tests::random_tristate( + [] { return random_generators::get_int(); }), + tests::random_tristate([] { return tests::random_duration_ms(); })}; } static std::vector limits() { return {}; } diff --git a/src/v/compat/cluster_json.h b/src/v/compat/cluster_json.h index ff204476a246..1e35b371b3b3 100644 --- a/src/v/compat/cluster_json.h +++ b/src/v/compat/cluster_json.h @@ -596,6 +596,14 @@ inline void rjson_serialize( w, "record_value_subject_name_strategy_compat", tps.record_value_subject_name_strategy_compat); + write_member( + w, + "initial_retention_local_target_bytes", + tps.initial_retention_local_target_bytes); + write_member( + w, + "initial_retention_local_target_ms", + tps.initial_retention_local_target_ms); w.EndObject(); } @@ -650,6 +658,14 @@ inline void read_value(json::Value const& rd, cluster::topic_properties& obj) { rd, "record_value_subject_name_strategy_compat", obj.record_value_subject_name_strategy_compat); + read_member( + rd, + "initial_retention_local_target_bytes", + obj.initial_retention_local_target_bytes); + read_member( + rd, + "initial_retention_local_target_ms", + obj.initial_retention_local_target_ms); } inline void rjson_serialize( diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index e671a87c030b..e2ba2ea0d326 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1956,6 +1956,22 @@ configuration::configuration() .visibility = visibility::tunable}, 10, {.min = 1}) + , initial_retention_local_target_bytes_default( + *this, + "initial_retention_local_target_bytes_default", + "Initial local retention size target for partitions of topics with cloud " + "storage " + "write enabled", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + std::nullopt) + , initial_retention_local_target_ms_default( + *this, + "initial_retention_local_target_ms_default", + "Initial local retention time target for partitions of topics with cloud " + "storage " + "write enabled", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + 24h) , cloud_storage_cache_size( *this, "cloud_storage_cache_size", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 70575e1c6dd0..ad831d0a3761 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -368,6 +368,10 @@ struct configuration final : public config_store { bounded_property disk_reservation_percent; bounded_property space_management_max_log_concurrency; bounded_property space_management_max_segment_concurrency; + property> + initial_retention_local_target_bytes_default; + property + initial_retention_local_target_ms_default; // Archival cache property cloud_storage_cache_size; diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 55423cf2fce0..129de2e1ade6 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -232,6 +232,21 @@ create_topic_properties_update( kafka::config_resource_operation::set); continue; } + if (cfg.name == topic_property_initial_retention_local_target_ms) { + parse_and_set_tristate( + update.properties.initial_retention_local_target_ms, + cfg.value, + kafka::config_resource_operation::set); + continue; + } + if ( + cfg.name == topic_property_initial_retention_local_target_bytes) { + parse_and_set_tristate( + update.properties.initial_retention_local_target_bytes, + cfg.value, + kafka::config_resource_operation::set); + continue; + } if ( config::shard_local_cfg().enable_schema_id_validation() != pandaproxy::schema_registry::schema_id_validation_mode::none) { diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 9a694108c84c..75f663b663af 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -59,7 +59,9 @@ static constexpr auto supported_configs = std::to_array( topic_property_record_value_schema_id_validation, topic_property_record_value_schema_id_validation_compat, topic_property_record_value_subject_name_strategy, - topic_property_record_value_subject_name_strategy_compat}); + topic_property_record_value_subject_name_strategy_compat, + topic_property_initial_retention_local_target_bytes, + topic_property_initial_retention_local_target_ms}); bool is_supported(std::string_view name) { return std::any_of( @@ -210,6 +212,10 @@ ss::future create_topics_handler::handle( [&ctx](const creatable_topic& t) { auto result = generate_successfull_result(t); if (ctx.header().version >= api_version(5)) { + // TODO(Rob): it looks like get_default_properties is used + // only there so there is a high chance of diverging + // dry run's default_properties upposed to be used on + // topic creation and the real deal auto default_properties = ctx.metadata_cache().get_default_properties(); result.configs = {properties_to_result_configs( diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc index cfae6937237a..584258ded67a 100644 --- a/src/v/kafka/server/handlers/describe_configs.cc +++ b/src/v/kafka/server/handlers/describe_configs.cc @@ -962,6 +962,35 @@ ss::future describe_configs_handler::handle( } } + add_topic_config_if_requested( + resource, + result, + topic_property_initial_retention_local_target_bytes, + ctx.metadata_cache() + .get_default_initial_retention_local_target_bytes(), + topic_property_initial_retention_local_target_bytes, + topic_config->properties.initial_retention_local_target_bytes, + request.data.include_synonyms, + maybe_make_documentation( + request.data.include_documentation, + config::shard_local_cfg() + .initial_retention_local_target_bytes_default.desc())); + + add_topic_config_if_requested( + resource, + result, + topic_property_initial_retention_local_target_ms, + std::make_optional( + ctx.metadata_cache() + .get_default_initial_retention_local_target_ms()), + topic_property_initial_retention_local_target_ms, + topic_config->properties.initial_retention_local_target_ms, + request.data.include_synonyms, + maybe_make_documentation( + request.data.include_documentation, + config::shard_local_cfg() + .initial_retention_local_target_ms_default.desc())); + break; } diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 94d6b9685381..db71823eaade 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -241,6 +241,21 @@ create_topic_properties_update( update.properties.segment_ms, cfg.value, op); continue; } + if ( + cfg.name == topic_property_initial_retention_local_target_bytes) { + parse_and_set_tristate( + update.properties.initial_retention_local_target_bytes, + cfg.value, + op); + continue; + } + if (cfg.name == topic_property_initial_retention_local_target_ms) { + parse_and_set_tristate( + update.properties.initial_retention_local_target_ms, + cfg.value, + op); + continue; + } if ( config::shard_local_cfg().enable_schema_id_validation() != pandaproxy::schema_registry::schema_id_validation_mode::none) { @@ -322,6 +337,15 @@ static ss::future> alter_broker_configuartion( bool errored = false; for (const auto& c : resource.configs) { + // mapping looks sane for kafka's properties + // compression.type=producer sensitive=false + // synonyms={DEFAULT_CONFIG:log_compression_type=producer} + // (configuration.cc doesn't know `compression.type` but known + // `log_compression_type`) but for redpanda's properties it returns + // redpanda.remote.read=false sensitive=false + // synonyms={DEFAULT_CONFIG:redpanda.remote.read=false} + // which looks wrong because configuration.cc doesn't know + // `redpanda.remote.read` auto mapped_name = map_config_name(c.name); // Validate int8_t is within range of config_resource_operation diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 2a5c1eb52cd5..d88cad38840f 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -187,6 +187,13 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.segment_ms = get_tristate_value( config_entries, topic_property_segment_ms); + cfg.properties.initial_retention_local_target_bytes + = get_tristate_value( + config_entries, topic_property_initial_retention_local_target_bytes); + cfg.properties.initial_retention_local_target_ms + = get_tristate_value( + config_entries, topic_property_initial_retention_local_target_ms); + schema_id_validation_config_parser schema_id_validation_config_parser{ cfg.properties}; @@ -347,6 +354,14 @@ config_map_t from_cluster_type(const cluster::topic_properties& properties) { = from_config_type( properties.record_value_subject_name_strategy_compat); } + if (properties.initial_retention_local_target_bytes.has_optional_value()) { + config_entries[topic_property_initial_retention_local_target_bytes] + = from_config_type(*properties.initial_retention_local_target_bytes); + } + if (properties.initial_retention_local_target_ms.has_optional_value()) { + config_entries[topic_property_initial_retention_local_target_ms] + = from_config_type(*properties.initial_retention_local_target_ms); + } /// Final topic_property not encoded here is \ref remote_topic_properties, /// is more of an implementation detail no need to ever show user diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index 75a0e6451b27..f95fca2d5844 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -94,6 +94,13 @@ static constexpr std::string_view topic_property_record_value_subject_name_strategy_compat = "confluent.value.subject.name.strategy"; +static constexpr std::string_view + topic_property_initial_retention_local_target_bytes + = "initial.retention.local.target.bytes"; +static constexpr std::string_view + topic_property_initial_retention_local_target_ms + = "initial.retention.local.target.ms"; + // Kafka topic properties that is not relevant for Redpanda // Or cannot be altered with kafka alter handler static constexpr std::array allowlist_topic_noop_confs = { diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index da3fc578eef9..cd90deddfd5e 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -364,7 +364,9 @@ FIXTURE_TEST( "redpanda.value.schema.id.validation", "confluent.value.schema.validation", "redpanda.value.subject.name.strategy", - "confluent.value.subject.name.strategy"}; + "confluent.value.subject.name.strategy", + "initial.retention.local.target.bytes", + "initial.retention.local.target.ms"}; // All properties_request auto all_describe_resp = describe_configs(test_tp); diff --git a/src/v/migrations/cloud_storage_config.cc b/src/v/migrations/cloud_storage_config.cc index 47347dcc8e8d..2db9cff391ba 100644 --- a/src/v/migrations/cloud_storage_config.cc +++ b/src/v/migrations/cloud_storage_config.cc @@ -50,6 +50,9 @@ ss::future<> cloud_storage_config::do_mutate() { model::shadow_indexing_mode::disabled)) || config::shard_local_cfg().cloud_storage_enable_remote_write(); + // This is a migration from retention_bytes to + // retention_local_target_bytes so we don't need to handle + // initial_retention_local_target_bytes if ( props.retention_local_target_bytes.has_optional_value() || props.retention_local_target_ms.has_optional_value()) { diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index 134ba8cbb6b9..81ae390db313 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -328,6 +328,12 @@ ss::future<> service::create_internal_topic() { .value{retain_forever}}, {.name{ ss::sstring{kafka::topic_property_retention_local_target_ms}}, + .value{retain_forever}}, + {.name{ss::sstring{ + kafka::topic_property_initial_retention_local_target_bytes}}, + .value{retain_forever}}, + {.name{ss::sstring{ + kafka::topic_property_initial_retention_local_target_ms}}, .value{retain_forever}}}}; }; auto res = co_await _client.local().create_topic(make_internal_topic()); diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h index 558e2c3042c4..7ed0fa972dea 100644 --- a/src/v/storage/ntp_config.h +++ b/src/v/storage/ntp_config.h @@ -67,6 +67,10 @@ class ntp_config { // time before rolling a segment, from first write tristate segment_ms{std::nullopt}; + tristate initial_retention_local_target_bytes{std::nullopt}; + tristate initial_retention_local_target_ms{ + std::nullopt}; + friend std::ostream& operator<<(std::ostream&, const default_overrides&); }; diff --git a/src/v/storage/types.cc b/src/v/storage/types.cc index f8ff3bb8ac5c..b03c6d86abfb 100644 --- a/src/v/storage/types.cc +++ b/src/v/storage/types.cc @@ -101,7 +101,9 @@ operator<<(std::ostream& o, const ntp_config::default_overrides& v) { "{{compaction_strategy: {}, cleanup_policy_bitflags: {}, segment_size: " "{}, retention_bytes: {}, retention_time_ms: {}, recovery_enabled: {}, " "retention_local_target_bytes: {}, retention_local_target_ms: {}, " - "remote_delete: {}, segment_ms: {}}}", + "remote_delete: {}, segment_ms: {}, " + "initial_retention_local_target_bytes: {}, " + "initial_retention_local_target_ms: {}}}", v.compaction_strategy, v.cleanup_policy_bitflags, v.segment_size, @@ -111,7 +113,9 @@ operator<<(std::ostream& o, const ntp_config::default_overrides& v) { v.retention_local_target_bytes, v.retention_local_target_ms, v.remote_delete, - v.segment_ms); + v.segment_ms, + v.initial_retention_local_target_bytes, + v.initial_retention_local_target_ms); return o; } diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py index 163bdfa8a4a6..42fe64750568 100644 --- a/tests/rptest/clients/types.py +++ b/tests/rptest/clients/types.py @@ -79,6 +79,9 @@ class SubjectNameStrategyCompat(str, Enum): PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION_COMPAT = "confluent.value.schema.validation" PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY_COMPAT = "confluent.value.subject.name.strategy" + PROPERTY_INITIAL_RETENTION_LOCAL_TARGET_BYTES = "initial.retention.local.target.bytes" + PROPERTY_INITIAL_RETENTION_LOCAL_TARGET_MS = "initial.retention.local.target.ms" + def __init__(self, *, name=None, @@ -103,7 +106,9 @@ def __init__(self, record_value_schema_id_validation=None, record_value_schema_id_validation_compat=None, record_value_subject_name_strategy=None, - record_value_subject_name_strategy_compat=None): + record_value_subject_name_strategy_compat=None, + initial_retention_local_target_bytes=None, + initial_retention_local_target_ms=None): self.name = name or f"topic-{self._random_topic_suffix()}" self.partition_count = partition_count self.replication_factor = replication_factor @@ -127,6 +132,8 @@ def __init__(self, self.record_value_schema_id_validation_compat = record_value_schema_id_validation_compat self.record_value_subject_name_strategy = record_value_subject_name_strategy self.record_value_subject_name_strategy_compat = record_value_subject_name_strategy_compat + self.initial_retention_local_target_bytes = initial_retention_local_target_bytes + self.initial_retention_local_target_ms = initial_retention_local_target_ms def __str__(self): return self.name diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index 0e94b4999d3b..c3f69a32483a 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -202,7 +202,21 @@ def test_describe_topics_with_documentation_and_types(self): "io.confluent.kafka.serializers.subject.TopicNameStrategy", doc_string= "The subject name strategy for values if confluent.value.schema.validation is enabled" - ) + ), + "initial.retention.local.target.bytes": + ConfigProperty( + config_type="LONG", + value="-1", + doc_string= + "Initial local retention size target for partitions of topics with cloud storage write enabled" + ), + "initial.retention.local.target.ms": + ConfigProperty( + config_type="LONG", + value="86400000", + doc_string= + "Initial local retention time target for partitions of topics with cloud storage write enabled" + ), } tp_spec = TopicSpec() diff --git a/tests/rptest/tests/topic_creation_test.py b/tests/rptest/tests/topic_creation_test.py index dc5f8098ea6d..d69cc21f1baf 100644 --- a/tests/rptest/tests/topic_creation_test.py +++ b/tests/rptest/tests/topic_creation_test.py @@ -425,7 +425,9 @@ def topic_alter_config_test(self): 'redpanda.remote.write': 'true', 'redpanda.remote.read': 'true', 'retention.local.target.bytes': '123456', - 'retention.local.target.ms': '123456' + 'retention.local.target.ms': '123456', + 'initial.retention.local.target.bytes': '123456', + 'initial.retention.local.target.ms': '123456' } for k, v in examples.items(): diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index f58f068520c7..a7f52bd71c5c 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -92,6 +92,13 @@ def read_topic_properties_serde(rdr: Reader, version): 'record_value_subject_name_strategy_compat': rdr.read_optional(Reader.read_serde_enum) } + if version >= 6: + topic_properties |= { + 'initial_retention_local_target_bytes': + rdr.read_tristate(Reader.read_uint64), + 'initial_retention_local_target_ms': + rdr.read_tristate(Reader.read_uint64) + } return topic_properties @@ -112,7 +119,7 @@ def read_topic_configuration_assignment_serde(rdr: Reader): rdr.read_int16(), 'properties': rdr.read_envelope(read_topic_properties_serde, - max_version=5), + max_version=6), }, 1), 'assignments': rdr.read_serde_vector(lambda r: r.read_envelope( @@ -226,10 +233,19 @@ def incr_topic_upd(rdr: Reader, version): 'record_value_subject_name_strategy_compat': rdr.read_optional(Reader.read_serde_enum) } + if version >= 5: + incr_obj |= { + 'initial_retention_local_target_bytes': + read_property_update_serde( + rdr, lambda r: r.read_tristate(Reader.read_uint64)), + 'initial_retention_local_target_ms': + read_property_update_serde( + rdr, lambda r: r.read_tristate(Reader.read_uint64)) + } return incr_obj - return rdr.read_envelope(incr_topic_upd, max_version=4) + return rdr.read_envelope(incr_topic_upd, max_version=5) def read_create_partitions_serde(rdr: Reader):