tests: refine ManyClientsTest
- Use rate limiting in the client, so that the test is not
  vulnerable to very long delays from broker-side rate limiting,
  which caused rare timeouts due to the statistical unfairness of
  which clients get throttled.
- Add a 'realistic' compaction case to accompany the
  pathological case.  Realistic is incompressible data, so we are
  just paying the CPU tax; pathological is zeros, where we hit the
  memory inflation risk.
- Make the test adaptively choose message counts for a
  target runtime.
- Configure a node rate limit that is aligned with the
  IOPs throughput of i3en.xlarge nodes when we are sending
  lots of tiny messages.
- Set a heuristic "effective message size" for the pathological
  compaction/compression case, which reflects the equivalent
  uncompressed message size for throughput calculation
  purposes.

Fixes #10092
jcsp committed Apr 21, 2023
1 parent c9b8e25 commit ff73f40
128 changes: 86 additions & 42 deletions tests/rptest/scale_tests/many_clients_test.py
@@ -6,6 +6,7 @@
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import enum

from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest
@@ -17,10 +18,20 @@
from ducktape.mark import matrix


class CompactionMode(str, enum.Enum):
NONE = "none"
REALISTIC = "realistic"
PATHOLOGICAL = "pathological"


class ManyClientsTest(RedpandaTest):
PRODUCER_COUNT = 4000

TARGET_THROUGHPUT_MB_S_PER_NODE = 104857600
# Our workload generates tiny I/Os, so we anticipate being IOP-bound.
# An i3en.xlarge can do about 65,000 IOPs, and considering 3x
# replication, our per-node throughput would be about 1/3 of that if
# we were perfectly IOP-bound.
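# (One third of 65,000 is roughly 21,700 write IOPs per node for client traffic.)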
TARGET_THROUGHPUT_MB_S_PER_NODE = 40

def __init__(self, *args, **kwargs):
# We will send huge numbers of messages, so tune down the log verbosity
@@ -30,23 +41,25 @@ def __init__(self, *args, **kwargs):
kwargs['extra_rp_conf'] = {
# Enable segment size jitter as this is a stress test and does not
# rely on exact segment counts.
'log_segment_size_jitter_percent': 5,
'log_segment_size_jitter_percent':
5,

# This limit caps the produce throughput to a sustainable rate for a RP
# cluster that has 384MB of memory per shard. It is set here
# because our current backpressure mechanisms will allow producers to
# produce at a much higher rate and cause RP to run out of memory.
'target_quota_byte_rate':
31460000, # 30MiB/s of throughput per shard
# Same intention as above but utilizing node-wide throughput limit
'kafka_throughput_limit_node_in_bps':
self.TARGET_THROUGHPUT_MB_S_PER_NODE, # 100MiB/s per node
self.TARGET_THROUGHPUT_MB_S_PER_NODE * 1024 *
1024,  # 40MiB/s per node

# Set higher connection count limits than the redpanda default.
# Factor of 4: allow each client 3 connections (producer,consumer,admin), plus
# 1 connection to accommodate reconnects while a previous connection is
# still live.
'kafka_connections_max': self.PRODUCER_COUNT * 4,
'kafka_connections_max_per_ip': self.PRODUCER_COUNT * 4,
'kafka_connections_max':
self.PRODUCER_COUNT * 4,
'kafka_connections_max_per_ip':
self.PRODUCER_COUNT * 4,
}
super().__init__(*args, **kwargs)

@@ -55,8 +68,11 @@ def setUp(self):
pass

@cluster(num_nodes=7)
@matrix(compacted=[True, False])
def test_many_clients(self, compacted):
@matrix(compaction_mode=[
CompactionMode.NONE, CompactionMode.REALISTIC,
CompactionMode.PATHOLOGICAL
])
def test_many_clients(self, compaction_mode):
"""
Check that redpanda remains stable under higher numbers of clients
than usual.
@@ -66,13 +82,22 @@ def test_many_clients(self, compacted):
# a workstation.
assert not self.debug_mode

# 20MiB is the largest vector of zeros we can use while keeping
# the compressed size <1MiB. snappy is the worst algo for simple arrays
# of zeros.
pathological_record_size_mb = 20
if compaction_mode == CompactionMode.PATHOLOGICAL:
compacted_record_size_max_mb = pathological_record_size_mb
else:
compacted_record_size_max_mb = 1

num_cpus = 2

# Compacted topics using compression have a higher
# memory footprint: they may decompress/compress up to two batches
# per shard concurrently (one for index updates on the produce path,
# one for housekeeping)
compacted_record_size_max_mb = 32
num_cpus = 2
memory_mb = 768 if not compacted else 768 + compacted_record_size_max_mb * 2 * num_cpus
memory_mb = 768 if compaction_mode == CompactionMode.NONE else 768 + compacted_record_size_max_mb * 2 * num_cpus
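# e.g. pathological: 768 + 20 * 2 * 2 = 848 MiB; realistic: 768 + 1 * 2 * 2 = 772 MiB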

resource_settings = ResourceSettings(
num_cpus=num_cpus,
@@ -87,25 +112,14 @@ def test_many_clients(self, compacted):
partition_count = 100
producer_count = self.PRODUCER_COUNT

if not self.redpanda.dedicated_nodes:
# This mode is handy for developers on their workstations
producer_count //= 10
self.logger.info(
f"Running at reduced scale ({producer_count} producers)")

partition_count = 16

PRODUCER_TIMEOUT_MS = 5000
TOPIC_NAME = "manyclients"

# Realistic conditions: 128MB is the segment size in the cloud
segment_size = 128 * 1024 * 1024
retention_size = 8 * segment_size

cleanup_policy = "compact" if compacted else "delete"

target_throughput_mb_s = self.TARGET_THROUGHPUT_MB_S_PER_NODE * len(
self.redpanda.nodes)
cleanup_policy = "compact" if compaction_mode != CompactionMode.NONE else "delete"

self.client().create_topic(
TopicSpec(name=TOPIC_NAME,
@@ -135,39 +149,69 @@ def test_many_clients(self, compacted):
save_msgs=False)

key_space = 10
records_per_producer = 1000

target_throughput_mb_s = self.TARGET_THROUGHPUT_MB_S_PER_NODE * len(
self.redpanda.nodes)

producer_kwargs = {}
if compacted:
if compaction_mode == CompactionMode.NONE:
producer_kwargs['min_record_size'] = 0
producer_kwargs['max_record_size'] = 16384

effective_msg_size = producer_kwargs['min_record_size'] + (
producer_kwargs['max_record_size'] -
producer_kwargs['min_record_size']) // 2
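# i.e. the midpoint of the record size range: (0 + 16384) // 2 = 8192 bytes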
else:
# Compaction is much more stressful when clients send compressed
# data, because the server must decompress it to index it.
producer_kwargs['compression_type'] = 'mixed'

# Use large compressible payloads, to stress memory consumption: a
# compressed batch will be accepted by the Kafka API, but
producer_kwargs['compressible_payload'] = True,
producer_kwargs['min_record_size'] = 16 * 1024 * 1024
producer_kwargs[
'max_record_size'] = compacted_record_size_max_mb * 1024 * 1024
if compaction_mode == CompactionMode.PATHOLOGICAL:
# Use large compressible payloads to stress memory consumption: a
# compressed batch is accepted by the Kafka API, but may inflate to
# many times its wire size when the server decompresses it.
producer_kwargs['compressible_payload'] = True
producer_kwargs['min_record_size'] = 16 * 1024 * 1024
producer_kwargs[
'max_record_size'] = pathological_record_size_mb * 1024 * 1024

# On the wire these messages are tiny once compressed. Use a heuristic
# to approximate the throughput that one of these "big batch of zeros"
# messages would correspond to if it were a regular batch.
effective_msg_size = 256000
else:
# Regular data that just happens to be compressed: we pay the CPU
# cost but do not get batches that blow up to a huge size in memory.
producer_kwargs['min_record_size'] = 0
producer_kwargs['max_record_size'] = 16384

effective_msg_size = producer_kwargs['min_record_size'] + (
producer_kwargs['max_record_size'] -
producer_kwargs['min_record_size']) // 2

producer_kwargs['keys'] = key_space

# Clients have to do the compression work on these larger messages,
# so curb our expectations about how many we may run concurrently.
producer_count = producer_count // 10
else:
producer_kwargs['min_record_size'] = 0
producer_kwargs['max_record_size'] = 16384

mean_msg_size = producer_kwargs['min_record_size'] + (
producer_kwargs['max_record_size'] -
producer_kwargs['min_record_size']) // 2
msg_rate = (target_throughput_mb_s * 1024 * 1024) // mean_msg_size
messages_per_sec_per_producer = msg_rate // self.PRODUCER_COUNT
self.logger.info(
f"Using mean message size {effective_msg_size} ({producer_kwargs['min_record_size']}-{producer_kwargs['max_record_size']})"
)

msg_rate = (target_throughput_mb_s * 1024 * 1024) // effective_msg_size
messages_per_sec_per_producer = msg_rate // producer_count

# If this fails, the test was altered to have an impractical ratio of
# producers to traffic rate.
assert messages_per_sec_per_producer > 0, "Bad sizing params, need at least 1 MPS"

target_runtime_s = 30
records_per_producer = messages_per_sec_per_producer * target_runtime_s

self.logger.info(
f"compaction={compaction_mode} mode, {producer_count} producers writing {messages_per_sec_per_producer} msg/s each, {records_per_producer} records each"
)

producer = ProducerSwarm(self.test_context,
self.redpanda,
TOPIC_NAME,
@@ -183,7 +227,7 @@ def test_many_clients(self, compacted):
producer.wait()

expect = producer_count * records_per_producer
if compacted:
if compaction_mode != CompactionMode.NONE:
# When using compaction, we may well not see all the original messages, as
# they could have been compacted away before we read them.
expect = min(partition_count * key_space, expect)
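
For reference, the adaptive sizing that this commit introduces works roughly as sketched below. This is an illustrative standalone sketch, not code from the commit: the 3-broker node count and the printed record counts are assumptions for the example, whereas the test derives the node count from len(self.redpanda.nodes).

# Illustrative sketch of the adaptive sizing arithmetic (not from the commit;
# assumes a 3-broker cluster; the test uses len(self.redpanda.nodes)).
TARGET_THROUGHPUT_MB_S_PER_NODE = 40
PRODUCER_COUNT = 4000
TARGET_RUNTIME_S = 30


def records_per_producer(effective_msg_size: int,
                         producer_count: int,
                         num_nodes: int = 3) -> int:
    # Cap the aggregate rate at the per-node target times the node count,
    # then spread that rate across the producers for the target runtime.
    target_throughput_mb_s = TARGET_THROUGHPUT_MB_S_PER_NODE * num_nodes
    msg_rate = (target_throughput_mb_s * 1024 * 1024) // effective_msg_size
    messages_per_sec_per_producer = msg_rate // producer_count
    assert messages_per_sec_per_producer > 0, "need at least 1 msg/s per producer"
    return messages_per_sec_per_producer * TARGET_RUNTIME_S


# Non-compacted case: 0-16 KiB records, effective size is the midpoint (8 KiB).
print(records_per_producer(8192, PRODUCER_COUNT))  # -> 90 records per producer
# Pathological compaction case: batches of zeros counted as 256 kB "effective"
# messages, produced by a tenth as many clients.
print(records_per_producer(256000, PRODUCER_COUNT // 10))  # -> 30 per producer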
