diff --git a/src/v/archival/tests/ntp_archiver_test.cc b/src/v/archival/tests/ntp_archiver_test.cc index 0cb5c1f71890..c97b20887a30 100644 --- a/src/v/archival/tests/ntp_archiver_test.cc +++ b/src/v/archival/tests/ntp_archiver_test.cc @@ -749,7 +749,7 @@ FIXTURE_TEST(test_segments_pending_deletion_limit, archiver_fixture) { [] { config::shard_local_cfg().delete_retention_ms.reset(); }); config::shard_local_cfg() - .cloud_storage_max_segments_pending_deletion_per_partition(2); + .cloud_storage_max_segments_pending_deletion_per_partition.set_value(2); auto [arch_conf, remote_conf] = get_configurations(); auto amv = ss::make_shared( diff --git a/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc b/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc index d325bbb82a15..f0f0425db6da 100644 --- a/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc @@ -37,12 +37,13 @@ scan_remote_partition_incrementally_with_reuploads( static auto bucket = cloud_storage_clients::bucket_name("bucket"); if (maybe_max_segments) { config::shard_local_cfg() - .cloud_storage_max_materialized_segments_per_shard( + .cloud_storage_max_materialized_segments_per_shard.set_value( maybe_max_segments); } if (maybe_max_readers) { - config::shard_local_cfg().cloud_storage_max_segment_readers_per_shard( - maybe_max_readers); + config::shard_local_cfg() + .cloud_storage_max_segment_readers_per_shard.set_value( + maybe_max_readers); } auto m = ss::make_lw_shared( manifest_ntp, manifest_revision); diff --git a/src/v/cloud_storage/tests/util.cc b/src/v/cloud_storage/tests/util.cc index 3cebcfa08548..94843b8c8940 100644 --- a/src/v/cloud_storage/tests/util.cc +++ b/src/v/cloud_storage/tests/util.cc @@ -588,12 +588,13 @@ std::vector scan_remote_partition_incrementally( static auto bucket = cloud_storage_clients::bucket_name("bucket"); if (maybe_max_segments) { config::shard_local_cfg() - .cloud_storage_max_materialized_segments_per_shard( + .cloud_storage_max_materialized_segments_per_shard.set_value( maybe_max_segments); } if (maybe_max_readers) { - config::shard_local_cfg().cloud_storage_max_segment_readers_per_shard( - maybe_max_readers); + config::shard_local_cfg() + .cloud_storage_max_segment_readers_per_shard.set_value( + maybe_max_readers); } auto manifest = hydrate_manifest(imposter.api.local(), bucket); partition_probe probe(manifest.get_ntp()); @@ -668,12 +669,13 @@ std::vector scan_remote_partition( static auto bucket = cloud_storage_clients::bucket_name("bucket"); if (maybe_max_segments) { config::shard_local_cfg() - .cloud_storage_max_materialized_segments_per_shard( + .cloud_storage_max_materialized_segments_per_shard.set_value( maybe_max_segments); } if (maybe_max_readers) { - config::shard_local_cfg().cloud_storage_max_segment_readers_per_shard( - maybe_max_readers); + config::shard_local_cfg() + .cloud_storage_max_segment_readers_per_shard.set_value( + maybe_max_readers); } storage::log_reader_config reader_config( base, max, ss::default_priority_class()); diff --git a/src/v/cluster/config_manager.cc b/src/v/cluster/config_manager.cc index 7b3810515283..23982ba21bb5 100644 --- a/src/v/cluster/config_manager.cc +++ b/src/v/cluster/config_manager.cc @@ -729,7 +729,21 @@ apply_local(cluster_config_delta_cmd_data const& data, bool silent) { auto& property = cfg.get(r); result.restart |= property.needs_restart(); - property.reset(); + try { + property.reset(); + } catch (...) { + // Most probably one of the watch callbacks is buggy and failed to + // handle the update. Don't stop the controller STM, but log with + // error severity so that at least we can catch these bugs in tests. + if (!silent) { + vlog( + clusterlog.error, + "Unexpected error resetting property {}: {}", + r, + std::current_exception()); + } + continue; + } } return result; @@ -882,7 +896,8 @@ config_manager::apply_delta(cluster_config_delta_cmd&& cmd_in) { const cluster_config_delta_cmd_data& data = cmd.value; vlog( clusterlog.trace, - "apply_delta: {} upserts, {} removes", + "apply_delta version {}: {} upserts, {} removes", + delta_version, data.upsert.size(), data.remove.size()); diff --git a/src/v/config/bounded_property.h b/src/v/config/bounded_property.h index 83a0f2eb2ed5..5bb1d2b13196 100644 --- a/src/v/config/bounded_property.h +++ b/src/v/config/bounded_property.h @@ -215,9 +215,12 @@ class bounded_property : public property { , _bounds(bounds) , _example(generate_example()) {} + using property::set_value; + void set_value(std::any v) override { property::update_value(std::any_cast(std::move(v))); } + bool set_value(YAML::Node n) override { auto val = std::move(n.as()); return clamp_and_update(val); diff --git a/src/v/config/property.h b/src/v/config/property.h index f6e5e3b48ceb..3ef63b37cca2 100644 --- a/src/v/config/property.h +++ b/src/v/config/property.h @@ -161,6 +161,14 @@ class property : public base_property { update_value(std::any_cast(std::move(v))); } + template + requires std::constructible_from + void set_value(U&& v) { + // needs to go through virtual inheritance chain, since this class is + // not final + set_value(std::make_any(std::forward(v))); + } + bool set_value(YAML::Node n) override { return update_value(std::move(n.as())); } @@ -177,15 +185,14 @@ class property : public base_property { return validate(v); } - void reset() override { _value = default_value(); } - - property& operator()(T v) { - _value = std::move(v); - return *this; + void reset() override { + auto v = default_value(); + update_value(std::move(v)); } base_property& operator=(const base_property& pr) override { - _value = dynamic_cast&>(pr)._value; + auto v = dynamic_cast&>(pr)._value; + update_value(std::move(v)); return *this; } @@ -262,8 +269,10 @@ class property : public base_property { bool update_value(T&& new_value) { if (new_value != _value) { - notify_watchers(new_value); + // Update the main value first, in case one of the binding updates + // throws. _value = std::move(new_value); + notify_watchers(_value); return true; } else { @@ -864,9 +873,14 @@ class retention_duration_property final : public property> { public: using property::property; + using property::set_value; + void set_value(std::any v) final { - update_value(std::any_cast(std::move(v))); + update_value( + std::any_cast>(std::move(v)) + .value_or(-1ms)); } + bool set_value(YAML::Node n) final { return update_value(n.as()); } diff --git a/src/v/config/tests/bounded_property_test.cc b/src/v/config/tests/bounded_property_test.cc index 85100043de2a..89b289703109 100644 --- a/src/v/config/tests/bounded_property_test.cc +++ b/src/v/config/tests/bounded_property_test.cc @@ -12,6 +12,7 @@ #include +#include #include static_assert(config::detail::bounds>); @@ -19,6 +20,8 @@ static_assert(config::detail::bounds>); namespace { +using namespace std::literals; + struct test_config : public config::config_store { config::bounded_property bounded_int; config::bounded_property> bounded_int_opt; @@ -26,6 +29,8 @@ struct test_config : public config::config_store { config::bounded_property bounded_double; config::bounded_property, config::numeric_bounds> bounded_double_opt; + config::bounded_property> + bounded_opt_ms; test_config() : bounded_int( @@ -62,7 +67,14 @@ struct test_config : public config::config_store { "An options float with some bounds set", {}, std::nullopt, - {.min = -1, .max = 2.236067977}) {} + {.min = -1, .max = 2.236067977}) + , bounded_opt_ms( + *this, + "bounded_opt_ms", + "An optional duration", + {}, + std::nullopt, + {.min = 5ms}) {} }; SEASTAR_THREAD_TEST_CASE(numeric_integral_bounds) { @@ -165,4 +177,18 @@ SEASTAR_THREAD_TEST_CASE(numeric_fp_bounds) { BOOST_TEST(cfg.bounded_double() == 2.236067977); } +SEASTAR_THREAD_TEST_CASE(bounded_property_set_value) { + auto cfg = test_config(); + + BOOST_TEST_INFO("fully specified optional"); + cfg.bounded_opt_ms.set_value(std::optional{20ms}); + BOOST_CHECK_EQUAL(cfg.bounded_opt_ms(), 20ms); + BOOST_TEST_INFO("std::nullopt"); + cfg.bounded_opt_ms.set_value(std::nullopt); + BOOST_CHECK(!cfg.bounded_opt_ms()); + BOOST_TEST_INFO("a value convertible to the optional::value type"); + cfg.bounded_opt_ms.set_value(2h); + BOOST_CHECK_EQUAL(cfg.bounded_opt_ms(), 2h); +} + } // namespace diff --git a/src/v/config/tests/config_store_test.cc b/src/v/config/tests/config_store_test.cc index d8d3a858109a..23d1837f56b7 100644 --- a/src/v/config/tests/config_store_test.cc +++ b/src/v/config/tests/config_store_test.cc @@ -446,6 +446,13 @@ SEASTAR_THREAD_TEST_CASE(property_bind) { BOOST_TEST(bind2() == "newvalue4"); BOOST_TEST(bind3() == "newvalue4"); BOOST_TEST(watch_count == 6); + + // Check that the bindings are updated when the property is reset to its + // default value. + cfg2.required_string.reset(); + BOOST_TEST(bind2() == ""); + BOOST_TEST(bind3() == ""); + BOOST_TEST(watch_count == 8); } SEASTAR_THREAD_TEST_CASE(property_conversion_bind) { diff --git a/src/v/net/conn_quota.cc b/src/v/net/conn_quota.cc index 25e99a935f27..9926922f4e84 100644 --- a/src/v/net/conn_quota.cc +++ b/src/v/net/conn_quota.cc @@ -60,14 +60,28 @@ conn_quota::conn_quota(conn_quota::config_fn cfg_f) noexcept }); _cfg.max_connections_per_ip.watch([this]() { - if ( - !_cfg.max_connections_per_ip() - && _cfg.max_connections_overrides().empty()) { + if (!_cfg.max_connections_per_ip()) { if (ss::this_shard_id() == 0) { vlog( - rpc::rpclog.info, "Connection count per-IP limit disabled"); + rpc::rpclog.info, + "Default connection count per-IP limit disabled"); } - ip_home.clear(); + + if (_cfg.max_connections_overrides().empty()) { + ip_home.clear(); + } else { + // Remove everything except the overrides. + std::vector to_delete; + for (const auto& i : ip_home) { + if (!overrides.contains(i.first)) { + to_delete.push_back(i.first); + } + } + for (const auto& k : to_delete) { + ip_home.erase(k); + } + } + ip_remote.clear(); } else { auto new_limit = _cfg.max_connections_per_ip().value();