From 16a6cd0bd56bb16b3aa31a8b1577d97c10655a07 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 15 May 2023 20:46:57 -0400 Subject: [PATCH 1/9] f/quotas: do not record intake traffic for noncontrolled api keys Followup to #10285. There the intake traffic point was left unconditional of the api key, which caused anomalies in tput related metrics. --- src/v/kafka/server/connection_context.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 46ed082060a1..76e779716eb7 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -334,7 +334,9 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { // protect against shutdown behavior return ss::make_ready_future<>(); } - _server.snc_quota_mgr().record_request_intake(size); + if (_kafka_throughput_controlled_api_keys().at(hdr.key)) { + _server.snc_quota_mgr().record_request_intake(size); + } auto sres = ss::make_lw_shared(std::move(sres_in)); From a1cc8f7b9d6d68e5a3efe1dc09b615cd61a82560 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 8 May 2023 22:19:29 -0400 Subject: [PATCH 2/9] config: throughput_control_group +UT throughput_control_group is a structural element for tput control groups configuration. This commit adds its implementation sufficient to cover tput exemptions by client_id matched with a regex. 
--- src/v/config/CMakeLists.txt | 1 + src/v/config/tests/CMakeLists.txt | 3 +- .../tests/throughput_control_group_test.cc | 131 ++++++++++++ src/v/config/throughput_control_group.cc | 191 ++++++++++++++++++ src/v/config/throughput_control_group.h | 122 +++++++++++ 5 files changed, 447 insertions(+), 1 deletion(-) create mode 100644 src/v/config/tests/throughput_control_group_test.cc create mode 100644 src/v/config/throughput_control_group.cc create mode 100644 src/v/config/throughput_control_group.h diff --git a/src/v/config/CMakeLists.txt b/src/v/config/CMakeLists.txt index 06c5eedde66e..f4864caff44b 100644 --- a/src/v/config/CMakeLists.txt +++ b/src/v/config/CMakeLists.txt @@ -9,6 +9,7 @@ v_cc_library( base_property.cc rjson_serialization.cc validators.cc + throughput_control_group.cc DEPS v::json v::model diff --git a/src/v/config/tests/CMakeLists.txt b/src/v/config/tests/CMakeLists.txt index 75e7572935c7..e40a5c77efca 100644 --- a/src/v/config/tests/CMakeLists.txt +++ b/src/v/config/tests/CMakeLists.txt @@ -8,7 +8,8 @@ set(srcs advertised_kafka_api_test.cc seed_server_property_test.cc cloud_credentials_source_test.cc - validator_tests.cc) + validator_tests.cc + throughput_control_group_test.cc) rp_test( UNIT_TEST diff --git a/src/v/config/tests/throughput_control_group_test.cc b/src/v/config/tests/throughput_control_group_test.cc new file mode 100644 index 000000000000..e40b7350ad34 --- /dev/null +++ b/src/v/config/tests/throughput_control_group_test.cc @@ -0,0 +1,131 @@ +/* + * Copyright 2023 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "config/config_store.h" +#include "config/property.h" +#include "config/throughput_control_group.h" + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace std::string_literals; + +SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { + std::vector tcgv; + tcgv.emplace_back(); + + struct test_config : public config::config_store { + config::property> cgroups; + test_config() + : cgroups(*this, "cgroups", "") {} + }; + + auto cfg_node = YAML::Load(R"( +cgroups: + - name: "" + client_id: client_id-1 + - client_id: cli.+_id-\d+ + - client_id: another unnamed group indended to verify this passes validation + - name: match-nothing-group + client_id: "" + - name: cgroup-catchall because no clientid group given +)"); + cfg_node.SetStyle(YAML::EmitterStyle::Flow); + + test_config cfg; + BOOST_TEST(cfg.read_yaml(cfg_node).empty()); + BOOST_TEST( + YAML::Dump(config::to_yaml(cfg, config::redact_secrets{false})) + == YAML::Dump(cfg_node)); + BOOST_TEST(cfg.cgroups().size() == 5); + for (auto& cg : cfg.cgroups()) { + BOOST_TEST(!cg.throughput_limit_node_in_bps); + BOOST_TEST(!cg.throughput_limit_node_out_bps); + BOOST_TEST(cg.validate().empty()); + } + BOOST_TEST(cfg.cgroups()[0].name == ""); + BOOST_TEST(!cfg.cgroups()[0].is_noname()); + BOOST_TEST(cfg.cgroups()[1].name != ""); + BOOST_TEST(cfg.cgroups()[1].is_noname()); + BOOST_TEST(cfg.cgroups()[3].name == "match-nothing-group"); + BOOST_TEST( + (config::find_throughput_control_group( + cfg.cgroups().cbegin(), cfg.cgroups().cend(), "client_id-1") + == cfg.cgroups().cbegin())); + BOOST_TEST( + (config::find_throughput_control_group( + 
cfg.cgroups().cbegin(), cfg.cgroups().cend(), "clinet_id-2") + == cfg.cgroups().cbegin() + 1)); + BOOST_TEST( + (config::find_throughput_control_group( + cfg.cgroups().cbegin(), cfg.cgroups().cend(), "nonclient_id") + == cfg.cgroups().cbegin() + 4)); + BOOST_TEST(!validate_throughput_control_groups( + cfg.cgroups().cbegin(), cfg.cgroups().cend())); + + // Failure cases + + // Control characters in names. In YAML, control characters [are not + // allowed](https://yaml.org/spec/1.2.2/#51-character-set) in the stream and + // should be [escaped](https://yaml.org/spec/1.2.2/#57-escaped-characters). + // Yaml-cpp does not recognize escape sequences. It fails with + // YAML::ParseException on some control characters (like \0), but lets + // others through (like \b). throughput_control_group parser should not + // let any through though. + BOOST_CHECK_THROW( + cfg.read_yaml(YAML::Load("cgroups: [{name: n\0, client_id: c1}]"s)), + YAML::Exception); + BOOST_TEST( + !cfg.read_yaml(YAML::Load("cgroups: [{name: n1, client_id: c-1-\b-2}]"s)) + .empty()); + + // Invalid regex syntax + BOOST_TEST( + !cfg.read_yaml(YAML::Load(R"(cgroups: [{name: n, client_id: "[A-Z}"}])"s)) + .empty()); + BOOST_TEST( + !cfg.read_yaml(YAML::Load(R"(cgroups: [{name: n, client_id: "*"}])"s)) + .empty()); + + // Specify any throupghput limit + BOOST_TEST( + !cfg + .read_yaml(YAML::Load( + R"(cgroups: [{name: n, client_id: c, throughput_limit_node_in_bps: 0}])"s)) + .empty()); + BOOST_TEST( + !cfg + .read_yaml(YAML::Load( + R"(cgroups: [{name: n, client_id: c, throughput_limit_node_out_bps: 100}])"s)) + .empty()); + + // Duplicate group names other than unnamed + BOOST_TEST(cfg + .read_yaml(YAML::Load( + R"(cgroups: [{name: developers}, {name: developers}])"s)) + .empty()); + BOOST_TEST( + validate_throughput_control_groups( + cfg.cgroups().cbegin(), cfg.cgroups().cend()) + .value_or("") + .find("uplicate") + != ss::sstring::npos); +} diff --git a/src/v/config/throughput_control_group.cc 
b/src/v/config/throughput_control_group.cc new file mode 100644 index 000000000000..24da000a5179 --- /dev/null +++ b/src/v/config/throughput_control_group.cc @@ -0,0 +1,191 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "throughput_control_group.h" + +#include "config/convert.h" +#include "ssx/sformat.h" +#include "utils/to_string.h" +#include "utils/utf8.h" + +#include + +#include +#include +#include + +#include +#include +#include +#include + +using namespace std::string_literals; + +namespace { + +template +requires std::same_as || std::same_as +bool contains_control_characters(const T& s) { + const auto& ctype = std::use_facet>( + std::locale::classic()); + const char* const end = s.c_str() + s.length(); + return ctype.scan_is(std::ctype_base::cntrl, s.c_str(), end) != end; +} + +} // namespace + +namespace config { + +namespace ids { +constexpr const char* name = "name"; +constexpr const char* client_id = "client_id"; +constexpr const char* tp_limit_node_in = "throughput_limit_node_in_bps"; +constexpr const char* tp_limit_node_out = "throughput_limit_node_out_bps"; +} // namespace ids + +static const ss::sstring noname(1, '\0'); + +throughput_control_group::throughput_control_group() = default; +throughput_control_group::throughput_control_group( + throughput_control_group&&) noexcept + = default; +throughput_control_group& +throughput_control_group::operator=(throughput_control_group&&) noexcept + = default; +throughput_control_group::~throughput_control_group() noexcept = default; + +throughput_control_group::throughput_control_group( + const throughput_control_group& other) + : name(other.name) + , client_id_re( + other.client_id_re + ? 
std::make_unique(other.client_id_re->pattern()) + : std::unique_ptr{}) + , throughput_limit_node_in_bps(other.throughput_limit_node_in_bps) + , throughput_limit_node_out_bps(other.throughput_limit_node_out_bps) {} + +throughput_control_group& +throughput_control_group::operator=(const throughput_control_group& other) { + return *this = throughput_control_group(other); +} + +std::ostream& +operator<<(std::ostream& os, const throughput_control_group& tcg) { + fmt::print( + os, + "{{group_name: {}, client_id: {}, throughput_limit_node_in_bps: {}, " + "throughput_limit_node_out_bps: {}}}", + tcg.name, + tcg.client_id_re ? tcg.client_id_re->pattern() : ""s, + tcg.throughput_limit_node_in_bps, + tcg.throughput_limit_node_out_bps); + return os; +} + +bool throughput_control_group::match_client_id( + const ss::sstring& client_id) const { + if (!client_id_re) { + // omitted match criterion means "always match" + return true; + } + return re2::RE2::FullMatch( + re2::StringPiece(client_id.c_str(), client_id.length()), *client_id_re); +} + +bool throughput_control_group::is_noname() const noexcept { + return name == noname; +} + +ss::sstring throughput_control_group::validate() const { + if (unlikely(name != noname && contains_control_character(name))) { + return "Group name contains invalid character"; + } + if (unlikely(client_id_re && !client_id_re->ok())) { + return ss::format("Invalid client_id regex. 
{}", client_id_re->error()); + } + return {}; +} + +} // namespace config + +namespace YAML { + +Node convert::encode(const type& tcg) { + Node node; + if (tcg.name != config::noname) { + node[config::ids::name] = tcg.name; + } + if (tcg.client_id_re) { + node[config::ids::client_id] = tcg.client_id_re->pattern(); + } + return node; +} + +bool convert::decode( + const Node& node, type& tcg) { + config::throughput_control_group res; + if (const auto& n = node[config::ids::name]; n) { + res.name = n.as(); + if (contains_control_characters(res.name)) { + return false; + } + } else { + res.name = config::noname; + } + if (const auto& n = node[config::ids::client_id]; n) { + const auto s = n.as(); + if (contains_control_characters(s)) { + return false; + } + res.client_id_re = std::make_unique(s); + if (!res.client_id_re->ok()) { + return false; + } + } else { + res.client_id_re.reset(); + } + if (const auto& n = node[config::ids::tp_limit_node_in]; n) { + // only the no-limit option is supported yet + return false; + } else { + res.throughput_limit_node_in_bps = std::nullopt; + } + if (const auto& n = node[config::ids::tp_limit_node_out]; n) { + // only the no-limit option is supported yet + return false; + } else { + res.throughput_limit_node_out_bps = std::nullopt; + } + + tcg = std::move(res); + return true; +} + +} // namespace YAML + +namespace json { + +void rjson_serialize( + json::Writer& w, + const config::throughput_control_group& tcg) { + w.StartObject(); + if (tcg.name != config::noname) { + w.Key(config::ids::name); + w.String(tcg.name); + } + if (tcg.client_id_re) { + w.Key(config::ids::client_id); + w.String(tcg.client_id_re->pattern()); + } + w.EndObject(); +} + +} // namespace json diff --git a/src/v/config/throughput_control_group.h b/src/v/config/throughput_control_group.h new file mode 100644 index 000000000000..2d93eac0f164 --- /dev/null +++ b/src/v/config/throughput_control_group.h @@ -0,0 +1,122 @@ +/* + * Copyright 2023 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "config/property.h" +#include "json/_include_first.h" +#include "json/stringbuffer.h" +#include "json/writer.h" +#include "seastarx.h" + +#include +#include + +#include + +#include +#include +#include + +namespace re2 { +class RE2; +} + +namespace config { + +struct throughput_control_group { + throughput_control_group(); + throughput_control_group(const throughput_control_group&); + throughput_control_group& operator=(const throughput_control_group&); + throughput_control_group(throughput_control_group&&) noexcept; + throughput_control_group& operator=(throughput_control_group&&) noexcept; + ~throughput_control_group() noexcept; + + friend bool + operator==(const throughput_control_group&, const throughput_control_group&) + = default; + + friend std::ostream& + operator<<(std::ostream& os, const throughput_control_group& tcg); + + bool match_client_id(const ss::sstring& client_id) const; + bool is_noname() const noexcept; + ss::sstring validate() const; + + ss::sstring name; + std::unique_ptr client_id_re; + // nuillopt means unlimited: + std::optional throughput_limit_node_in_bps; + std::optional throughput_limit_node_out_bps; +}; + +template<> +consteval std::string_view +detail::property_type_name() { + return "config::throughput_control_group"; +} + +template +InputIt find_throughput_control_group( + const InputIt first, const InputIt last, const ss::sstring& client_id) { + return std::find_if( + first, last, [&client_id](const config::throughput_control_group& cg) { + return cg.match_client_id(client_id); + }); +} + +template +std::optional +validate_throughput_control_groups(const InputIt first, const InputIt last) { + // verify that 
group names are unique where set + // o(n^2/2) algo because we don't expect many items + for (auto i = first; i != last; ++i) { + if (const auto verr = i->validate(); unlikely(!verr.empty())) { + return ss::format( + "Validation failed for throughput control group #{}: {}. cgroup: " + "{{{}}}", + std::distance(first, i), + verr, + *i); + } + if (i->is_noname()) { + continue; + } + if (unlikely( + std::any_of(first, i, [i](const throughput_control_group& cg) { + return cg.name == i->name; + }))) { + return ss::format( + "Duplicate throughput control group name: {}", i->name); + } + } + return std::nullopt; +} + +} // namespace config + +namespace YAML { +template<> +struct convert { + using type = config::throughput_control_group; + static Node encode(const type& rhs); + static bool decode(const Node& node, type& rhs); +}; + +} // namespace YAML + +namespace json { + +void rjson_serialize( + json::Writer& w, + const config::throughput_control_group& ep); + +} // namespace json From bd43f4727a9887bad52d8b56c77934e755c69a67 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 15 May 2023 01:56:41 -0400 Subject: [PATCH 3/9] cloud_storage_clients: move always_false_v -> utils always_false_v moved to utils/functional.h so that it can be reused --- src/v/cloud_storage/remote.cc | 7 ++----- src/v/cloud_storage_clients/configuration.h | 3 --- src/v/utils/functional.h | 3 +++ 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/v/cloud_storage/remote.cc b/src/v/cloud_storage/remote.cc index 4083a03cfda1..6fae0e390451 100644 --- a/src/v/cloud_storage/remote.cc +++ b/src/v/cloud_storage/remote.cc @@ -1242,8 +1242,7 @@ void auth_refresh_bg_op::do_start_auth_refresh_op( return cloud_roles::aws_region_name{}; } else { static_assert( - cloud_storage_clients::always_false_v, - "Unknown client type"); + always_false_v, "Unknown client type"); return cloud_roles::aws_region_name{}; } }, @@ -1293,9 +1292,7 @@ cloud_roles::credentials 
auth_refresh_bg_op::build_static_credentials() const { return cloud_roles::abs_credentials{ cfg.storage_account_name, cfg.shared_key.value()}; } else { - static_assert( - cloud_storage_clients::always_false_v, - "Unknown client type"); + static_assert(always_false_v, "Unknown client type"); return cloud_roles::aws_credentials{}; } }, diff --git a/src/v/cloud_storage_clients/configuration.h b/src/v/cloud_storage_clients/configuration.h index c4b8070d773d..c487dc88e2a5 100644 --- a/src/v/cloud_storage_clients/configuration.h +++ b/src/v/cloud_storage_clients/configuration.h @@ -97,9 +97,6 @@ using client_configuration_variant = std::variant; using client_configuration = client_configuration_variant; -template -inline constexpr bool always_false_v = false; - model::cloud_storage_backend infer_backend_from_configuration( const client_configuration& client_config, model::cloud_credentials_source cloud_storage_credentials_source); diff --git a/src/v/utils/functional.h b/src/v/utils/functional.h index a7bacf3783e0..061d1ed32571 100644 --- a/src/v/utils/functional.h +++ b/src/v/utils/functional.h @@ -14,6 +14,9 @@ #include #include +template +inline constexpr bool always_false_v = false; + template concept SupportsPushBack = requires(T a, U b) { { a.push_back(b) } -> std::same_as; From b1fa02e1b41bc76ba57409c6106b295a67fcbdcc Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 15 May 2023 02:08:17 -0400 Subject: [PATCH 4/9] config: support for empty client_id in throughput_control_group Client can omit client_id specification on its side. This is different from empty client_id value, and for throughput_control_group it is also different from omitting client_id (which means match anything regardless of client_id). This commit introduces special selector tags for client_id value in throughput_control_group. The syntax for selector tags is "+name", this makes them distinct from any regex because regex cannot start with a "+". 
One selector name is introduced: "empty", it matches the omitted client_id values and only them. To support that, throughput_control_group does not store a regex directly anymore. Now it's a variant type with one option for "empty" and another for the regex. --- .../tests/throughput_control_group_test.cc | 34 ++-- src/v/config/throughput_control_group.cc | 162 ++++++++++++++---- src/v/config/throughput_control_group.h | 9 +- 3 files changed, 158 insertions(+), 47 deletions(-) diff --git a/src/v/config/tests/throughput_control_group_test.cc b/src/v/config/tests/throughput_control_group_test.cc index e40b7350ad34..729ef71eb771 100644 --- a/src/v/config/tests/throughput_control_group_test.cc +++ b/src/v/config/tests/throughput_control_group_test.cc @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -46,6 +47,8 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { - client_id: another unnamed group indended to verify this passes validation - name: match-nothing-group client_id: "" + - name: unspecified client_id + client_id: +empty - name: cgroup-catchall because no clientid group given )"); cfg_node.SetStyle(YAML::EmitterStyle::Flow); @@ -55,7 +58,7 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { BOOST_TEST( YAML::Dump(config::to_yaml(cfg, config::redact_secrets{false})) == YAML::Dump(cfg_node)); - BOOST_TEST(cfg.cgroups().size() == 5); + BOOST_TEST(cfg.cgroups().size() == 6); for (auto& cg : cfg.cgroups()) { BOOST_TEST(!cg.throughput_limit_node_in_bps); BOOST_TEST(!cg.throughput_limit_node_out_bps); @@ -66,21 +69,26 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { BOOST_TEST(cfg.cgroups()[1].name != ""); BOOST_TEST(cfg.cgroups()[1].is_noname()); BOOST_TEST(cfg.cgroups()[3].name == "match-nothing-group"); - BOOST_TEST( - (config::find_throughput_control_group( - cfg.cgroups().cbegin(), cfg.cgroups().cend(), "client_id-1") - == cfg.cgroups().cbegin())); - BOOST_TEST( - (config::find_throughput_control_group( - 
cfg.cgroups().cbegin(), cfg.cgroups().cend(), "clinet_id-2") - == cfg.cgroups().cbegin() + 1)); - BOOST_TEST( - (config::find_throughput_control_group( - cfg.cgroups().cbegin(), cfg.cgroups().cend(), "nonclient_id") - == cfg.cgroups().cbegin() + 4)); BOOST_TEST(!validate_throughput_control_groups( cfg.cgroups().cbegin(), cfg.cgroups().cend())); + // Matches + const auto get_match_index = + [&cfg]( + std::optional client_id) -> std::optional { + if (const auto i = config::find_throughput_control_group( + cfg.cgroups().cbegin(), cfg.cgroups().cend(), client_id); + i != cfg.cgroups().cend()) { + return std::distance(cfg.cgroups().cbegin(), i); + } + return std::nullopt; + }; + BOOST_TEST(get_match_index("client_id-1") == 0); + BOOST_TEST(get_match_index("clinet_id-2") == 1); + BOOST_TEST(get_match_index("") == 3); + BOOST_TEST(get_match_index(std::nullopt) == 4); + BOOST_TEST(get_match_index("nonclient_id") == 5); + // Failure cases // Control characters in names. In YAML, control characters [are not diff --git a/src/v/config/throughput_control_group.cc b/src/v/config/throughput_control_group.cc index 24da000a5179..891b9186ff6b 100644 --- a/src/v/config/throughput_control_group.cc +++ b/src/v/config/throughput_control_group.cc @@ -13,6 +13,7 @@ #include "config/convert.h" #include "ssx/sformat.h" +#include "utils/functional.h" #include "utils/to_string.h" #include "utils/utf8.h" @@ -21,11 +22,14 @@ #include #include #include +#include #include #include #include #include +#include +#include using namespace std::string_literals; @@ -51,8 +55,42 @@ constexpr const char* tp_limit_node_in = "throughput_limit_node_in_bps"; constexpr const char* tp_limit_node_out = "throughput_limit_node_out_bps"; } // namespace ids +constexpr char selector_prefix = '+'; +constexpr std::string_view selector_empty = "empty"; + static const ss::sstring noname(1, '\0'); +struct copyable_RE2 : re2::RE2 { + copyable_RE2(const copyable_RE2& other) + : RE2(other.pattern()) {} + copyable_RE2& 
operator=(const copyable_RE2&) = delete; + copyable_RE2(copyable_RE2&&) noexcept = delete; + copyable_RE2& operator=(copyable_RE2&&) noexcept = delete; + explicit copyable_RE2(const std::string& s) + : RE2(s) {} + friend std::ostream& operator<<(std::ostream& os, const copyable_RE2& re) { + fmt::print(os, "{}", re.pattern()); + return os; + } +}; + +struct client_id_matcher_type { + // nullopt stands for the empty client_id value + std::optional v; + client_id_matcher_type() = default; + explicit client_id_matcher_type(const copyable_RE2& d) + : v(d) {} + friend std::ostream& operator<<( + std::ostream& os, const std::unique_ptr& mt) { + if (mt) { + fmt::print(os, "{{v: {}}}", mt->v); + } else { + fmt::print(os, "null"); + } + return os; + } +}; + throughput_control_group::throughput_control_group() = default; throughput_control_group::throughput_control_group( throughput_control_group&&) noexcept @@ -65,10 +103,10 @@ throughput_control_group::~throughput_control_group() noexcept = default; throughput_control_group::throughput_control_group( const throughput_control_group& other) : name(other.name) - , client_id_re( - other.client_id_re - ? std::make_unique(other.client_id_re->pattern()) - : std::unique_ptr{}) + , client_id_matcher( + other.client_id_matcher + ? std::make_unique(*other.client_id_matcher) + : std::unique_ptr{}) , throughput_limit_node_in_bps(other.throughput_limit_node_in_bps) , throughput_limit_node_out_bps(other.throughput_limit_node_out_bps) {} @@ -84,20 +122,28 @@ operator<<(std::ostream& os, const throughput_control_group& tcg) { "{{group_name: {}, client_id: {}, throughput_limit_node_in_bps: {}, " "throughput_limit_node_out_bps: {}}}", tcg.name, - tcg.client_id_re ? 
tcg.client_id_re->pattern() : ""s, + tcg.client_id_matcher, tcg.throughput_limit_node_in_bps, tcg.throughput_limit_node_out_bps); return os; } bool throughput_control_group::match_client_id( - const ss::sstring& client_id) const { - if (!client_id_re) { + const std::optional client_id) const { + if (!client_id_matcher) { // omitted match criterion means "always match" return true; } - return re2::RE2::FullMatch( - re2::StringPiece(client_id.c_str(), client_id.length()), *client_id_re); + if (!client_id_matcher->v) { + // empty client_id match + // only missing client_id matches the empty + return !client_id; + } + // regex match + // missing client_id never matches a re + return client_id + && re2::RE2::FullMatch( + re2::StringPiece(*client_id), *client_id_matcher->v); } bool throughput_control_group::is_noname() const noexcept { @@ -108,10 +154,21 @@ ss::sstring throughput_control_group::validate() const { if (unlikely(name != noname && contains_control_character(name))) { return "Group name contains invalid character"; } - if (unlikely(client_id_re && !client_id_re->ok())) { - return ss::format("Invalid client_id regex. {}", client_id_re->error()); + if (!client_id_matcher) { + // omitted value always ok + return {}; + } + if (!client_id_matcher->v) { + // empty match always ok + return {}; + } + // regex match: check if the regex is valid + if (likely(client_id_matcher->v->ok())) { + return {}; + } else { + return ss::format( + "Invalid client_id regex. 
{}", client_id_matcher->v->error()); } - return {}; } } // namespace config @@ -119,46 +176,79 @@ ss::sstring throughput_control_group::validate() const { namespace YAML { Node convert::encode(const type& tcg) { + using namespace config; Node node; - if (tcg.name != config::noname) { - node[config::ids::name] = tcg.name; + + if (tcg.name != noname) { + node[ids::name] = tcg.name; } - if (tcg.client_id_re) { - node[config::ids::client_id] = tcg.client_id_re->pattern(); + + if (tcg.client_id_matcher) { + YAML::Node client_id_node = node[ids::client_id]; + if (!tcg.client_id_matcher->v) { + // the empty match + client_id_node = fmt::format( + "{}{}", selector_prefix, selector_empty); + } else { + // regex + client_id_node = tcg.client_id_matcher->v->pattern(); + } } + return node; } bool convert::decode( const Node& node, type& tcg) { - config::throughput_control_group res; - if (const auto& n = node[config::ids::name]; n) { + using namespace config; + throughput_control_group res; + + if (const auto& n = node[ids::name]; n) { res.name = n.as(); if (contains_control_characters(res.name)) { return false; } } else { - res.name = config::noname; + res.name = noname; } - if (const auto& n = node[config::ids::client_id]; n) { + + if (const auto& n = node[ids::client_id]; n) { const auto s = n.as(); if (contains_control_characters(s)) { return false; } - res.client_id_re = std::make_unique(s); - if (!res.client_id_re->ok()) { - return false; + if (!s.empty() && s[0] == selector_prefix) { + // an explicit selector + const std::string_view selector_name( + std::next(s.cbegin()), s.cend()); + if (selector_name == selector_empty) { + res.client_id_matcher + = std::make_unique(); + } else { + return false; + } + } else { + // a regex + const copyable_RE2 re{s}; + if (!re.ok()) { + return false; + } + res.client_id_matcher = std::make_unique( + re); } } else { - res.client_id_re.reset(); + // nothing + res.client_id_matcher.reset(); } - if (const auto& n = 
node[config::ids::tp_limit_node_in]; n) { + + if (const auto& n = node[ids::tp_limit_node_in]; n) { // only the no-limit option is supported yet return false; } else { res.throughput_limit_node_in_bps = std::nullopt; } - if (const auto& n = node[config::ids::tp_limit_node_out]; n) { + + if (const auto& n = node[ids::tp_limit_node_out]; n) { // only the no-limit option is supported yet return false; } else { @@ -176,15 +266,25 @@ namespace json { void rjson_serialize( json::Writer& w, const config::throughput_control_group& tcg) { + using namespace config; w.StartObject(); - if (tcg.name != config::noname) { - w.Key(config::ids::name); + + if (tcg.name != noname) { + w.Key(ids::name); w.String(tcg.name); } - if (tcg.client_id_re) { - w.Key(config::ids::client_id); - w.String(tcg.client_id_re->pattern()); + + if (tcg.client_id_matcher) { + w.Key(ids::client_id); + if (!tcg.client_id_matcher->v) { + // empty match + w.String(fmt::format("{}{}", selector_prefix, selector_empty)); + } else { + // regex match + w.String(tcg.client_id_matcher->v->pattern()); + } } + w.EndObject(); } diff --git a/src/v/config/throughput_control_group.h b/src/v/config/throughput_control_group.h index 2d93eac0f164..cc9105931746 100644 --- a/src/v/config/throughput_control_group.h +++ b/src/v/config/throughput_control_group.h @@ -25,6 +25,7 @@ #include #include #include +#include namespace re2 { class RE2; @@ -47,12 +48,12 @@ struct throughput_control_group { friend std::ostream& operator<<(std::ostream& os, const throughput_control_group& tcg); - bool match_client_id(const ss::sstring& client_id) const; + bool match_client_id(std::optional client_id) const; bool is_noname() const noexcept; ss::sstring validate() const; ss::sstring name; - std::unique_ptr client_id_re; + std::unique_ptr client_id_matcher; // nuillopt means unlimited: std::optional throughput_limit_node_in_bps; std::optional throughput_limit_node_out_bps; @@ -66,7 +67,9 @@ detail::property_type_name() { template InputIt 
find_throughput_control_group( - const InputIt first, const InputIt last, const ss::sstring& client_id) { + const InputIt first, + const InputIt last, + const std::optional client_id) { return std::find_if( first, last, [&client_id](const config::throughput_control_group& cg) { return cg.match_client_id(client_id); From 7af5c60a2836e65233c433c9e068faf7c1ea302c Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 15 May 2023 02:12:38 -0400 Subject: [PATCH 5/9] config: "kafka_throughput_control" cluster property also a typo fixed in another description --- src/v/config/configuration.cc | 20 +++++++++++++++++++- src/v/config/configuration.h | 2 ++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 17f209180aa3..afa4759d36fd 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2038,9 +2038,27 @@ configuration::configuration() *this, "kafka_throughput_controlled_api_keys", "List of Kafka API keys that are subject to cluster-wide " - "and node-wide thoughput limit control", + "and node-wide throughput limit control", {.needs_restart = needs_restart::no, .visibility = visibility::user}, {"produce", "fetch"}) + , kafka_throughput_control( + *this, + "kafka_throughput_control", + "List of throughput control groups that define exclusions from node-wide " + "throughput limits. Each group consists of: (\"name\" (optional) - any " + "unique group name, \"client_id\" - regex to match client_id). 
" + "A connection is assigned the first matching group, then the connection " + "is excluded from throughput control.", + { + .needs_restart = needs_restart::no, + .example + = R"([{'name': 'first_group','client_id': 'client1'}, {'client_id': 'consumer-\d+'}, {'name': 'catch all'}])", + .visibility = visibility::user, + }, + {}, + [](auto& v) { + return validate_throughput_control_groups(v.cbegin(), v.cend()); + }) , node_isolation_heartbeat_timeout( *this, "node_isolation_heartbeat_timeout", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index af88b0a271cc..87d2431bb94e 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -18,6 +18,7 @@ #include "config/data_directory_path.h" #include "config/endpoint_tls_config.h" #include "config/property.h" +#include "config/throughput_control_group.h" #include "config/tls_config.h" #include "model/compression.h" #include "model/fundamental.h" @@ -409,6 +410,7 @@ struct configuration final : public config_store { property kafka_quota_balancer_min_shard_throughput_ratio; bounded_property kafka_quota_balancer_min_shard_throughput_bps; property> kafka_throughput_controlled_api_keys; + property> kafka_throughput_control; bounded_property node_isolation_heartbeat_timeout; From 2ded90fd6d1ed1b7f05a07c7353a190684386844 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 15 May 2023 02:17:16 -0400 Subject: [PATCH 6/9] k/quotas: binding for "kafka_throughput_control" --- src/v/kafka/server/snc_quota_manager.cc | 1 + src/v/kafka/server/snc_quota_manager.h | 3 +++ 2 files changed, 4 insertions(+) diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index 56427c685769..6a61c1a55638 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -148,6 +148,7 @@ snc_quota_manager::snc_quota_manager() , _kafka_quota_balancer_min_shard_throughput_bps( config::shard_local_cfg() 
.kafka_quota_balancer_min_shard_throughput_bps.bind()) + , _kafka_throughput_control(config::shard_local_cfg().kafka_throughput_control.bind()) , _node_quota_default{calc_node_quota_default()} , _shard_quota{ .in {node_to_shard_quota(_node_quota_default.in), diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h index 2e0f236f8244..ac2f9bb08e65 100644 --- a/src/v/kafka/server/snc_quota_manager.h +++ b/src/v/kafka/server/snc_quota_manager.h @@ -11,6 +11,7 @@ #pragma once #include "config/property.h" +#include "config/throughput_control_group.h" #include "seastarx.h" #include "utils/bottomless_token_bucket.h" #include "utils/mutex.h" @@ -171,6 +172,8 @@ class snc_quota_manager _kafka_quota_balancer_node_period; config::binding _kafka_quota_balancer_min_shard_throughput_ratio; config::binding _kafka_quota_balancer_min_shard_throughput_bps; + config::binding> + _kafka_throughput_control; // operational, only used in the balancer shard ss::timer _balancer_timer; From cd9b9518bdf395e695fbedba52bca74e6b523b0e Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 15 May 2023 02:30:55 -0400 Subject: [PATCH 7/9] k/quotas: use "kafka_throughput_control" for tput exemptions connection_context now has all quota related stuff for the connection stored in `_snc_quota_context`. This object is supposed to be created once per connection lifetime by `snc_quota_manager`, but it will be recreated each time a client_id changes on the connection. When the quota context is created (lazily on the connection context), the `kafka_throughput_control` rules are used to select the matching throughput control group. If any group is matched, the context saves it as a flag to exempt the connection from any snc_quota_manager control. This will change into a full association with the control group. Currently the exempt flag simply tells the quota manager to skip any messages in that context. 
--- src/v/kafka/server/connection_context.cc | 50 +++++++++++++-- src/v/kafka/server/connection_context.h | 19 ++---- src/v/kafka/server/fwd.h | 1 + src/v/kafka/server/snc_quota_manager.cc | 78 +++++++++++++++++++++--- src/v/kafka/server/snc_quota_manager.h | 55 ++++++++++++++--- 5 files changed, 166 insertions(+), 37 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 76e779716eb7..8dda174d91d9 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -24,6 +24,7 @@ #include "kafka/server/response.h" #include "kafka/server/server.h" #include "kafka/server/snc_quota_manager.h" +#include "likely.h" #include "net/exceptions.h" #include "security/exceptions.h" #include "units.h" @@ -43,6 +44,29 @@ using namespace std::chrono_literals; namespace kafka { +connection_context::connection_context( + class server& s, + ss::lw_shared_ptr conn, + std::optional sasl, + bool enable_authorizer, + std::optional mtls_state, + config::binding max_request_size, + config::conversion_binding, std::vector> + kafka_throughput_controlled_api_keys) noexcept + : _server(s) + , conn(conn) + , _sasl(std::move(sasl)) + // tests may build a context without a live connection + , _client_addr(conn ? 
conn->addr.addr() : ss::net::inet_address{}) + , _enable_authorizer(enable_authorizer) + , _authlog(_client_addr, client_port()) + , _mtls_state(std::move(mtls_state)) + , _max_request_size(std::move(max_request_size)) + , _kafka_throughput_controlled_api_keys( + std::move(kafka_throughput_controlled_api_keys)) {} + +connection_context::~connection_context() noexcept = default; + ss::future<> connection_context::process() { while (true) { if (is_finished_parsing()) { @@ -215,9 +239,12 @@ connection_context::record_tp_and_calculate_throttle( // Throttle on shard wide quotas snc_quota_manager::delays_t shard_delays; if (_kafka_throughput_controlled_api_keys().at(hdr.key)) { - _server.snc_quota_mgr().record_request_receive(request_size, now); + _server.snc_quota_mgr().get_or_create_quota_context( + _snc_quota_context, hdr.client_id); + _server.snc_quota_mgr().record_request_receive( + *_snc_quota_context, request_size, now); shard_delays = _server.snc_quota_mgr().get_shard_delays( - _throttled_until, now); + *_snc_quota_context, now); } // Sum up @@ -335,7 +362,18 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { return ss::make_ready_future<>(); } if (_kafka_throughput_controlled_api_keys().at(hdr.key)) { - _server.snc_quota_mgr().record_request_intake(size); + // Normally we can only get here after a prior call to + // snc_quota_mgr().get_or_create_quota_context() in + // record_tp_and_calculate_throttle(), but there is possibility + // that the changing configuration could still take us into this + // branch with unmatching (and even null) _snc_quota_context. 
+ // Simply an unmatching _snc_quota_context is no big deal because + // it is a one off event, but we need protection from it being + // nullptr + if (likely(_snc_quota_context)) { + _server.snc_quota_mgr().record_request_intake( + *_snc_quota_context, size); + } } auto sres = ss::make_lw_shared(std::move(sres_in)); @@ -526,7 +564,11 @@ ss::future<> connection_context::maybe_process_responses() { auto response_size = msg.size(); auto request_key = resp_and_res.resources->request_data.request_key; if (_kafka_throughput_controlled_api_keys().at(request_key)) { - _server.snc_quota_mgr().record_response(response_size); + // see the comment in dispatch_method_once() + if (likely(_snc_quota_context)) { + _server.snc_quota_mgr().record_response( + *_snc_quota_context, response_size); + } } _server.handler_probe(request_key).add_bytes_sent(response_size); try { diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 0d55c9778761..c7d3fd508e22 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -117,20 +117,9 @@ class connection_context final std::optional mtls_state, config::binding max_request_size, config::conversion_binding, std::vector> - kafka_throughput_controlled_api_keys) noexcept - : _server(s) - , conn(conn) - , _sasl(std::move(sasl)) - // tests may build a context without a live connection - , _client_addr(conn ? 
conn->addr.addr() : ss::net::inet_address{}) - , _enable_authorizer(enable_authorizer) - , _authlog(_client_addr, client_port()) - , _mtls_state(std::move(mtls_state)) - , _max_request_size(std::move(max_request_size)) - , _kafka_throughput_controlled_api_keys( - std::move(kafka_throughput_controlled_api_keys)) {} - - ~connection_context() noexcept = default; + kafka_throughput_controlled_api_keys) noexcept; + ~connection_context() noexcept; + connection_context(const connection_context&) = delete; connection_context(connection_context&&) = delete; connection_context& operator=(const connection_context&) = delete; @@ -352,9 +341,9 @@ class connection_context final ctx_log _authlog; std::optional _mtls_state; config::binding _max_request_size; - ss::lowres_clock::time_point _throttled_until; config::conversion_binding, std::vector> _kafka_throughput_controlled_api_keys; + std::unique_ptr _snc_quota_context; }; } // namespace kafka diff --git a/src/v/kafka/server/fwd.h b/src/v/kafka/server/fwd.h index f5eff27ceb81..a3a06ce0a605 100644 --- a/src/v/kafka/server/fwd.h +++ b/src/v/kafka/server/fwd.h @@ -25,5 +25,6 @@ class request_context; class rm_group_frontend; class rm_group_proxy_impl; class usage_manager; +class snc_quota_context; } // namespace kafka diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index 6a61c1a55638..1280a7f9aa18 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -19,6 +19,8 @@ #include #include +#include +#include #include using namespace std::chrono_literals; @@ -248,14 +250,61 @@ snc_quota_manager::calc_node_quota_default() const { return default_quota; } +void snc_quota_manager::get_or_create_quota_context( + std::unique_ptr& ctx, + std::optional client_id) { + if (likely(ctx)) { + // NB: comparing string_view to sstring might be suboptimal + if (likely(ctx->_client_id == client_id)) { + // the context is the right one + return; + } + + // either 
of the context indexing properties have changed on the client + // within the same connection. This is an unexpected path, quotas may + // misbehave if we ever get here. The design is based on assumption that + // this should not happen. If it does happen with a supported client, we + // probably should start supporting multiple quota contexts per + // connection + vlog( + klog.warn, + "qm - client_id has changed on the connection. Quotas are reset now. " + "Old client_id: {}, new client_id: {}", + ctx->_client_id, + client_id); + } + + ctx = std::make_unique(client_id); + const auto tcgroup_it = config::find_throughput_control_group( + _kafka_throughput_control().cbegin(), + _kafka_throughput_control().cend(), + client_id); + if (tcgroup_it == _kafka_throughput_control().cend()) { + ctx->_exempt = false; + vlog(klog.debug, "qm - No throughput control group assigned"); + } else { + ctx->_exempt = true; + if (tcgroup_it->is_noname()) { + vlog( + klog.debug, + "qm - Assigned throughput control group #{}", + std::distance(_kafka_throughput_control().cbegin(), tcgroup_it)); + } else { + vlog( + klog.debug, + "qm - Assigned throughput control group: {}", + tcgroup_it->name); + } + } +} + snc_quota_manager::delays_t snc_quota_manager::get_shard_delays( - clock::time_point& connection_throttle_until, - const clock::time_point now) const { + snc_quota_context& ctx, const clock::time_point now) const { delays_t res; // force throttle whatever the client did not do on its side - if (now < connection_throttle_until) { - res.enforce = connection_throttle_until - now; + if (now < ctx._throttled_until) { + res.enforce = ctx._throttled_until - now; } // throttling delay the connection should be requested to throttle @@ -263,23 +312,36 @@ snc_quota_manager::delays_t snc_quota_manager::get_shard_delays( res.request = std::min( _max_kafka_throttle_delay(), std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg))); - connection_throttle_until = now + res.request; +
ctx._throttled_until = now + res.request; return res; } void snc_quota_manager::record_request_receive( - const size_t request_size, const clock::time_point now) noexcept { + snc_quota_context& ctx, + const size_t request_size, + const clock::time_point now) noexcept { + if (ctx._exempt) { + return; + } _shard_quota.in.use(request_size, now); } void snc_quota_manager::record_request_intake( - const size_t request_size) noexcept { + snc_quota_context& ctx, const size_t request_size) noexcept { + if (ctx._exempt) { + return; + } _probe.rec_traffic_in(request_size); } void snc_quota_manager::record_response( - const size_t request_size, const clock::time_point now) noexcept { + snc_quota_context& ctx, + const size_t request_size, + const clock::time_point now) noexcept { + if (ctx._exempt) { + return; + } _shard_quota.eg.use(request_size, now); } diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h index ac2f9bb08e65..1f55478b4afc 100644 --- a/src/v/kafka/server/snc_quota_manager.h +++ b/src/v/kafka/server/snc_quota_manager.h @@ -19,10 +19,12 @@ #include #include #include +#include #include #include #include +#include namespace kafka { @@ -58,6 +60,29 @@ class snc_quotas_probe { size_t _traffic_in = 0; }; +class snc_quota_context { +public: + explicit snc_quota_context(std::optional client_id) + : _client_id(client_id) {} + +private: + friend class snc_quota_manager; + + // Indexing + std::optional _client_id; + + // Configuration + + /// Whether the connection belongs to an exempt tput control group + bool _exempt{false}; + + // Operating + + /// What time the client on this connection should throttle (be throttled) + /// until + ss::lowres_clock::time_point _throttled_until; +}; + /// Isolates \ref quota_manager functionality related to /// shard/node/cluster (SNC) wide quotas and limits class snc_quota_manager @@ -83,28 +108,38 @@ class snc_quota_manager clock::duration request{0}; }; + /// Depending on the other arguments,
create or actualize or keep the + /// existing \p ctx. The context object is supposed to be stored + /// in the connection context, and created only once per connection + /// lifetime. However since the kafka API allows changing client_id of a + /// connection on the fly, we may need to replace the existing context with + /// a new one if that happens (actualize). + /// \post (bool)ctx == true + void get_or_create_quota_context( + std::unique_ptr& ctx, + std::optional client_id); + /// Determine throttling required by shard level TP quotas. - /// @param connection_throttle_until (in,out) until what time the client - /// on this conection should throttle until. If it does not, this throttling - /// will be enforced on the next call. In: value from the last call, out: - /// value saved until the next call. - delays_t get_shard_delays( - clock::time_point& connection_throttle_until, - clock::time_point now) const; + delays_t get_shard_delays(snc_quota_context&, clock::time_point now) const; /// Record the request size when it has arrived from the transport. /// This should be done before calling \ref get_shard_delays because the /// recorded request size is used to calculate throttling parameters. void record_request_receive( - size_t request_size, clock::time_point now = clock::now()) noexcept; + snc_quota_context&, + size_t request_size, + clock::time_point now = clock::now()) noexcept; /// Record the request size when the request data is about to be consumed. /// This data is used to represent throttled throughput. 
- void record_request_intake(size_t request_size) noexcept; + void + record_request_intake(snc_quota_context&, size_t request_size) noexcept; /// Record the response size for all purposes void record_response( - size_t request_size, clock::time_point now = clock::now()) noexcept; + snc_quota_context&, + size_t request_size, + clock::time_point now = clock::now()) noexcept; /// Metrics probe object const snc_quotas_probe& get_snc_quotas_probe() const noexcept { From edd1f11ebd671b97b4d36f3c35e159577433a3b5 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Thu, 18 May 2023 18:47:30 -0400 Subject: [PATCH 8/9] tests: enhance error message in `test_valid_settings` --- tests/rptest/tests/cluster_config_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rptest/tests/cluster_config_test.py b/tests/rptest/tests/cluster_config_test.py index e505e7105c2b..20148fd7de08 100644 --- a/tests/rptest/tests/cluster_config_test.py +++ b/tests/rptest/tests/cluster_config_test.py @@ -589,7 +589,7 @@ def test_valid_settings(self): elif p['type'] == "array" and p['items']['type'] == 'string': valid_value = ["custard", "cream"] else: - raise NotImplementedError(p['type']) + raise NotImplementedError(f"{p['type']} in {name}") if name == 'sasl_mechanisms': # The default value is ['SCRAM'], but the array cannot contain From 529cbf39c14e32fca87352063e951f8f39cc30b5 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Fri, 26 May 2023 02:53:23 -0400 Subject: [PATCH 9/9] k/quotas: add kafka_throughput_control to test_configurations Set some possible values for `kafka_throughput_control` in the configurations test --- .../tests/throughput_limits_snc_test.py | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/tests/rptest/tests/throughput_limits_snc_test.py b/tests/rptest/tests/throughput_limits_snc_test.py index bc93161f076e..83dfce7673de 100644 --- a/tests/rptest/tests/throughput_limits_snc_test.py +++ 
b/tests/rptest/tests/throughput_limits_snc_test.py @@ -7,9 +7,7 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -import math -import random -import time +import random, time, math, json, string from enum import Enum from typing import Tuple @@ -65,6 +63,7 @@ class ConfigProp(Enum): QUOTA_SHARD_MIN_RATIO = "kafka_quota_balancer_min_shard_throughput_ratio" QUOTA_SHARD_MIN_BPS = "kafka_quota_balancer_min_shard_throughput_bps" CONTROLLED_API_KEYS = "kafka_throughput_controlled_api_keys" + THROUGHPUT_CONTROL = "kafka_throughput_control" api_names = [ "add_offsets_to_txn", "add_partitions_to_txn", "alter_configs", @@ -143,13 +142,31 @@ def get_config_parameter_random_value(self, prop: ConfigProp): keys.add(self.api_names[self.rnd.randrange(keys_num)]) return list(keys) + if prop == self.ConfigProp.THROUGHPUT_CONTROL: + throughput_control = [] + letters = string.digits + string.ascii_letters + ' ' + for _ in range(self.rnd.randrange(4)): + tc_item = {} + r = self.rnd.randrange(3) + if r != 0: + tc_item['name'] = ''.join( + self.rnd.choice(letters) + for _ in range(self.binexp_random(0, 512))) + r = self.rnd.randrange(3) + if r == 0: + tc_item['client_id'] = 'client_id 1' + elif r == 2: + tc_item['client_id'] = 'client_\d+' + throughput_control.append(tc_item) + return throughput_control + raise Exception(f"Unsupported ConfigProp: {prop}") def current_effective_node_quota(self) -> Tuple[int, int]: metrics = self.redpanda.metrics_sample( "quotas_quota_effective", metrics_endpoint=MetricsEndpoint.METRICS) - assert metrics, "Effecive quota metric is missing" + assert metrics, "Effective quota metric is missing" self.logger.debug(f"Samples: {metrics.samples}") node_quota_in = sum(