Skip to content

Commit

Permalink
Merge pull request #10755 from dlex/10575_tp-exemptions-for-clients
Browse files Browse the repository at this point in the history
Node-wide throughput exemptions for clients
  • Loading branch information
dotnwat authored Jun 3, 2023
2 parents 68d67a5 + 529cbf3 commit 750bc90
Show file tree
Hide file tree
Showing 17 changed files with 778 additions and 52 deletions.
7 changes: 2 additions & 5 deletions src/v/cloud_storage/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1253,8 +1253,7 @@ void auth_refresh_bg_op::do_start_auth_refresh_op(
return cloud_roles::aws_region_name{};
} else {
static_assert(
cloud_storage_clients::always_false_v<cfg_type>,
"Unknown client type");
always_false_v<cfg_type>, "Unknown client type");
return cloud_roles::aws_region_name{};
}
},
Expand Down Expand Up @@ -1304,9 +1303,7 @@ cloud_roles::credentials auth_refresh_bg_op::build_static_credentials() const {
return cloud_roles::abs_credentials{
cfg.storage_account_name, cfg.shared_key.value()};
} else {
static_assert(
cloud_storage_clients::always_false_v<cfg_type>,
"Unknown client type");
static_assert(always_false_v<cfg_type>, "Unknown client type");
return cloud_roles::aws_credentials{};
}
},
Expand Down
3 changes: 0 additions & 3 deletions src/v/cloud_storage_clients/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ using client_configuration_variant = std::variant<Ts...>;
using client_configuration
= client_configuration_variant<abs_configuration, s3_configuration>;

template<typename>
inline constexpr bool always_false_v = false;

model::cloud_storage_backend infer_backend_from_configuration(
const client_configuration& client_config,
model::cloud_credentials_source cloud_storage_credentials_source);
Expand Down
1 change: 1 addition & 0 deletions src/v/config/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ v_cc_library(
base_property.cc
rjson_serialization.cc
validators.cc
throughput_control_group.cc
DEPS
v::json
v::model
Expand Down
20 changes: 19 additions & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2038,9 +2038,27 @@ configuration::configuration()
*this,
"kafka_throughput_controlled_api_keys",
"List of Kafka API keys that are subject to cluster-wide "
"and node-wide thoughput limit control",
"and node-wide throughput limit control",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
{"produce", "fetch"})
, kafka_throughput_control(
*this,
"kafka_throughput_control",
"List of throughput control groups that define exclusions from node-wide "
"throughput limits. Each group consists of: (\"name\" (optional) - any "
"unique group name, \"client_id\" - regex to match client_id). "
"A connection is assigned the first matching group, then the connection "
"is excluded from throughput control.",
{
.needs_restart = needs_restart::no,
.example
= R"([{'name': 'first_group','client_id': 'client1'}, {'client_id': 'consumer-\d+'}, {'name': 'catch all'}])",
.visibility = visibility::user,
},
{},
[](auto& v) {
return validate_throughput_control_groups(v.cbegin(), v.cend());
})
, node_isolation_heartbeat_timeout(
*this,
"node_isolation_heartbeat_timeout",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "config/data_directory_path.h"
#include "config/endpoint_tls_config.h"
#include "config/property.h"
#include "config/throughput_control_group.h"
#include "config/tls_config.h"
#include "model/compression.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -409,6 +410,7 @@ struct configuration final : public config_store {
property<double> kafka_quota_balancer_min_shard_throughput_ratio;
bounded_property<int64_t> kafka_quota_balancer_min_shard_throughput_bps;
property<std::vector<ss::sstring>> kafka_throughput_controlled_api_keys;
property<std::vector<throughput_control_group>> kafka_throughput_control;

bounded_property<int64_t> node_isolation_heartbeat_timeout;

Expand Down
3 changes: 2 additions & 1 deletion src/v/config/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ set(srcs
advertised_kafka_api_test.cc
seed_server_property_test.cc
cloud_credentials_source_test.cc
validator_tests.cc)
validator_tests.cc
throughput_control_group_test.cc)

rp_test(
UNIT_TEST
Expand Down
139 changes: 139 additions & 0 deletions src/v/config/tests/throughput_control_group_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2023 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "config/config_store.h"
#include "config/property.h"
#include "config/throughput_control_group.h"

#include <seastar/core/sstring.hh>
#include <seastar/testing/thread_test_case.hh>

#include <boost/test/tools/interface.hpp>
#include <boost/test/tools/old/interface.hpp>
#include <yaml-cpp/emitterstyle.h>
#include <yaml-cpp/exceptions.h>

#include <algorithm>
#include <exception>
#include <iterator>
#include <locale>
#include <vector>

using namespace std::string_literals;

SEASTAR_THREAD_TEST_CASE(throughput_control_group_test) {
std::vector<config::throughput_control_group> tcgv;
tcgv.emplace_back();

struct test_config : public config::config_store {
config::property<std::vector<config::throughput_control_group>> cgroups;
test_config()
: cgroups(*this, "cgroups", "") {}
};

auto cfg_node = YAML::Load(R"(
cgroups:
- name: ""
client_id: client_id-1
- client_id: cli.+_id-\d+
- client_id: another unnamed group indended to verify this passes validation
- name: match-nothing-group
client_id: ""
- name: unspecified client_id
client_id: +empty
- name: cgroup-catchall because no clientid group given
)");
cfg_node.SetStyle(YAML::EmitterStyle::Flow);

test_config cfg;
BOOST_TEST(cfg.read_yaml(cfg_node).empty());
BOOST_TEST(
YAML::Dump(config::to_yaml(cfg, config::redact_secrets{false}))
== YAML::Dump(cfg_node));
BOOST_TEST(cfg.cgroups().size() == 6);
for (auto& cg : cfg.cgroups()) {
BOOST_TEST(!cg.throughput_limit_node_in_bps);
BOOST_TEST(!cg.throughput_limit_node_out_bps);
BOOST_TEST(cg.validate().empty());
}
BOOST_TEST(cfg.cgroups()[0].name == "");
BOOST_TEST(!cfg.cgroups()[0].is_noname());
BOOST_TEST(cfg.cgroups()[1].name != "");
BOOST_TEST(cfg.cgroups()[1].is_noname());
BOOST_TEST(cfg.cgroups()[3].name == "match-nothing-group");
BOOST_TEST(!validate_throughput_control_groups(
cfg.cgroups().cbegin(), cfg.cgroups().cend()));

// Matches
const auto get_match_index =
[&cfg](
std::optional<std::string_view> client_id) -> std::optional<size_t> {
if (const auto i = config::find_throughput_control_group(
cfg.cgroups().cbegin(), cfg.cgroups().cend(), client_id);
i != cfg.cgroups().cend()) {
return std::distance(cfg.cgroups().cbegin(), i);
}
return std::nullopt;
};
BOOST_TEST(get_match_index("client_id-1") == 0);
BOOST_TEST(get_match_index("clinet_id-2") == 1);
BOOST_TEST(get_match_index("") == 3);
BOOST_TEST(get_match_index(std::nullopt) == 4);
BOOST_TEST(get_match_index("nonclient_id") == 5);

// Failure cases

// Control characters in names. In YAML, control characters [are not
// allowed](https://yaml.org/spec/1.2.2/#51-character-set) in the stream and
// should be [escaped](https://yaml.org/spec/1.2.2/#57-escaped-characters).
// Yaml-cpp does not recognize escape sequences. It fails with
// YAML::ParseException on some control characters (like \0), but lets
// others through (like \b). throughput_control_group parser should not
// let any through though.
BOOST_CHECK_THROW(
cfg.read_yaml(YAML::Load("cgroups: [{name: n\0, client_id: c1}]"s)),
YAML::Exception);
BOOST_TEST(
!cfg.read_yaml(YAML::Load("cgroups: [{name: n1, client_id: c-1-\b-2}]"s))
.empty());

// Invalid regex syntax
BOOST_TEST(
!cfg.read_yaml(YAML::Load(R"(cgroups: [{name: n, client_id: "[A-Z}"}])"s))
.empty());
BOOST_TEST(
!cfg.read_yaml(YAML::Load(R"(cgroups: [{name: n, client_id: "*"}])"s))
.empty());

// Specify any throupghput limit
BOOST_TEST(
!cfg
.read_yaml(YAML::Load(
R"(cgroups: [{name: n, client_id: c, throughput_limit_node_in_bps: 0}])"s))
.empty());
BOOST_TEST(
!cfg
.read_yaml(YAML::Load(
R"(cgroups: [{name: n, client_id: c, throughput_limit_node_out_bps: 100}])"s))
.empty());

// Duplicate group names other than unnamed
BOOST_TEST(cfg
.read_yaml(YAML::Load(
R"(cgroups: [{name: developers}, {name: developers}])"s))
.empty());
BOOST_TEST(
validate_throughput_control_groups(
cfg.cgroups().cbegin(), cfg.cgroups().cend())
.value_or("")
.find("uplicate")
!= ss::sstring::npos);
}
Loading

0 comments on commit 750bc90

Please sign in to comment.