From f75baff9228a13ada531f6d0598567ecf6bf935b Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Wed, 21 Jun 2023 11:46:51 -0400 Subject: [PATCH 01/12] tests/clients: add RpkTool.acl_create_allow_topic Add a function to RpkTool to create an allow topic ACL for a user. A reusable part of acl_create_allow_cluster refactored. --- tests/rptest/clients/rpk.py | 47 ++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index 19937826703a..b993eb558e36 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -14,7 +14,7 @@ import time import itertools from collections import namedtuple -from typing import Optional +from typing import List, Optional from ducktape.cluster.cluster import ClusterNode from rptest.util import wait_until_result from rptest.services import tls @@ -993,20 +993,9 @@ def acl_list(self): return output - def acl_create_allow_cluster(self, username, op): - """ - Add allow+describe+cluster ACL - """ - cmd = [ - self._rpk_binary(), - "acl", - "create", - "--allow-principal", - f"User:{username}", - "--operation", - op, - "--cluster", - ] + self._kafka_conn_settings() + def _acl_create(self, params: List[str]) -> str: + cmd = [self._rpk_binary(), "acl", "create" + ] + params + self._kafka_conn_settings() output = self._execute(cmd) table = parse_rpk_table(output) expected_columns = [ @@ -1024,10 +1013,36 @@ def acl_create_allow_cluster(self, username, op): if table.rows[0][-1] != "": raise RpkException( - f"acl_create_allow_cluster failed with {table.rows[0][-1]}") + f"_acl_create({params}) failed with {table.rows[0][-1]}") return output + def acl_create_allow_cluster(self, username: str, op: str) -> str: + """ + Add allow+cluster ACL + """ + return self._acl_create([ + "--allow-principal", + f"User:{username}", + "--operation", + op, + "--cluster", + ]) + + def acl_create_allow_topic(self, username: str, topic: str, + op: str) -> 
str: + """ + Add an allow+topic ACL + """ + return self._acl_create([ + "--allow-principal", + f"User:{username}", + "--operation", + op, + "--topic", + topic, + ]) + def cluster_metadata_id(self): """ Calls 'cluster metadata' and returns the cluster ID, if set, From 36566e9cf7af4e7c05ae7fa503ad3ab67546c4ac Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Wed, 21 Jun 2023 11:55:16 -0400 Subject: [PATCH 02/12] tests/services: authentication in RpkProducer and more Support --username/--password authentication options Add a function to check if the producer is running Log service name with progress messages so that in case of several producers running at the same time, it is possible to understand who is doing what --- tests/rptest/services/rpk_producer.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/rptest/services/rpk_producer.py b/tests/rptest/services/rpk_producer.py index bad9861550ed..7b51525e7d93 100644 --- a/tests/rptest/services/rpk_producer.py +++ b/tests/rptest/services/rpk_producer.py @@ -22,7 +22,9 @@ def __init__(self, produce_timeout: Optional[int] = None, *, partition: Optional[int] = None, - max_message_bytes: Optional[int] = None): + max_message_bytes: Optional[int] = None, + user: Optional[str] = None, + password: Optional[str] = None): super(RpkProducer, self).__init__(context, num_nodes=1) self._redpanda = redpanda self._topic = topic @@ -35,6 +37,8 @@ def __init__(self, self._output_line_count = 0 self._partition = partition self._max_message_bytes = max_message_bytes + self._user = user + self._password = password if produce_timeout is None: produce_timeout = 10 @@ -68,11 +72,17 @@ def _worker(self, _idx, node): if self._max_message_bytes is not None: cmd += f" --max-message-bytes {self._max_message_bytes}" + if self._user is not None: + cmd += f" --user '{self._user}'" + + if self._password is not None: + cmd += f" --password '{self._password}'" + self._stopping.clear() try: for line in 
node.account.ssh_capture( cmd, timeout_sec=self._produce_timeout): - self.logger.debug(line.rstrip()) + self.logger.debug(f"{self.service_id}: {line.rstrip()}") self._output_line_count += 1 except RemoteCommandError: if self._stopping.is_set(): @@ -90,3 +100,7 @@ def output_line_count(self): def stop_node(self, node): self._stopping.set() node.account.kill_process("rpk", clean_shutdown=False) + + def is_running(self) -> bool: + return any(worker.is_alive() + for worker in self.worker_threads.values()) From 5681b88709bb9acbe82a29f649eebf04f6728c3c Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 19 Jun 2023 15:10:54 -0400 Subject: [PATCH 03/12] k/quotas: little things typos, renames, comments --- .../tests/throughput_control_group_test.cc | 4 ++-- src/v/config/throughput_control_group.cc | 19 ++++++++++++------- src/v/kafka/server/connection_context.cc | 4 ++-- src/v/kafka/server/snc_quota_manager.cc | 12 +++++++----- src/v/kafka/server/snc_quota_manager.h | 4 ++-- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/v/config/tests/throughput_control_group_test.cc b/src/v/config/tests/throughput_control_group_test.cc index 729ef71eb771..0f62bda82494 100644 --- a/src/v/config/tests/throughput_control_group_test.cc +++ b/src/v/config/tests/throughput_control_group_test.cc @@ -44,7 +44,7 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { - name: "" client_id: client_id-1 - client_id: cli.+_id-\d+ - - client_id: another unnamed group indended to verify this passes validation + - client_id: another unnamed group intended to verify this passes validation - name: match-nothing-group client_id: "" - name: unspecified client_id @@ -113,7 +113,7 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { !cfg.read_yaml(YAML::Load(R"(cgroups: [{name: n, client_id: "*"}])"s)) .empty()); - // Specify any throupghput limit + // Specify any throughput limit BOOST_TEST( !cfg .read_yaml(YAML::Load( diff --git 
a/src/v/config/throughput_control_group.cc b/src/v/config/throughput_control_group.cc index f348bf9642b3..0993f7750556 100644 --- a/src/v/config/throughput_control_group.cc +++ b/src/v/config/throughput_control_group.cc @@ -68,6 +68,7 @@ struct copyable_RE2 : re2::RE2 { copyable_RE2& operator=(copyable_RE2&&) noexcept = delete; explicit copyable_RE2(const std::string& s) : RE2(s) {} + friend std::ostream& operator<<(std::ostream& os, const copyable_RE2& re) { fmt::print(os, "{}", re.pattern()); return os; @@ -77,9 +78,11 @@ struct copyable_RE2 : re2::RE2 { struct client_id_matcher_type { // nullopt stands for the empty client_id value std::optional v; + client_id_matcher_type() = default; explicit client_id_matcher_type(const copyable_RE2& d) : v(d) {} + friend std::ostream& operator<<( std::ostream& os, const std::unique_ptr& mt) { if (mt) { @@ -119,8 +122,8 @@ std::ostream& operator<<(std::ostream& os, const throughput_control_group& tcg) { fmt::print( os, - "{{group_name: {}, client_id: {}, throughput_limit_node_in_bps: {}, " - "throughput_limit_node_out_bps: {}}}", + "{{group_name: {}, client_id_matcher: {}, " + "throughput_limit_node_in_bps: {}, throughput_limit_node_out_bps: {}}}", tcg.is_noname() ? 
""s : fmt::format("{{{}}}", tcg.name), tcg.client_id_matcher, tcg.throughput_limit_node_in_bps, @@ -129,7 +132,7 @@ operator<<(std::ostream& os, const throughput_control_group& tcg) { } bool throughput_control_group::match_client_id( - const std::optional client_id) const { + const std::optional client_id_to_match) const { if (!client_id_matcher) { // omitted match criterion means "always match" return true; @@ -137,13 +140,13 @@ bool throughput_control_group::match_client_id( if (!client_id_matcher->v) { // empty client_id match // only missing client_id matches the empty - return !client_id; + return !client_id_to_match; } // regex match // missing client_id never matches a re - return client_id + return client_id_to_match && re2::RE2::FullMatch( - re2::StringPiece(*client_id), *client_id_matcher->v); + re2::StringPiece(*client_id_to_match), *client_id_matcher->v); } bool throughput_control_group::is_noname() const noexcept { @@ -203,6 +206,7 @@ bool convert::decode( using namespace config; throughput_control_group res; + // name if (const auto& n = node[ids::name]; n) { res.name = n.as(); if (contains_control_characters(res.name)) { @@ -212,6 +216,7 @@ bool convert::decode( res.name = noname; } + // client_id if (const auto& n = node[ids::client_id]; n) { const auto s = n.as(); if (contains_control_characters(s)) { @@ -241,13 +246,13 @@ bool convert::decode( res.client_id_matcher.reset(); } + // tp_limit_node_in/out if (const auto& n = node[ids::tp_limit_node_in]; n) { // only the no-limit option is supported yet return false; } else { res.throughput_limit_node_in_bps = std::nullopt; } - if (const auto& n = node[ids::tp_limit_node_out]; n) { // only the no-limit option is supported yet return false; diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 8dda174d91d9..884bacf932d6 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -366,8 +366,8 @@ 
connection_context::dispatch_method_once(request_header hdr, size_t size) { // snc_quota_mgr().get_or_create_quota_context() in // record_tp_and_calculate_throttle(), but there is possibility // that the changing configuration could still take us into this - // branch with unmatching (and even null) _snc_quota_context. - // Simply an unmatching _snc_quota_context is no big deal because + // branch with nonmatching (and even null) _snc_quota_context. + // Simply a nonmatching _snc_quota_context is no big deal because // it is a one off event, but we need protection from it being // nullptr if (likely(_snc_quota_context)) { diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index 1280a7f9aa18..d634f95efe45 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -254,13 +254,15 @@ void snc_quota_manager::get_or_create_quota_context( std::unique_ptr& ctx, std::optional client_id) { if (likely(ctx)) { - // NB: comparing string_view to sstring might be suboptimal + // note: comparing sstring (the lefthand _client_id) to string_view + // (the righthand client_id) is only possible by converting the former + // to string_view, or with the sstring::operator<=>, => no perf penalty if (likely(ctx->_client_id == client_id)) { // the context is the right one return; } - // either of the context indexing propeties have changed on the client + // either of the context indexing properties have changed on the client // within the same connection. This is an unexpected path, quotas may // misbehave if we ever get here. The design is based on assumption that // this should not happen. If it does happen with a supported client, we @@ -525,7 +527,7 @@ namespace detail { /// Split \p value between the elements of vector \p target in full, adding them /// to the elements already in the vector. /// If \p value is not a multiple of target.size(), the entire \p value -/// is still dispensed in full. 
Quotinents rounded towards infinity +/// is still dispensed in full. Quotients rounded towards infinity /// are added to the front elements of the \p target, and those rounded /// towards zero are added to the back elements. void dispense_equally(std::vector& target, const quota_t value) { @@ -636,7 +638,7 @@ void dispense_negative_deltas( if (unlikely(quotas_left == 0)) { vlog( klog.error, - "qb - No shards to distribute the remianing delta: {}", + "qb - No shards to distribute the remaining delta: {}", delta); return; } @@ -753,7 +755,7 @@ ss::future<> snc_quota_manager::quota_balancer_update( dispense_negative_deltas( schedule.eg, deltas.eg, std::move(quotas_soa.eg)); } - // postive deltas are disensed equally + // positive deltas are dispensed equally if (deltas.in > 0) { dispense_equally(schedule.in, deltas.in); } diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h index 1f55478b4afc..b93716b9fedd 100644 --- a/src/v/kafka/server/snc_quota_manager.h +++ b/src/v/kafka/server/snc_quota_manager.h @@ -78,7 +78,7 @@ class snc_quota_context { // Operating - /// What time the client on this conection should throttle (be throttled) + /// What time the client on this connection should throttle (be throttled) /// until ss::lowres_clock::time_point _throttled_until; }; @@ -171,7 +171,7 @@ class snc_quota_manager ss::future<> quota_balancer_step(); /// A step of balancer that applies any updates from configuration changes. - /// Spawned by configration bindings watching changes of the properties. + /// Spawned by configuration bindings watching changes of the properties. /// Runs on the balancer shard only. 
ss::future<> quota_balancer_update( ingress_egress_state> old_node_quota_default, From b70b7a6b5e68de38259040480f310733b6b91f96 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 19 Jun 2023 15:22:23 -0400 Subject: [PATCH 04/12] k/quotas: improve logging when throughput group is matched, log the client address --- src/v/kafka/server/connection_context.cc | 4 +-- src/v/kafka/server/snc_quota_manager.cc | 40 ++++++++++++++++++------ src/v/kafka/server/snc_quota_manager.h | 5 ++- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 884bacf932d6..d3d064f88f57 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -240,7 +240,7 @@ connection_context::record_tp_and_calculate_throttle( snc_quota_manager::delays_t shard_delays; if (_kafka_throughput_controlled_api_keys().at(hdr.key)) { _server.snc_quota_mgr().get_or_create_quota_context( - _snc_quota_context, hdr.client_id); + _snc_quota_context, hdr.client_id, _client_addr, client_port()); _server.snc_quota_mgr().record_request_receive( *_snc_quota_context, request_size, now); shard_delays = _server.snc_quota_mgr().get_shard_delays( @@ -259,7 +259,7 @@ connection_context::record_tp_and_calculate_throttle( || delay_request != clock::duration::zero()) { vlog( klog.trace, - "[{}:{}] throttle request:{{snc:{}, client:{}}}, " + "{}:{} throttle request:{{snc:{}, client:{}}}, " "enforce:{{snc:{}, client:{}}}, key:{}, request_size:{}", _client_addr, client_port(), diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index d634f95efe45..35222fea061f 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -252,7 +252,9 @@ snc_quota_manager::calc_node_quota_default() const { void snc_quota_manager::get_or_create_quota_context( std::unique_ptr& ctx, - std::optional client_id) { + std::optional 
client_id, + const ss::net::inet_address& client_addr, + const uint16_t client_port) { if (likely(ctx)) { // note: comparing sstring (the lefthand _client_id) to string_view // (the righthand client_id) is only possible by converting the former @@ -268,33 +270,51 @@ void snc_quota_manager::get_or_create_quota_context( // this should not happen. If it does happen with a supported client, we // probably should start supporting multiple quota contexts per // connection - vlog( - klog.warn, - "qm - client_id has changed on the connection. Quotas are reset now. " - "Old client_id: {}, new client_id: {}", - ctx->_client_id, - client_id); + if (ctx->_client_id != client_id) { + vlog( + klog.warn, + "{}:{} - qm - client_id has changed on the connection. " + "Quotas are reset now. Old client_id: {}, new client_id: {}", + client_addr, + client_port, + ctx->_client_id, + client_id); + } } ctx = std::make_unique(client_id); + vlog( + klog.trace, + "{}:{} - qm - Matching client_id: {}", + client_addr, + client_port, + ctx->_client_id); const auto tcgroup_it = config::find_throughput_control_group( _kafka_throughput_control().cbegin(), _kafka_throughput_control().cend(), client_id); if (tcgroup_it == _kafka_throughput_control().cend()) { ctx->_exempt = false; - vlog(klog.debug, "qm - No throughput control group assigned"); + vlog( + klog.debug, + "{}:{} - qm - No throughput control group assigned", + client_addr, + client_port); } else { ctx->_exempt = true; if (tcgroup_it->is_noname()) { vlog( klog.debug, - "qm - Assigned throughput control group #{}", + "{}:{} - qm - Assigned throughput control group #{}", + client_addr, + client_port, std::distance(_kafka_throughput_control().cbegin(), tcgroup_it)); } else { vlog( klog.debug, - "qm - Assigned throughput control group: {}", + "{}:{} - qm - Assigned throughput control group: '{}'", + client_addr, + client_port, tcgroup_it->name); } } diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h 
index b93716b9fedd..37ae468b13e8 100644 --- a/src/v/kafka/server/snc_quota_manager.h +++ b/src/v/kafka/server/snc_quota_manager.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -117,7 +118,9 @@ class snc_quota_manager /// \post (bool)ctx == true void get_or_create_quota_context( std::unique_ptr& ctx, - std::optional client_id); + std::optional client_id, + const ss::net::inet_address& client_addr, + uint16_t client_port); /// Determine throttling required by shard level TP quotas. delays_t get_shard_delays(snc_quota_context&, clock::time_point now) const; From 2ed51246b07bb833550dbe5f799b06776869d156 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Wed, 21 Jun 2023 14:01:05 -0400 Subject: [PATCH 05/12] k/quotas: fixed equality operator for tput_ctrl_group +UT Because of the unique_ptr involved, the default equality operator does not work correctly here. --- .../tests/throughput_control_group_test.cc | 24 +++++++++++++++++-- src/v/config/throughput_control_group.cc | 16 +++++++++++++ src/v/config/throughput_control_group.h | 5 ++-- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/src/v/config/tests/throughput_control_group_test.cc b/src/v/config/tests/throughput_control_group_test.cc index 0f62bda82494..ee1787dfb701 100644 --- a/src/v/config/tests/throughput_control_group_test.cc +++ b/src/v/config/tests/throughput_control_group_test.cc @@ -36,7 +36,7 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { struct test_config : public config::config_store { config::property> cgroups; test_config() - : cgroups(*this, "cgroups", "") {} + : cgroups(*this, "cgroups", "", {.needs_restart = config::needs_restart::no}) {} }; auto cfg_node = YAML::Load(R"( @@ -58,7 +58,7 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { BOOST_TEST( YAML::Dump(config::to_yaml(cfg, config::redact_secrets{false})) == YAML::Dump(cfg_node)); - BOOST_TEST(cfg.cgroups().size() == 6); + BOOST_REQUIRE(cfg.cgroups().size() == 6); for
(auto& cg : cfg.cgroups()) { BOOST_TEST(!cg.throughput_limit_node_in_bps); BOOST_TEST(!cg.throughput_limit_node_out_bps); @@ -72,6 +72,15 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { BOOST_TEST(!validate_throughput_control_groups( cfg.cgroups().cbegin(), cfg.cgroups().cend())); + // Equality + for (size_t k = 0; k != cfg.cgroups().size(); ++k) { + for (size_t l = 0; l != cfg.cgroups().size(); ++l) { + BOOST_TEST( + (cfg.cgroups()[k] == cfg.cgroups()[l]) == (k == l), + "k=" << k << " l=" << l); + } + } + // Matches const auto get_match_index = [&cfg]( @@ -89,6 +98,17 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { BOOST_TEST(get_match_index(std::nullopt) == 4); BOOST_TEST(get_match_index("nonclient_id") == 5); + // Copying + config::throughput_control_group p4 = cfg.cgroups()[0]; + BOOST_TEST(p4 == cfg.cgroups()[0]); + BOOST_TEST(fmt::format("{}", p4) == fmt::format("{}", cfg.cgroups()[0])); + + // Binding a property of + auto binding = cfg.cgroups.bind(); + BOOST_TEST(binding() == cfg.cgroups()); + BOOST_TEST( + fmt::format("{}", binding()) == fmt::format("{}", cfg.cgroups())); + // Failure cases // Control characters in names. 
In YAML, control characters [are not diff --git a/src/v/config/throughput_control_group.cc b/src/v/config/throughput_control_group.cc index 0993f7750556..37326b01426b 100644 --- a/src/v/config/throughput_control_group.cc +++ b/src/v/config/throughput_control_group.cc @@ -69,6 +69,9 @@ struct copyable_RE2 : re2::RE2 { explicit copyable_RE2(const std::string& s) : RE2(s) {} + friend bool operator==(const copyable_RE2& lhs, const copyable_RE2& rhs) { + return lhs.pattern() == rhs.pattern(); + } friend std::ostream& operator<<(std::ostream& os, const copyable_RE2& re) { fmt::print(os, "{}", re.pattern()); return os; @@ -83,6 +86,9 @@ struct client_id_matcher_type { explicit client_id_matcher_type(const copyable_RE2& d) : v(d) {} + friend bool + operator==(const client_id_matcher_type&, const client_id_matcher_type&) + = default; friend std::ostream& operator<<( std::ostream& os, const std::unique_ptr& mt) { if (mt) { @@ -118,6 +124,16 @@ throughput_control_group::operator=(const throughput_control_group& other) { return *this = throughput_control_group(other); } +bool operator==( + const throughput_control_group& lhs, const throughput_control_group& rhs) { + return lhs.name == rhs.name + && (lhs.client_id_matcher == rhs.client_id_matcher + || (lhs.client_id_matcher && rhs.client_id_matcher + && *lhs.client_id_matcher == *rhs.client_id_matcher)) + && lhs.throughput_limit_node_in_bps == rhs.throughput_limit_node_in_bps + && lhs.throughput_limit_node_out_bps == rhs.throughput_limit_node_out_bps; +} + std::ostream& operator<<(std::ostream& os, const throughput_control_group& tcg) { fmt::print( diff --git a/src/v/config/throughput_control_group.h b/src/v/config/throughput_control_group.h index cc9105931746..09435de2b90a 100644 --- a/src/v/config/throughput_control_group.h +++ b/src/v/config/throughput_control_group.h @@ -41,9 +41,8 @@ struct throughput_control_group { throughput_control_group& operator=(throughput_control_group&&) noexcept; ~throughput_control_group() 
noexcept; - friend bool - operator==(const throughput_control_group&, const throughput_control_group&) - = default; + friend bool operator==( + const throughput_control_group&, const throughput_control_group&); friend std::ostream& operator<<(std::ostream& os, const throughput_control_group& tcg); From 56dcd2e838907ee17c3c5cb222d4918ae3f23d5b Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 19 Jun 2023 15:46:32 -0400 Subject: [PATCH 06/12] k/quotas: acl_principal matching in tput ctrl groups +UT `throughput_control_group` gets additional matching criteria by ACL principals (users, etc.). In yaml/json, there can now be a list of principals the group must match besides client_id if provided. Only the principal type `user` is supported in this commit. --- .../tests/throughput_control_group_test.cc | 159 +++++++++++++++--- src/v/config/throughput_control_group.cc | 144 +++++++++++++++- src/v/config/throughput_control_group.h | 15 +- src/v/kafka/server/snc_quota_manager.cc | 3 +- src/v/security/fwd.h | 1 + 5 files changed, 295 insertions(+), 27 deletions(-) diff --git a/src/v/config/tests/throughput_control_group_test.cc b/src/v/config/tests/throughput_control_group_test.cc index ee1787dfb701..a2fe11614fe5 100644 --- a/src/v/config/tests/throughput_control_group_test.cc +++ b/src/v/config/tests/throughput_control_group_test.cc @@ -12,6 +12,7 @@ #include "config/config_store.h" #include "config/property.h" #include "config/throughput_control_group.h" +#include "security/acl.h" #include #include @@ -25,20 +26,44 @@ #include #include #include +#include #include using namespace std::string_literals; -SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { - std::vector tcgv; - tcgv.emplace_back(); +struct test_config : public config::config_store { + config::property> cgroups; + test_config() + : cgroups( + *this, "cgroups", "", {.needs_restart = config::needs_restart::no}) {} - struct test_config : public config::config_store { - config::property> cgroups; - 
test_config() - : cgroups(*this, "cgroups", "", {.needs_restart = config::needs_restart::no}) {} + auto get_match_index(std::optional client_id) const + -> std::optional { + if (const auto i = config::find_throughput_control_group( + cgroups().cbegin(), cgroups().cend(), client_id, nullptr); + i != cgroups().cend()) { + return std::distance(cgroups().cbegin(), i); + } + return std::nullopt; }; + auto get_match_index( + const security::principal_type type, ss::sstring principal_name) const + -> std::optional { + const security::acl_principal p(type, std::move(principal_name)); + if (const auto i = config::find_throughput_control_group( + cgroups().cbegin(), cgroups().cend(), std::nullopt, &p); + i != cgroups().cend()) { + return std::distance(cgroups().cbegin(), i); + } + return std::nullopt; + } +}; + +SEASTAR_THREAD_TEST_CASE(throughput_control_group_by_clientid_test) { + std::vector tcgv; + tcgv.emplace_back(); + auto cfg_node = YAML::Load(R"( cgroups: - name: "" @@ -82,21 +107,11 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { } // Matches - const auto get_match_index = - [&cfg]( - std::optional client_id) -> std::optional { - if (const auto i = config::find_throughput_control_group( - cfg.cgroups().cbegin(), cfg.cgroups().cend(), client_id); - i != cfg.cgroups().cend()) { - return std::distance(cfg.cgroups().cbegin(), i); - } - return std::nullopt; - }; - BOOST_TEST(get_match_index("client_id-1") == 0); - BOOST_TEST(get_match_index("clinet_id-2") == 1); - BOOST_TEST(get_match_index("") == 3); - BOOST_TEST(get_match_index(std::nullopt) == 4); - BOOST_TEST(get_match_index("nonclient_id") == 5); + BOOST_TEST(cfg.get_match_index("client_id-1") == 0); + BOOST_TEST(cfg.get_match_index("clinet_id-2") == 1); + BOOST_TEST(cfg.get_match_index("") == 3); + BOOST_TEST(cfg.get_match_index(std::nullopt) == 4); + BOOST_TEST(cfg.get_match_index("nonclient_id") == 5); // Copying config::throughput_control_group p4 = cfg.cgroups()[0]; @@ -157,3 +172,103 @@ 
SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) { .find("uplicate") != ss::sstring::npos); } + +SEASTAR_THREAD_TEST_CASE(throughput_control_group_by_principal_test) { + std::vector tcgv; + tcgv.emplace_back(); + + auto cfg_node = YAML::Load(R"( +cgroups: + - name: empty principals list in useless as it matches nothing, but still a valid config + principals: + - name: match a specific user + principals: + - user: alpha + - name: match a list of users (principal 'user') + principals: + - user: beta + - user: gamma + - user: delta +# - name: (future) match a group (principal 'group') +# principals: +# - group: greek_letters +# - name: (future) match schema registry (principal 'ephemeral user') +# principals: +# - service: schema registry +# - name: (future) match PP (principal 'ephemeral user') +# principals: +# - service: panda proxy +# - name: (future) match heterogeneous set of principals +# principals: +# - user: servicebot +# - service: schema registry +# - service: panda proxy + - name: match _any_ authenticated user + principals: + - user: "*" + - name: catch-all - no "principal" matches anything +)"); + cfg_node.SetStyle(YAML::EmitterStyle::Flow); + + test_config cfg; + BOOST_TEST(cfg.read_yaml(cfg_node).empty()); + BOOST_TEST( + YAML::Dump(config::to_yaml(cfg, config::redact_secrets{false})) + == YAML::Dump(cfg_node)); + BOOST_REQUIRE(cfg.cgroups().size() == 5); + for (auto& cg : cfg.cgroups()) { + BOOST_TEST(!cg.throughput_limit_node_in_bps); + BOOST_TEST(!cg.throughput_limit_node_out_bps); + BOOST_TEST(cg.validate().empty()); + } + BOOST_TEST(!validate_throughput_control_groups( + cfg.cgroups().cbegin(), cfg.cgroups().cend())); + + // Equality + for (size_t k = 0; k != cfg.cgroups().size(); ++k) { + for (size_t l = 0; l != cfg.cgroups().size(); ++l) { + BOOST_TEST( + (cfg.cgroups()[k] == cfg.cgroups()[l]) == (k == l), + "k=" << k << " l=" << l); + } + } + + // Matches + using pt = security::principal_type; + 
BOOST_TEST(cfg.get_match_index(std::nullopt) == 4); + BOOST_TEST(cfg.get_match_index(pt::user, "alpha") == 1); + BOOST_TEST(cfg.get_match_index(pt::user, "beta") == 2); + BOOST_TEST(cfg.get_match_index(pt::user, "gamma") == 2); + BOOST_TEST(cfg.get_match_index(pt::user, "delta") == 2); + BOOST_TEST(cfg.get_match_index(pt::user, "epsilon") == 3); + BOOST_TEST(cfg.get_match_index(pt::user, "zeta") == 3); + + // Copying + config::throughput_control_group p4 = cfg.cgroups()[1]; + BOOST_TEST(p4 == cfg.cgroups()[1]); + BOOST_TEST(fmt::format("{}", p4) == fmt::format("{}", cfg.cgroups()[1])); + + // Binding a property of + auto binding = cfg.cgroups.bind(); + BOOST_TEST(binding() == cfg.cgroups()); + BOOST_TEST( + fmt::format("{}", binding()) == fmt::format("{}", cfg.cgroups())); + + // Failure cases + + // Invalid keys in principal + BOOST_TEST(!cfg + .read_yaml(YAML::Load( + R"(cgroups: [{principals: [{invalid_key: value}]}])"s)) + .empty()); + + // Control characters + BOOST_TEST(!cfg + .read_yaml(YAML::Load( + "cgroups: [{principals: [{\auser: tarantoga}]}]"s)) + .empty()); + BOOST_TEST(!cfg + .read_yaml(YAML::Load( + "cgroups: [{principals: [{user: tarantoga\001}]}]"s)) + .empty()); +} diff --git a/src/v/config/throughput_control_group.cc b/src/v/config/throughput_control_group.cc index 37326b01426b..1230f730645a 100644 --- a/src/v/config/throughput_control_group.cc +++ b/src/v/config/throughput_control_group.cc @@ -12,6 +12,7 @@ #include "throughput_control_group.h" #include "config/convert.h" +#include "security/acl.h" #include "ssx/sformat.h" #include "utils/functional.h" #include "utils/to_string.h" @@ -22,6 +23,7 @@ #include #include #include +#include #include #include @@ -29,6 +31,7 @@ #include #include #include +#include #include using namespace std::string_literals; @@ -53,6 +56,8 @@ constexpr const char* name = "name"; constexpr const char* client_id = "client_id"; constexpr const char* tp_limit_node_in = "throughput_limit_node_in_bps"; constexpr const 
char* tp_limit_node_out = "throughput_limit_node_out_bps"; +constexpr const char* principals = "principals"; +constexpr const char* user = "user"; } // namespace ids constexpr char selector_prefix = '+'; @@ -116,6 +121,7 @@ throughput_control_group::throughput_control_group( other.client_id_matcher ? std::make_unique(*other.client_id_matcher) : std::unique_ptr{}) + , acl_principals(other.acl_principals) , throughput_limit_node_in_bps(other.throughput_limit_node_in_bps) , throughput_limit_node_out_bps(other.throughput_limit_node_out_bps) {} @@ -130,6 +136,7 @@ bool operator==( && (lhs.client_id_matcher == rhs.client_id_matcher || (lhs.client_id_matcher && rhs.client_id_matcher && *lhs.client_id_matcher == *rhs.client_id_matcher)) + && lhs.acl_principals == rhs.acl_principals && lhs.throughput_limit_node_in_bps == rhs.throughput_limit_node_in_bps && lhs.throughput_limit_node_out_bps == rhs.throughput_limit_node_out_bps; } @@ -138,10 +145,11 @@ std::ostream& operator<<(std::ostream& os, const throughput_control_group& tcg) { fmt::print( os, - "{{group_name: {}, client_id_matcher: {}, " + "{{group_name: {}, client_id_matcher: {}, acl_principals: {}, " "throughput_limit_node_in_bps: {}, throughput_limit_node_out_bps: {}}}", tcg.is_noname() ? 
""s : fmt::format("{{{}}}", tcg.name), tcg.client_id_matcher, + tcg.acl_principals, tcg.throughput_limit_node_in_bps, tcg.throughput_limit_node_out_bps); return os; @@ -165,6 +173,25 @@ bool throughput_control_group::match_client_id( re2::StringPiece(*client_id_to_match), *client_id_matcher->v); } +bool throughput_control_group::match_acl_principal( + const security::acl_principal* const principal_to_match) const { + if (!acl_principals) { + // omitted match criterion means "always match" + return true; + } + if (!principal_to_match) { + // no other way to match unauthenticated case + return false; + } + return std::any_of( + acl_principals->begin(), + acl_principals->end(), + [principal_to_match](const security::acl_principal& p) { + return (p == *principal_to_match) + || (p.wildcard() && p.type() == principal_to_match->type()); + }); +} + bool throughput_control_group::is_noname() const noexcept { return name == noname; } @@ -194,6 +221,59 @@ ss::sstring throughput_control_group::validate() const { namespace YAML { +template<> +struct convert { + using type = security::acl_principal; + static Node encode(const type& rhs); + static bool decode(const Node& node, type& rhs); +}; + +Node convert::encode(const type& principal) { + using namespace config; + Node node; + + switch (principal.type()) { + case security::principal_type::user: { + node[ids::user] = principal.name(); + break; + } + case security::principal_type::ephemeral_user: { + // not supported yet, invalid config produced if it appears + break; + } + } + + return node; +} + +bool convert::decode( + const Node& node, type& principal) { + using namespace config; + + // always a single element map + if (node.IsNull() || !node.IsMap()) { + return false; + } + if (node.size() != 1) { + return false; + } + const detail::iterator_value& el = *node.begin(); + const auto el_name = el.first.as(); + auto el_value = el.second.as(); + if ( + contains_control_characters(el_name) + || 
contains_control_characters(el_value)) { + return false; + } + + if (el_name == ids::user) { + principal = security::acl_principal( + security::principal_type::user, std::move(el_value)); + return true; + } + return false; +} + Node convert::encode(const type& tcg) { using namespace config; Node node; @@ -214,6 +294,14 @@ Node convert::encode(const type& tcg) { } } + if (tcg.acl_principals) { + YAML::Node principals_node = node[ids::principals]; + for (const security::acl_principal& p : *tcg.acl_principals) { + principals_node.push_back( + convert::encode(p)); + } + } + return node; } @@ -262,6 +350,26 @@ bool convert::decode( res.client_id_matcher.reset(); } + // principals + if (const auto& n = node[ids::principals]; n) { + std::vector res_principals; + if (!n.IsNull()) { + if (!n.IsSequence()) { + return false; + } + res_principals.reserve(n.size()); + for (const_iterator i = n.begin(); i != n.end(); ++i) { + if (!convert::decode( + *i, res_principals.emplace_back())) { + return false; + } + } + } + res.acl_principals = std::move(res_principals); + } else { + res.acl_principals = std::nullopt; + } + // tp_limit_node_in/out if (const auto& n = node[ids::tp_limit_node_in]; n) { // only the no-limit option is supported yet @@ -284,6 +392,27 @@ bool convert::decode( namespace json { +void rjson_serialize( + json::Writer& w, + const security::acl_principal& principal) { + using namespace config; + w.StartObject(); + + switch (principal.type()) { + case security::principal_type::user: { + w.Key(ids::user); + w.String(principal.name()); + break; + } + case security::principal_type::ephemeral_user: { + // not supported yet, invalid config produced if it appears + break; + } + } + + w.EndObject(); +} + void rjson_serialize( json::Writer& w, const config::throughput_control_group& tcg) { @@ -306,6 +435,19 @@ void rjson_serialize( } } + if (tcg.acl_principals) { + w.Key(ids::principals); + if (tcg.acl_principals->empty()) { + w.Null(); + } else { + w.StartArray(); + for 
(const security::acl_principal& p : *tcg.acl_principals) { + rjson_serialize(w, p); + } + w.EndArray(tcg.acl_principals->size()); + } + } + w.EndObject(); } diff --git a/src/v/config/throughput_control_group.h b/src/v/config/throughput_control_group.h index 09435de2b90a..ebf122edacea 100644 --- a/src/v/config/throughput_control_group.h +++ b/src/v/config/throughput_control_group.h @@ -16,6 +16,7 @@ #include "json/stringbuffer.h" #include "json/writer.h" #include "seastarx.h" +#include "security/fwd.h" #include #include @@ -26,6 +27,7 @@ #include #include #include +#include namespace re2 { class RE2; @@ -48,11 +50,14 @@ struct throughput_control_group { operator<<(std::ostream& os, const throughput_control_group& tcg); bool match_client_id(std::optional client_id) const; + bool match_acl_principal(const security::acl_principal* principal) const; bool is_noname() const noexcept; ss::sstring validate() const; ss::sstring name; + // nullptr and nullopt mean the filter is omitted (matches any) std::unique_ptr client_id_matcher; + std::optional> acl_principals; // nuillopt means unlimited: std::optional throughput_limit_node_in_bps; std::optional throughput_limit_node_out_bps; @@ -68,10 +73,14 @@ template InputIt find_throughput_control_group( const InputIt first, const InputIt last, - const std::optional client_id) { + const std::optional client_id, + const security::acl_principal* const principal) { return std::find_if( - first, last, [&client_id](const config::throughput_control_group& cg) { - return cg.match_client_id(client_id); + first, + last, + [&client_id, principal](const config::throughput_control_group& cg) { + return cg.match_client_id(client_id) + && cg.match_acl_principal(principal); }); } diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index 35222fea061f..26782fdcaf76 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -292,7 +292,8 @@ void 
snc_quota_manager::get_or_create_quota_context( const auto tcgroup_it = config::find_throughput_control_group( _kafka_throughput_control().cbegin(), _kafka_throughput_control().cend(), - client_id); + client_id, + nullptr); if (tcgroup_it == _kafka_throughput_control().cend()) { ctx->_exempt = false; vlog( diff --git a/src/v/security/fwd.h b/src/v/security/fwd.h index 04c195ef2cf2..969b383730f8 100644 --- a/src/v/security/fwd.h +++ b/src/v/security/fwd.h @@ -13,6 +13,7 @@ namespace security { +class acl_principal; class authorizer; class credential_store; class ephemeral_credential_store; From 7b81839266c884c33d5503a3298b69a23fe9ff8b Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Tue, 20 Jun 2023 01:05:56 -0400 Subject: [PATCH 07/12] k/quotas: configuration test for principal matching Add user names to the matching pattern of throughput_control groups --- tests/rptest/tests/throughput_limits_snc_test.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/rptest/tests/throughput_limits_snc_test.py b/tests/rptest/tests/throughput_limits_snc_test.py index 972fe35d55fc..781edf46bcad 100644 --- a/tests/rptest/tests/throughput_limits_snc_test.py +++ b/tests/rptest/tests/throughput_limits_snc_test.py @@ -166,6 +166,16 @@ def get_config_parameter_random_value(self, prop: ConfigProp): elif r == 2: tc_item['client_id'] = 'client_\d+' + principals = [] + for _ in range(self.rnd.randrange(3)): + r = self.rnd.randrange(5) + if r == 0: + principals.append({'user': 'admin'}) + elif r == 1: + principals.append({'user': '*'}) + if len(principals) != 0: + tc_item['principals'] = principals + throughput_control.append(tc_item) return throughput_control From cc62a5e722b8d0a6c565b90d0be9e46ddc92952c Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Wed, 21 Jun 2023 14:35:20 -0400 Subject: [PATCH 08/12] k/quotas: acl_principals in snc_quota_context acl_principal is now another key to snc_quotas_context. 
It is passed from the connection_context to create the context by matching with a tput ctrl group, and to verify whether the context is still valid. --- src/v/kafka/server/connection_context.cc | 6 ++++- src/v/kafka/server/snc_quota_manager.cc | 31 ++++++++++++++++++++---- src/v/kafka/server/snc_quota_manager.h | 12 +++++++-- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index d3d064f88f57..fd158ccc3832 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -240,7 +240,11 @@ connection_context::record_tp_and_calculate_throttle( snc_quota_manager::delays_t shard_delays; if (_kafka_throughput_controlled_api_keys().at(hdr.key)) { _server.snc_quota_mgr().get_or_create_quota_context( - _snc_quota_context, hdr.client_id, _client_addr, client_port()); + _snc_quota_context, + hdr.client_id, + _sasl ? &_sasl->principal() : nullptr, + _client_addr, + client_port()); _server.snc_quota_mgr().record_request_receive( *_snc_quota_context, request_size, now); shard_delays = _server.snc_quota_mgr().get_shard_delays( diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index 26782fdcaf76..35a4683cb75f 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -253,13 +253,22 @@ snc_quota_manager::calc_node_quota_default() const { void snc_quota_manager::get_or_create_quota_context( std::unique_ptr& ctx, std::optional client_id, + const security::acl_principal* const principal, const ss::net::inet_address& client_addr, const uint16_t client_port) { if (likely(ctx)) { + bool principal_match = false; + if (principal) { + principal_match = ctx->_acl_principal + && *ctx->_acl_principal == *principal; + } else { + principal_match = !ctx->_acl_principal; + } + // note: comparing sstring (the lefthand _client_id) to string_view // (the righthand client_id) is 
only possible by converting the former // to string_view, or with the sstring::operator<=>, => no perf penalty - if (likely(ctx->_client_id == client_id)) { + if (likely(principal_match && ctx->_client_id == client_id)) { // the context is the right one return; } @@ -270,6 +279,16 @@ void snc_quota_manager::get_or_create_quota_context( // this should not happen. If it does happen with a supported client, we // probably should start supporting multiple quota contexts per // connection + if (!principal_match) { + vlog( + klog.warn, + "{}:{} - qm - acl_principal has changed on the connection. " + "Quotas are reset now. Old: {}, new: {}", + client_addr, + client_port, + ctx->_acl_principal, + principal ? std::make_optional(*principal) : std::nullopt); + } if (ctx->_client_id != client_id) { vlog( klog.warn, @@ -282,18 +301,20 @@ void snc_quota_manager::get_or_create_quota_context( } } - ctx = std::make_unique(client_id); + ctx = std::make_unique( + client_id, principal ? std::make_optional(*principal) : std::nullopt); vlog( klog.trace, - "{}:{} - qm - Matching client_id: {}", + "{}:{} - qm - Matching client_id: {}, principal: {}", client_addr, client_port, - ctx->_client_id); + ctx->_client_id, + ctx->_acl_principal); const auto tcgroup_it = config::find_throughput_control_group( _kafka_throughput_control().cbegin(), _kafka_throughput_control().cend(), client_id, - nullptr); + principal); if (tcgroup_it == _kafka_throughput_control().cend()) { ctx->_exempt = false; vlog( diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h index 37ae468b13e8..303cb5a35baa 100644 --- a/src/v/kafka/server/snc_quota_manager.h +++ b/src/v/kafka/server/snc_quota_manager.h @@ -13,6 +13,7 @@ #include "config/property.h" #include "config/throughput_control_group.h" #include "seastarx.h" +#include "security/acl.h" #include "utils/bottomless_token_bucket.h" #include "utils/mutex.h" @@ -26,6 +27,8 @@ #include #include #include +#include +#include namespace 
kafka { @@ -63,14 +66,18 @@ class snc_quotas_probe { class snc_quota_context { public: - explicit snc_quota_context(std::optional client_id) - : _client_id(client_id) {} + snc_quota_context( + std::optional client_id, + std::optional acl_principal) + : _client_id(client_id) + , _acl_principal(std::move(acl_principal)) {} private: friend class snc_quota_manager; // Indexing std::optional _client_id; + std::optional _acl_principal; // Configuration @@ -119,6 +126,7 @@ class snc_quota_manager void get_or_create_quota_context( std::unique_ptr& ctx, std::optional client_id, + const security::acl_principal* principal, const ss::net::inet_address& client_addr, uint16_t client_port); From b0c0cc79c53945f50607e1e71e649513705bed08 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Wed, 21 Jun 2023 16:33:15 -0400 Subject: [PATCH 09/12] k/quotas: test_throughput_groups_exemptions A new test to verify that when a user matches a tput ctrl group, the connections authenticated by the user are not throttled while the rest still are. 
--- .../tests/throughput_limits_snc_test.py | 92 ++++++++++++++++++- 1 file changed, 91 insertions(+), 1 deletion(-) diff --git a/tests/rptest/tests/throughput_limits_snc_test.py b/tests/rptest/tests/throughput_limits_snc_test.py index 781edf46bcad..c764a9f61776 100644 --- a/tests/rptest/tests/throughput_limits_snc_test.py +++ b/tests/rptest/tests/throughput_limits_snc_test.py @@ -8,6 +8,7 @@ # by the Apache License, Version 2.0 import random, time, math, json, string +import socket from enum import Enum from typing import Tuple @@ -16,11 +17,13 @@ from rptest.clients.rpk import RpkTool from rptest.clients.types import TopicSpec from rptest.services.cluster import cluster -from rptest.services.redpanda import MetricsEndpoint +from rptest.services.redpanda import MetricsEndpoint, SaslCredentials, SecurityConfig from rptest.services.rpk_producer import RpkProducer from rptest.tests.redpanda_test import RedpandaTest from rptest.services.kcat_consumer import KcatConsumer from rptest.clients.kafka_cat import KafkaCat +from rptest.services import tls +from rptest.services.admin import Admin # This file is about throughput limiting that works at shard/node/cluster (SNC) # levels, like cluster-wide and node-wide throughput limits @@ -39,6 +42,7 @@ def __init__(self, test_ctx: TestContext, *args, **kwargs): num_brokers=3, *args, **kwargs) + self.superuser: SaslCredentials = self.redpanda.SUPERUSER_CREDENTIALS rnd_seed_override = test_ctx.globals.get("random_seed") if rnd_seed_override is None: # default seed value is composed from @@ -340,3 +344,89 @@ def on_message(consumer: KcatConsumer, message: dict): producer.stop() consumer.stop() consumer2.stop() + + @cluster(num_nodes=5) + def test_throughput_groups_exemptions(self): + """ + Clients with configured tput exemptions are not limited. 
+ This includes configuration by + - client_id (TBD) + - auth user + """ + + security = SecurityConfig() + security.enable_sasl = True + tp_limit = 16 * kiB + self.redpanda.set_security_settings(security) + self.redpanda.set_extra_rp_conf({ + self.ConfigProp.THROUGHPUT_CONTROL.value: [{ + 'principals': [{ + 'user': self.superuser.username + }] + }], + self.ConfigProp.QUOTA_NODE_MAX_IN.value: + tp_limit, + self.ConfigProp.QUOTA_NODE_MAX_EG.value: + tp_limit, + }) + self.topics = [TopicSpec(partition_count=1)] + super(ThroughputLimitsSnc, self).setUp() + + admin = Admin(self.redpanda) + user = SaslCredentials("user2", "password2", "SCRAM-SHA-256") + admin.create_user(user.username, user.password, user.algorithm) + + rpk = RpkTool(self.redpanda, + username=self.superuser.username, + password=self.superuser.password, + sasl_mechanism=self.superuser.algorithm) + rpk.acl_create_allow_cluster(user.username, 'ALL') + rpk.acl_create_allow_topic(user.username, self.topic, 'ALL') + + # run 2 producers, for admin and for user + # measure tput, check that user tput is under the threshold and + # admin tput is over the threshold + msg_size = 8 * kiB + msg_count = 30 + producer0 = RpkProducer(self.test_context, + self.redpanda, + self.topic, + msg_size=msg_size, + msg_count=msg_count, + max_message_bytes=3 * msg_size, + printable=True, + user=self.superuser.username, + password=self.superuser.password) + producer1 = RpkProducer(self.test_context, + self.redpanda, + self.topic, + msg_size=msg_size, + msg_count=msg_count, + max_message_bytes=3 * msg_size, + printable=True, + user=user.username, + password=user.password) + producer0.start() + producer1.start() + start = time.time() + + producer0.wait() + time0 = time.time() - start + assert producer1.is_running( + ), "producer1 is throttled and must have finished last" + + producer1.wait() + time1 = time.time() - start + + producer0.stop() + producer1.stop() + producer0.free() + producer1.free() + + rate0 = msg_count * msg_size / 
time0 + rate1 = msg_count * msg_size / time1 + self.logger.info(f"Non-throttled producer: {time0} s, {rate0} B/s") + self.logger.info( + f"Producer throttled at {tp_limit} B/s: {time1} s, {rate1} B/s") + assert rate0 / tp_limit > 5, "Exempt clients's rate must be much larger than the limit" + assert rate1 / tp_limit < 1, "Non-exempt clients's rate must be lower than the limit" From 2bbfd9972eca8c208c29445804f761b1bd10fef5 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Wed, 21 Jun 2023 16:39:12 -0400 Subject: [PATCH 10/12] tests/redpanda: log the bootstrap cluster config to complement the logging of node configurations. Note that cluster_config.yaml records cluster config when nodes stop, while this log dumps the initial bootstrap config. --- tests/rptest/services/redpanda.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index d9211c8e17c5..18e51f9beda6 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -3055,6 +3055,7 @@ def write_bootstrap_cluster_config(self): conf.update(dict(sasl_mechanisms=self._security.sasl_mechanisms)) conf_yaml = yaml.dump(conf) + self.logger.debug(conf_yaml) for node in self.nodes: self.logger.info( "Writing bootstrap cluster config file {}:{}".format( From 4a3a4de3aa8fdadeba7c933d35132f6a2c3a0395 Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 19 Jun 2023 16:09:07 -0400 Subject: [PATCH 11/12] k/quotas: prototype support for ephemeral users for SR and PP in tput group configuration, one can specify "service" instead of a "user", and specify SR or PP as the service, which internally translates into the corresponding acl principal --- .../tests/throughput_control_group_test.cc | 44 ++++++++++++------- src/v/config/throughput_control_group.cc | 42 +++++++++++++++++- 2 files changed, 69 insertions(+), 17 deletions(-) diff --git a/src/v/config/tests/throughput_control_group_test.cc 
b/src/v/config/tests/throughput_control_group_test.cc index a2fe11614fe5..e7ce6ac50a18 100644 --- a/src/v/config/tests/throughput_control_group_test.cc +++ b/src/v/config/tests/throughput_control_group_test.cc @@ -192,17 +192,17 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_by_principal_test) { # - name: (future) match a group (principal 'group') # principals: # - group: greek_letters -# - name: (future) match schema registry (principal 'ephemeral user') -# principals: -# - service: schema registry -# - name: (future) match PP (principal 'ephemeral user') -# principals: -# - service: panda proxy -# - name: (future) match heterogeneous set of principals -# principals: -# - user: servicebot -# - service: schema registry -# - service: panda proxy + - name: match schema registry (principal 'ephemeral user') + principals: + - service: schema registry + - name: match PP (principal 'ephemeral user') + principals: + - service: panda proxy + - name: match heterogeneous set of principals + principals: + - user: servicebot + - service: schema registry + - service: panda proxy - name: match _any_ authenticated user principals: - user: "*" @@ -215,7 +215,7 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_by_principal_test) { BOOST_TEST( YAML::Dump(config::to_yaml(cfg, config::redact_secrets{false})) == YAML::Dump(cfg_node)); - BOOST_REQUIRE(cfg.cgroups().size() == 5); + BOOST_REQUIRE(cfg.cgroups().size() == 8); for (auto& cg : cfg.cgroups()) { BOOST_TEST(!cg.throughput_limit_node_in_bps); BOOST_TEST(!cg.throughput_limit_node_out_bps); @@ -235,13 +235,17 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_by_principal_test) { // Matches using pt = security::principal_type; - BOOST_TEST(cfg.get_match_index(std::nullopt) == 4); + BOOST_TEST(cfg.get_match_index(std::nullopt) == 7); BOOST_TEST(cfg.get_match_index(pt::user, "alpha") == 1); BOOST_TEST(cfg.get_match_index(pt::user, "beta") == 2); BOOST_TEST(cfg.get_match_index(pt::user, "gamma") == 2); 
BOOST_TEST(cfg.get_match_index(pt::user, "delta") == 2); - BOOST_TEST(cfg.get_match_index(pt::user, "epsilon") == 3); - BOOST_TEST(cfg.get_match_index(pt::user, "zeta") == 3); + BOOST_TEST(cfg.get_match_index(pt::user, "epsilon") == 6); + BOOST_TEST(cfg.get_match_index(pt::user, "zeta") == 6); + BOOST_TEST( + cfg.get_match_index(pt::ephemeral_user, "__schema_registry") == 3); + BOOST_TEST(cfg.get_match_index(pt::ephemeral_user, "__pandaproxy") == 4); + BOOST_TEST(cfg.get_match_index(pt::user, "servicebot") == 5); // Copying config::throughput_control_group p4 = cfg.cgroups()[1]; @@ -262,6 +266,12 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_by_principal_test) { R"(cgroups: [{principals: [{invalid_key: value}]}])"s)) .empty()); + // Invalid service name + BOOST_TEST(!cfg + .read_yaml(YAML::Load( + R"(cgroups: [{principals: [{service: sepulkarium}]}])"s)) + .empty()); + // Control characters BOOST_TEST(!cfg .read_yaml(YAML::Load( @@ -271,4 +281,8 @@ SEASTAR_THREAD_TEST_CASE(throughput_control_group_by_principal_test) { .read_yaml(YAML::Load( "cgroups: [{principals: [{user: tarantoga\001}]}]"s)) .empty()); + BOOST_TEST(!cfg + .read_yaml(YAML::Load( + "cgroups: [{principals: [{service: panda proxy\002}]}]"s)) + .empty()); } diff --git a/src/v/config/throughput_control_group.cc b/src/v/config/throughput_control_group.cc index 1230f730645a..20c991a886f3 100644 --- a/src/v/config/throughput_control_group.cc +++ b/src/v/config/throughput_control_group.cc @@ -58,6 +58,9 @@ constexpr const char* tp_limit_node_in = "throughput_limit_node_in_bps"; constexpr const char* tp_limit_node_out = "throughput_limit_node_out_bps"; constexpr const char* principals = "principals"; constexpr const char* user = "user"; +constexpr const char* service = "service"; +constexpr const char* panda_proxy = "panda proxy"; +constexpr const char* schema_registry = "schema registry"; } // namespace ids constexpr char selector_prefix = '+'; @@ -238,7 +241,16 @@ Node convert::encode(const type& 
principal) { break; } case security::principal_type::ephemeral_user: { - // not supported yet, invalid config produced if it appears + // TBD: how to define the PP/SR ephemeral principal in one place + // and avoid dependency of `config` on `pandaproxy`? + YAML::Node service_node = node[ids::service]; + if (principal.name() == "__pandaproxy") { + service_node = ids::panda_proxy; + } else if (principal.name() == "__schema_registry") { + service_node = ids::schema_registry; + } + // else an invalid config produced: + // - service: break; } } @@ -271,6 +283,23 @@ bool convert::decode( security::principal_type::user, std::move(el_value)); return true; } + + if (el_name == ids::service) { + if (el_value == ids::panda_proxy) { + // TBD: how to define the PP ephemeral principal in one place + // and avoid dependency of `config` on `pandaproxy`? + principal = security::acl_principal{ + security::principal_type::ephemeral_user, "__pandaproxy"}; + return true; + } + if (el_value == ids::schema_registry) { + // TBD: how to define the SR ephemeral principal in one place + // and avoid dependency of `config` on `pandaproxy`? + principal = security::acl_principal{ + security::principal_type::ephemeral_user, "__schema_registry"}; + return true; + } + } return false; } @@ -405,7 +434,16 @@ void rjson_serialize( break; } case security::principal_type::ephemeral_user: { - // not supported yet, invalid config produced if it appears + // TBD: how to define the PP/SR ephemeral principal in one place + // and avoid dependency of `config` on `pandaproxy`? 
+ w.Key(ids::service); + if (principal.name() == "__pandaproxy") { + w.String(ids::panda_proxy); + } else if (principal.name() == "__schema_registry") { + w.String(ids::schema_registry); + } + // else an invalid config produced: + // - service: break; } } From a5622402a3fcc18ad1b96c9087455f76dda42d1f Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Tue, 20 Jun 2023 01:07:06 -0400 Subject: [PATCH 12/12] k/quotas: add service matching patterns for tput control groups --- tests/rptest/tests/throughput_limits_snc_test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/rptest/tests/throughput_limits_snc_test.py b/tests/rptest/tests/throughput_limits_snc_test.py index c764a9f61776..36c07a54648b 100644 --- a/tests/rptest/tests/throughput_limits_snc_test.py +++ b/tests/rptest/tests/throughput_limits_snc_test.py @@ -177,6 +177,10 @@ def get_config_parameter_random_value(self, prop: ConfigProp): principals.append({'user': 'admin'}) elif r == 1: principals.append({'user': '*'}) + elif r == 2: + principals.append({'service': 'schema registry'}) + elif r == 3: + principals.append({'service': 'panda proxy'}) if len(principals) != 0: tc_item['principals'] = principals