From ff73f4091174134ba713b44cab5a724656d1dec2 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 20 Apr 2023 14:41:16 +0100 Subject: [PATCH] tests: refine ManyClientsTest - Use rate limiting in the client, so that it is not vulnerable to very long delays from rate limiting, causing rare timeouts due to statistical unfairness of which clients get limited. - Add a 'realistic' compaction case to accompany the pathological case. Realistic is incompressible data, so we're just paying the CPU tax, pathological is zeros, where we hit the memory inflation risk. - Make the test adaptively choose messages counts for a target runtime. - Configure a node rate limit that is aligned with the IOPs throughput of i3en.xlarge nodes when we are sending lots of tiny messages. - Set a heuristic "effective message size" for the pathological compaction/compression case, which reflects the equivalent uncompressed message size for throughput calculation purposes. Fixes https://github.com/redpanda-data/redpanda/issues/10092 --- tests/rptest/scale_tests/many_clients_test.py | 128 ++++++++++++------ 1 file changed, 86 insertions(+), 42 deletions(-) diff --git a/tests/rptest/scale_tests/many_clients_test.py b/tests/rptest/scale_tests/many_clients_test.py index c16e3f152516..805791d1aa48 100644 --- a/tests/rptest/scale_tests/many_clients_test.py +++ b/tests/rptest/scale_tests/many_clients_test.py @@ -6,6 +6,7 @@ # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 +import enum from rptest.clients.types import TopicSpec from rptest.tests.redpanda_test import RedpandaTest @@ -17,10 +18,20 @@ from ducktape.mark import matrix +class CompactionMode(str, enum.Enum): + NONE = "none" + REALISTIC = "realistic" + PATHOLOGICAL = "pathological" + + class ManyClientsTest(RedpandaTest): PRODUCER_COUNT = 4000 - TARGET_THROUGHPUT_MB_S_PER_NODE = 104857600 + # Our workload generates tiny I/Os, so we anticipate being IOP-bound. + # For an i3en.xlarge that can do about 65,000 IOPs, and considering 3x + # replication, our per-node throughput would be be about 1/3 of that, if + # we were perfectly IOP-bound. + TARGET_THROUGHPUT_MB_S_PER_NODE = 40 def __init__(self, *args, **kwargs): # We will send huge numbers of messages, so tune down the log verbosity @@ -30,23 +41,25 @@ def __init__(self, *args, **kwargs): kwargs['extra_rp_conf'] = { # Enable segment size jitter as this is a stress test and does not # rely on exact segment counts. - 'log_segment_size_jitter_percent': 5, + 'log_segment_size_jitter_percent': + 5, + # This limit caps the produce throughput to a sustainable rate for a RP # cluster that has 384MB of memory per shard. It is set here to # since our current backpressure mechanisms will allow producers to # produce at a much higher rate and cause RP to run out of memory. - 'target_quota_byte_rate': - 31460000, # 30MiB/s of throughput per shard - # Same intention as above but utilizing node-wide throughput limit 'kafka_throughput_limit_node_in_bps': - self.TARGET_THROUGHPUT_MB_S_PER_NODE, # 100MiB/s per node + self.TARGET_THROUGHPUT_MB_S_PER_NODE * 1024 * + 1024, # 100MiB/s per node # Set higher connection count limits than the redpanda default. # Factor of 4: allow each client 3 connections (producer,consumer,admin), plus # 1 connection to accomodate reconnects while a previous connection is # still live. - 'kafka_connections_max': self.PRODUCER_COUNT * 4, - 'kafka_connections_max_per_ip': self.PRODUCER_COUNT * 4, + 'kafka_connections_max': + self.PRODUCER_COUNT * 4, + 'kafka_connections_max_per_ip': + self.PRODUCER_COUNT * 4, } super().__init__(*args, **kwargs) @@ -55,8 +68,11 @@ def setUp(self): pass @cluster(num_nodes=7) - @matrix(compacted=[True, False]) - def test_many_clients(self, compacted): + @matrix(compaction_mode=[ + CompactionMode.NONE, CompactionMode.REALISTIC, + CompactionMode.PATHOLOGICAL + ]) + def test_many_clients(self, compaction_mode): """ Check that redpanda remains stable under higher numbers of clients than usual. @@ -66,13 +82,22 @@ def test_many_clients(self, compacted): # a workstation. assert not self.debug_mode + # 20MiB is the largest vector of zeros we can use while keeping + # the compressed size <1MiB. snappy is the worst algo for simple arrays + # of zeros. + pathological_record_size_mb = 20 + if compaction_mode == CompactionMode.PATHOLOGICAL: + compacted_record_size_max_mb = pathological_record_size_mb + else: + compacted_record_size_max_mb = 1 + + num_cpus = 2 + # Compacted topics using compression have a higher # memory footprint: they may decompress/compress up to two batches # per shard concurrently (one for index updates on the produce path, # one for housekeeping) - compacted_record_size_max_mb = 32 - num_cpus = 2 - memory_mb = 768 if not compacted else 768 + compacted_record_size_max_mb * 2 * num_cpus + memory_mb = 768 if compaction_mode == CompactionMode.NONE else 768 + compacted_record_size_max_mb * 2 * num_cpus resource_settings = ResourceSettings( num_cpus=num_cpus, @@ -87,14 +112,6 @@ def test_many_clients(self, compacted): partition_count = 100 producer_count = self.PRODUCER_COUNT - if not self.redpanda.dedicated_nodes: - # This mode is handy for developers on their workstations - producer_count //= 10 - self.logger.info( - f"Running at reduced scale ({producer_count} producers)") - - partition_count = 16 - PRODUCER_TIMEOUT_MS = 5000 TOPIC_NAME = "manyclients" @@ -102,10 +119,7 @@ def test_many_clients(self, compacted): segment_size = 128 * 1024 * 1024 retention_size = 8 * segment_size - cleanup_policy = "compact" if compacted else "delete" - - target_throughput_mb_s = self.TARGET_THROUGHPUT_MB_S_PER_NODE * len( - self.redpanda.nodes) + cleanup_policy = "compact" if compaction_mode != CompactionMode.NONE else "delete" self.client().create_topic( TopicSpec(name=TOPIC_NAME, @@ -135,39 +149,69 @@ def test_many_clients(self, compacted): save_msgs=False) key_space = 10 - records_per_producer = 1000 + + target_throughput_mb_s = self.TARGET_THROUGHPUT_MB_S_PER_NODE * len( + self.redpanda.nodes) producer_kwargs = {} - if compacted: + if compaction_mode == CompactionMode.NONE: + producer_kwargs['min_record_size'] = 0 + producer_kwargs['max_record_size'] = 16384 + + effective_msg_size = producer_kwargs['min_record_size'] + ( + producer_kwargs['max_record_size'] - + producer_kwargs['min_record_size']) // 2 + else: # Compaction is much more stressful when the clients sends compacted # data, because the server must decompress it to index it. producer_kwargs['compression_type'] = 'mixed' - # Use large compressible payloads, to stress memory consumption: a - # compressed batch will be accepted by the Kafka API, but - producer_kwargs['compressible_payload'] = True, - producer_kwargs['min_record_size'] = 16 * 1024 * 1024 - producer_kwargs[ - 'max_record_size'] = compacted_record_size_max_mb * 1024 * 1024 + if compaction_mode == CompactionMode.PATHOLOGICAL: + # Use large compressible payloads, to stress memory consumption: a + # compressed batch will be accepted by the Kafka API, but + producer_kwargs['compressible_payload'] = True, + producer_kwargs['min_record_size'] = 16 * 1024 * 1024 + producer_kwargs[ + 'max_record_size'] = pathological_record_size_mb * 1024 * 1024 + + # Actual messages are tiny. Use a heuristic to approximate what kind + # of throughput one of these "big batch of zeros" messages would + # correspond to if it was a regular batch. + effective_msg_size = 256000 + else: + # Regular data that just happens to be compressed: we take the CPU + # hit but do not have batches that blow up into huge memory. + producer_kwargs['min_record_size'] = 0 + producer_kwargs['max_record_size'] = 16384 + + effective_msg_size = producer_kwargs['min_record_size'] + ( + producer_kwargs['max_record_size'] - + producer_kwargs['min_record_size']) // 2 + producer_kwargs['keys'] = key_space # Clients have to do the compression work on these larger messages, # so curb our expectations about how many we may run concurrently. producer_count = producer_count // 10 - else: - producer_kwargs['min_record_size'] = 0 - producer_kwargs['max_record_size'] = 16384 - mean_msg_size = producer_kwargs['min_record_size'] + ( - producer_kwargs['max_record_size'] - - producer_kwargs['min_record_size']) // 2 - msg_rate = (target_throughput_mb_s * 1024 * 1024) // mean_msg_size - messages_per_sec_per_producer = msg_rate // self.PRODUCER_COUNT + self.logger.info( + f"Using mean message size {effective_msg_size} ({producer_kwargs['min_record_size']}-{producer_kwargs['max_record_size']})" + ) + + msg_rate = (target_throughput_mb_s * 1024 * 1024) // effective_msg_size + messages_per_sec_per_producer = msg_rate // producer_count # If this fails, the test was altered to have an impractical ratio of # producers to traffic rate. assert messages_per_sec_per_producer > 0, "Bad sizing params, need at least 1 MPS" + target_runtime_s = 30 + records_per_producer = messages_per_sec_per_producer * target_runtime_s + + self.logger.info( + f"compaction={compaction_mode} mode, {producer_count} producers writing {messages_per_sec_per_producer} msg/s each, {records_per_producer} records each" + ) + producer = ProducerSwarm(self.test_context, self.redpanda, TOPIC_NAME, @@ -183,7 +227,7 @@ def test_many_clients(self, compacted): producer.wait() expect = producer_count * records_per_producer - if compacted: + if compaction_mode != CompactionMode.NONE: # When using compaction, we may well not see all the original messages, as # they could have been compacted away before we read them. expect = min(partition_count * key_space, expect)