Skip to content

Commit

Permalink
Merge pull request #11775 from vbotbuildovich/backport-pr-11496-v22.3…
Browse files Browse the repository at this point in the history
….x-689

[v22.3.x] net: Explicitly set and reduce TCP keepalive on the kafka API
  • Loading branch information
StephanDollberg authored Jul 7, 2023
2 parents 3cf4956 + 4f577f3 commit cceefd9
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,33 @@ configuration::configuration()
"refreshed",
{.visibility = visibility::tunable},
2s)
, kafka_tcp_keepalive_idle_timeout_seconds(
*this,
"kafka_tcp_keepalive_timeout",
"TCP keepalive idle timeout in seconds for kafka connections. This "
"describes the timeout between tcp keepalive probes that the remote site"
"successfully acknowledged. Refers to the TCP_KEEPIDLE socket option. "
"When changed applies to new connections only.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
120s)
, kafka_tcp_keepalive_probe_interval_seconds(
*this,
"kafka_tcp_keepalive_probe_interval_seconds",
"TCP keepalive probe interval in seconds for kafka connections. This "
"describes the timeout between unacknowledged tcp keepalives. Refers to "
"the TCP_KEEPINTVL socket option. When changed applies to new "
"connections only.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
60s)
, kafka_tcp_keepalive_probes(
*this,
"kafka_tcp_keepalive_probes",
"TCP keepalive unacknowledge probes until the connection is considered "
"dead for kafka connections. Refers to the TCP_KEEPCNT socket option. "
"When "
"changed applies to new connections only.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
3)
, kafka_connection_rate_limit(
*this,
"kafka_connection_rate_limit",
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ struct configuration final : public config_store {
enum_property<model::compression> log_compression_type;
property<size_t> fetch_max_bytes;
property<std::chrono::milliseconds> metadata_status_wait_timeout_ms;
property<std::chrono::seconds> kafka_tcp_keepalive_idle_timeout_seconds;
property<std::chrono::seconds> kafka_tcp_keepalive_probe_interval_seconds;
property<uint32_t> kafka_tcp_keepalive_probes;
bounded_property<std::optional<int64_t>> kafka_connection_rate_limit;
property<std::vector<ss::sstring>> kafka_connection_rate_limit_overrides;
// same as transactional.id.expiration.ms in kafka
Expand Down
9 changes: 9 additions & 0 deletions src/v/net/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ server::accept_finish(ss::sstring name, ss::future<ss::accept_result> f_cs_sa) {
ar.connection.set_nodelay(true);
ar.connection.set_keepalive(true);

if (cfg.tcp_keepalive_bindings.has_value()) {
ar.connection.set_keepalive_parameters(
seastar::net::tcp_keepalive_params{
.idle = cfg.tcp_keepalive_bindings->keepalive_idle_time(),
.interval = cfg.tcp_keepalive_bindings->keepalive_interval(),
.count = cfg.tcp_keepalive_bindings->keepalive_probes(),
});
}

conn_quota::units cq_units;
if (cfg.conn_quotas) {
cq_units = co_await cfg.conn_quotas->get().local().get(
Expand Down
7 changes: 7 additions & 0 deletions src/v/net/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ struct config_connection_rate_bindings {
config::binding<std::vector<ss::sstring>> config_overrides_rate;
};

struct tcp_keepalive_bindings {
config::binding<std::chrono::seconds> keepalive_idle_time;
config::binding<std::chrono::seconds> keepalive_interval;
config::binding<uint32_t> keepalive_probes;
};

struct server_configuration {
std::vector<server_endpoint> addrs;
int64_t max_service_memory_per_core;
Expand All @@ -83,6 +89,7 @@ struct server_configuration {
= net::public_metrics_disabled::no;
ss::sstring name;
std::optional<config_connection_rate_bindings> connection_rate_bindings;
std::optional<tcp_keepalive_bindings> tcp_keepalive_bindings;
// we use the same default as seastar for load balancing algorithm
ss::server_socket::load_balancing_algorithm load_balancing_algo
= ss::server_socket::load_balancing_algorithm::connection_distribution;
Expand Down
11 changes: 11 additions & 0 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,17 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
};

c.connection_rate_bindings.emplace(std::move(bindings));

c.tcp_keepalive_bindings.emplace(net::tcp_keepalive_bindings{
.keepalive_idle_time
= config::shard_local_cfg()
.kafka_tcp_keepalive_idle_timeout_seconds.bind(),
.keepalive_interval
= config::shard_local_cfg()
.kafka_tcp_keepalive_probe_interval_seconds.bind(),
.keepalive_probes
= config::shard_local_cfg().kafka_tcp_keepalive_probes.bind(),
});
});
})
.get();
Expand Down
80 changes: 80 additions & 0 deletions tests/rptest/tests/tcp_keepalive_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2023 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from ducktape.utils.util import wait_until
from rptest.services.cluster import cluster
from rptest.tests.redpanda_test import RedpandaTest

import subprocess
import random
import string
import sys

BOOTSTRAP_CONFIG = {
'kafka_tcp_keepalive_timeout': 1,
'kafka_tcp_keepalive_probe_interval_seconds': 1,
'kafka_tcp_keepalive_probes': 3
}


class TcpKeepaliveTest(RedpandaTest):
def __init__(self, *args, **kwargs):
rp_conf = BOOTSTRAP_CONFIG.copy()

super(TcpKeepaliveTest, self).__init__(*args,
extra_rp_conf=rp_conf,
**kwargs)

@cluster(num_nodes=1)
def test_tcp_keepalive(self):
"""
Test that TCP keepalive causes clients to disconnect
"""

try:
# create a random group
group_name = ''.join(random.choices(string.ascii_letters, k=20))
subprocess.check_call(['sudo', 'groupadd', group_name])

# spawn netcat running in that group
with subprocess.Popen([
'sudo', 'sg', group_name, "nc -v {} 9092".format(
self.redpanda.nodes[0].account.hostname)
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT) as nc_proc:
# wait for us to connect
for line in nc_proc.stdout:
line = line.decode('utf-8').lower()
print(line, file=sys.stderr)
if 'succeeded' in line:
break

# add iptables rule for that group to drop all packets
subprocess.check_call([
'sudo', 'iptables', '-A', 'OUTPUT', '-m', 'owner',
'--gid-owner', group_name, '-j', 'DROP'
])

# tcp keepalive should now time out and RP should RST the connection making netcat stop
nc_proc.wait(timeout=10)

# confirm RP also saw the client gone
wait_until(lambda: self.redpanda.search_log_node(
self.redpanda.nodes[0], 'Disconnected'),
timeout_sec=10,
err_msg="Failed to find disconnect message in log")

finally:
# if these fail nothing we can do so don't check the return code
subprocess.call([
'sudo', 'iptables', '-D', 'OUTPUT', '-m', 'owner',
'--gid-owner', group_name, '-j', 'DROP'
])
subprocess.call(['sudo', 'groupdel', group_name])

0 comments on commit cceefd9

Please sign in to comment.