Skip to content

Commit

Permalink
Merge pull request #16641 from mmaslankaprv/backport-pr-16504-v23.2.x…
Browse files Browse the repository at this point in the history
…-367

[v23.2.x] config: update bindings when properties are reset
  • Loading branch information
piyushredpanda authored Feb 20, 2024
2 parents 61166dd + b6c6477 commit 328d83a
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/v/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ FIXTURE_TEST(test_segments_pending_deletion_limit, archiver_fixture) {
[] { config::shard_local_cfg().delete_retention_ms.reset(); });

config::shard_local_cfg()
.cloud_storage_max_segments_pending_deletion_per_partition(2);
.cloud_storage_max_segments_pending_deletion_per_partition.set_value(2);

auto [arch_conf, remote_conf] = get_configurations();
auto amv = ss::make_shared<cloud_storage::async_manifest_view>(
Expand Down
7 changes: 4 additions & 3 deletions src/v/cloud_storage/tests/remote_partition_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ scan_remote_partition_incrementally_with_reuploads(
static auto bucket = cloud_storage_clients::bucket_name("bucket");
if (maybe_max_segments) {
config::shard_local_cfg()
.cloud_storage_max_materialized_segments_per_shard(
.cloud_storage_max_materialized_segments_per_shard.set_value(
maybe_max_segments);
}
if (maybe_max_readers) {
config::shard_local_cfg().cloud_storage_max_segment_readers_per_shard(
maybe_max_readers);
config::shard_local_cfg()
.cloud_storage_max_segment_readers_per_shard.set_value(
maybe_max_readers);
}
auto m = ss::make_lw_shared<cloud_storage::partition_manifest>(
manifest_ntp, manifest_revision);
Expand Down
14 changes: 8 additions & 6 deletions src/v/cloud_storage/tests/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -588,12 +588,13 @@ std::vector<model::record_batch_header> scan_remote_partition_incrementally(
static auto bucket = cloud_storage_clients::bucket_name("bucket");
if (maybe_max_segments) {
config::shard_local_cfg()
.cloud_storage_max_materialized_segments_per_shard(
.cloud_storage_max_materialized_segments_per_shard.set_value(
maybe_max_segments);
}
if (maybe_max_readers) {
config::shard_local_cfg().cloud_storage_max_segment_readers_per_shard(
maybe_max_readers);
config::shard_local_cfg()
.cloud_storage_max_segment_readers_per_shard.set_value(
maybe_max_readers);
}
auto manifest = hydrate_manifest(imposter.api.local(), bucket);
partition_probe probe(manifest.get_ntp());
Expand Down Expand Up @@ -668,12 +669,13 @@ std::vector<model::record_batch_header> scan_remote_partition(
static auto bucket = cloud_storage_clients::bucket_name("bucket");
if (maybe_max_segments) {
config::shard_local_cfg()
.cloud_storage_max_materialized_segments_per_shard(
.cloud_storage_max_materialized_segments_per_shard.set_value(
maybe_max_segments);
}
if (maybe_max_readers) {
config::shard_local_cfg().cloud_storage_max_segment_readers_per_shard(
maybe_max_readers);
config::shard_local_cfg()
.cloud_storage_max_segment_readers_per_shard.set_value(
maybe_max_readers);
}
storage::log_reader_config reader_config(
base, max, ss::default_priority_class());
Expand Down
19 changes: 17 additions & 2 deletions src/v/cluster/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,21 @@ apply_local(cluster_config_delta_cmd_data const& data, bool silent) {

auto& property = cfg.get(r);
result.restart |= property.needs_restart();
property.reset();
try {
property.reset();
} catch (...) {
// Most probably one of the watch callbacks is buggy and failed to
// handle the update. Don't stop the controller STM, but log with
// error severity so that at least we can catch these bugs in tests.
if (!silent) {
vlog(
clusterlog.error,
"Unexpected error resetting property {}: {}",
r,
std::current_exception());
}
continue;
}
}

return result;
Expand Down Expand Up @@ -882,7 +896,8 @@ config_manager::apply_delta(cluster_config_delta_cmd&& cmd_in) {
const cluster_config_delta_cmd_data& data = cmd.value;
vlog(
clusterlog.trace,
"apply_delta: {} upserts, {} removes",
"apply_delta version {}: {} upserts, {} removes",
delta_version,
data.upsert.size(),
data.remove.size());

Expand Down
3 changes: 3 additions & 0 deletions src/v/config/bounded_property.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,12 @@ class bounded_property : public property<T> {
, _bounds(bounds)
, _example(generate_example()) {}

using property<T>::set_value;

void set_value(std::any v) override {
property<T>::update_value(std::any_cast<T>(std::move(v)));
}

bool set_value(YAML::Node n) override {
auto val = std::move(n.as<T>());
return clamp_and_update(val);
Expand Down
30 changes: 22 additions & 8 deletions src/v/config/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ class property : public base_property {
update_value(std::any_cast<T>(std::move(v)));
}

template<typename U>
requires std::constructible_from<T, U>
void set_value(U&& v) {
// needs to go through virtual inheritance chain, since this class is
// not final
set_value(std::make_any<T>(std::forward<U>(v)));
}

bool set_value(YAML::Node n) override {
return update_value(std::move(n.as<T>()));
}
Expand All @@ -177,15 +185,14 @@ class property : public base_property {
return validate(v);
}

void reset() override { _value = default_value(); }

property<T>& operator()(T v) {
_value = std::move(v);
return *this;
void reset() override {
auto v = default_value();
update_value(std::move(v));
}

base_property& operator=(const base_property& pr) override {
_value = dynamic_cast<const property<T>&>(pr)._value;
auto v = dynamic_cast<const property<T>&>(pr)._value;
update_value(std::move(v));
return *this;
}

Expand Down Expand Up @@ -262,8 +269,10 @@ class property : public base_property {

bool update_value(T&& new_value) {
if (new_value != _value) {
notify_watchers(new_value);
// Update the main value first, in case one of the binding updates
// throws.
_value = std::move(new_value);
notify_watchers(_value);

return true;
} else {
Expand Down Expand Up @@ -864,9 +873,14 @@ class retention_duration_property final
: public property<std::optional<std::chrono::milliseconds>> {
public:
using property::property;
using property::set_value;

void set_value(std::any v) final {
update_value(std::any_cast<std::chrono::milliseconds>(std::move(v)));
update_value(
std::any_cast<std::optional<std::chrono::milliseconds>>(std::move(v))
.value_or(-1ms));
}

bool set_value(YAML::Node n) final {
return update_value(n.as<std::chrono::milliseconds>());
}
Expand Down
28 changes: 27 additions & 1 deletion src/v/config/tests/bounded_property_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,25 @@

#include <seastar/testing/thread_test_case.hh>

#include <chrono>
#include <optional>

static_assert(config::detail::bounds<config::numeric_integral_bounds<int>>);
static_assert(config::detail::bounds<config::numeric_bounds<double>>);

namespace {

using namespace std::literals;

struct test_config : public config::config_store {
config::bounded_property<int32_t> bounded_int;
config::bounded_property<std::optional<int32_t>> bounded_int_opt;
config::bounded_property<int16_t> odd_constraint;
config::bounded_property<double, config::numeric_bounds> bounded_double;
config::bounded_property<std::optional<double>, config::numeric_bounds>
bounded_double_opt;
config::bounded_property<std::optional<std::chrono::milliseconds>>
bounded_opt_ms;

test_config()
: bounded_int(
Expand Down Expand Up @@ -62,7 +67,14 @@ struct test_config : public config::config_store {
"An options float with some bounds set",
{},
std::nullopt,
{.min = -1, .max = 2.236067977}) {}
{.min = -1, .max = 2.236067977})
, bounded_opt_ms(
*this,
"bounded_opt_ms",
"An optional duration",
{},
std::nullopt,
{.min = 5ms}) {}
};

SEASTAR_THREAD_TEST_CASE(numeric_integral_bounds) {
Expand Down Expand Up @@ -165,4 +177,18 @@ SEASTAR_THREAD_TEST_CASE(numeric_fp_bounds) {
BOOST_TEST(cfg.bounded_double() == 2.236067977);
}

SEASTAR_THREAD_TEST_CASE(bounded_property_set_value) {
auto cfg = test_config();

BOOST_TEST_INFO("fully specified optional");
cfg.bounded_opt_ms.set_value(std::optional{20ms});
BOOST_CHECK_EQUAL(cfg.bounded_opt_ms(), 20ms);
BOOST_TEST_INFO("std::nullopt");
cfg.bounded_opt_ms.set_value(std::nullopt);
BOOST_CHECK(!cfg.bounded_opt_ms());
BOOST_TEST_INFO("a value convertible to the optional::value type");
cfg.bounded_opt_ms.set_value(2h);
BOOST_CHECK_EQUAL(cfg.bounded_opt_ms(), 2h);
}

} // namespace
7 changes: 7 additions & 0 deletions src/v/config/tests/config_store_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,13 @@ SEASTAR_THREAD_TEST_CASE(property_bind) {
BOOST_TEST(bind2() == "newvalue4");
BOOST_TEST(bind3() == "newvalue4");
BOOST_TEST(watch_count == 6);

// Check that the bindings are updated when the property is reset to its
// default value.
cfg2.required_string.reset();
BOOST_TEST(bind2() == "");
BOOST_TEST(bind3() == "");
BOOST_TEST(watch_count == 8);
}

SEASTAR_THREAD_TEST_CASE(property_conversion_bind) {
Expand Down
24 changes: 19 additions & 5 deletions src/v/net/conn_quota.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,28 @@ conn_quota::conn_quota(conn_quota::config_fn cfg_f) noexcept
});

_cfg.max_connections_per_ip.watch([this]() {
if (
!_cfg.max_connections_per_ip()
&& _cfg.max_connections_overrides().empty()) {
if (!_cfg.max_connections_per_ip()) {
if (ss::this_shard_id() == 0) {
vlog(
rpc::rpclog.info, "Connection count per-IP limit disabled");
rpc::rpclog.info,
"Default connection count per-IP limit disabled");
}
ip_home.clear();

if (_cfg.max_connections_overrides().empty()) {
ip_home.clear();
} else {
// Remove everything except the overrides.
std::vector<inet_address_key> to_delete;
for (const auto& i : ip_home) {
if (!overrides.contains(i.first)) {
to_delete.push_back(i.first);
}
}
for (const auto& k : to_delete) {
ip_home.erase(k);
}
}

ip_remote.clear();
} else {
auto new_limit = _cfg.max_connections_per_ip().value();
Expand Down

0 comments on commit 328d83a

Please sign in to comment.