Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v22.3.x] net: Explicitly set and reduce TCP keepalive on the kafka API #11775

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,33 @@ configuration::configuration()
"refreshed",
{.visibility = visibility::tunable},
2s)
, kafka_tcp_keepalive_idle_timeout_seconds(
*this,
"kafka_tcp_keepalive_timeout",
"TCP keepalive idle timeout in seconds for kafka connections. This "
"describes the timeout between tcp keepalive probes that the remote site"
"successfully acknowledged. Refers to the TCP_KEEPIDLE socket option. "
"When changed applies to new connections only.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
120s)
, kafka_tcp_keepalive_probe_interval_seconds(
*this,
"kafka_tcp_keepalive_probe_interval_seconds",
"TCP keepalive probe interval in seconds for kafka connections. This "
"describes the timeout between unacknowledged tcp keepalives. Refers to "
"the TCP_KEEPINTVL socket option. When changed applies to new "
"connections only.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
60s)
, kafka_tcp_keepalive_probes(
*this,
"kafka_tcp_keepalive_probes",
"TCP keepalive unacknowledge probes until the connection is considered "
"dead for kafka connections. Refers to the TCP_KEEPCNT socket option. "
"When "
"changed applies to new connections only.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
3)
, kafka_connection_rate_limit(
*this,
"kafka_connection_rate_limit",
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ struct configuration final : public config_store {
enum_property<model::compression> log_compression_type;
property<size_t> fetch_max_bytes;
property<std::chrono::milliseconds> metadata_status_wait_timeout_ms;
property<std::chrono::seconds> kafka_tcp_keepalive_idle_timeout_seconds;
property<std::chrono::seconds> kafka_tcp_keepalive_probe_interval_seconds;
property<uint32_t> kafka_tcp_keepalive_probes;
bounded_property<std::optional<int64_t>> kafka_connection_rate_limit;
property<std::vector<ss::sstring>> kafka_connection_rate_limit_overrides;
// same as transactional.id.expiration.ms in kafka
Expand Down
9 changes: 9 additions & 0 deletions src/v/net/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ server::accept_finish(ss::sstring name, ss::future<ss::accept_result> f_cs_sa) {
ar.connection.set_nodelay(true);
ar.connection.set_keepalive(true);

if (cfg.tcp_keepalive_bindings.has_value()) {
ar.connection.set_keepalive_parameters(
seastar::net::tcp_keepalive_params{
.idle = cfg.tcp_keepalive_bindings->keepalive_idle_time(),
.interval = cfg.tcp_keepalive_bindings->keepalive_interval(),
.count = cfg.tcp_keepalive_bindings->keepalive_probes(),
});
}

conn_quota::units cq_units;
if (cfg.conn_quotas) {
cq_units = co_await cfg.conn_quotas->get().local().get(
Expand Down
7 changes: 7 additions & 0 deletions src/v/net/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ struct config_connection_rate_bindings {
config::binding<std::vector<ss::sstring>> config_overrides_rate;
};

struct tcp_keepalive_bindings {
config::binding<std::chrono::seconds> keepalive_idle_time;
config::binding<std::chrono::seconds> keepalive_interval;
config::binding<uint32_t> keepalive_probes;
};

struct server_configuration {
std::vector<server_endpoint> addrs;
int64_t max_service_memory_per_core;
Expand All @@ -83,6 +89,7 @@ struct server_configuration {
= net::public_metrics_disabled::no;
ss::sstring name;
std::optional<config_connection_rate_bindings> connection_rate_bindings;
std::optional<tcp_keepalive_bindings> tcp_keepalive_bindings;
// we use the same default as seastar for load balancing algorithm
ss::server_socket::load_balancing_algorithm load_balancing_algo
= ss::server_socket::load_balancing_algorithm::connection_distribution;
Expand Down
11 changes: 11 additions & 0 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,17 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
};

c.connection_rate_bindings.emplace(std::move(bindings));

c.tcp_keepalive_bindings.emplace(net::tcp_keepalive_bindings{
.keepalive_idle_time
= config::shard_local_cfg()
.kafka_tcp_keepalive_idle_timeout_seconds.bind(),
.keepalive_interval
= config::shard_local_cfg()
.kafka_tcp_keepalive_probe_interval_seconds.bind(),
.keepalive_probes
= config::shard_local_cfg().kafka_tcp_keepalive_probes.bind(),
});
});
})
.get();
Expand Down
80 changes: 80 additions & 0 deletions tests/rptest/tests/tcp_keepalive_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2023 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from ducktape.utils.util import wait_until
from rptest.services.cluster import cluster
from rptest.tests.redpanda_test import RedpandaTest

import subprocess
import random
import string
import sys

BOOTSTRAP_CONFIG = {
'kafka_tcp_keepalive_timeout': 1,
'kafka_tcp_keepalive_probe_interval_seconds': 1,
'kafka_tcp_keepalive_probes': 3
}


class TcpKeepaliveTest(RedpandaTest):
def __init__(self, *args, **kwargs):
rp_conf = BOOTSTRAP_CONFIG.copy()

super(TcpKeepaliveTest, self).__init__(*args,
extra_rp_conf=rp_conf,
**kwargs)

@cluster(num_nodes=1)
def test_tcp_keepalive(self):
"""
Test that TCP keepalive causes clients to disconnect
"""

try:
# create a random group
group_name = ''.join(random.choices(string.ascii_letters, k=20))
subprocess.check_call(['sudo', 'groupadd', group_name])

# spawn netcat running in that group
with subprocess.Popen([
'sudo', 'sg', group_name, "nc -v {} 9092".format(
self.redpanda.nodes[0].account.hostname)
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT) as nc_proc:
# wait for us to connect
for line in nc_proc.stdout:
line = line.decode('utf-8').lower()
print(line, file=sys.stderr)
if 'succeeded' in line:
break

# add iptables rule for that group to drop all packets
subprocess.check_call([
'sudo', 'iptables', '-A', 'OUTPUT', '-m', 'owner',
'--gid-owner', group_name, '-j', 'DROP'
])

# tcp keepalive should now time out and RP should RST the connection making netcat stop
nc_proc.wait(timeout=10)

# confirm RP also saw the client gone
wait_until(lambda: self.redpanda.search_log_node(
self.redpanda.nodes[0], 'Disconnected'),
timeout_sec=10,
err_msg="Failed to find disconnect message in log")

finally:
# if these fail nothing we can do so don't check the return code
subprocess.call([
'sudo', 'iptables', '-D', 'OUTPUT', '-m', 'owner',
'--gid-owner', group_name, '-j', 'DROP'
])
subprocess.call(['sudo', 'groupdel', group_name])