tests: integration testing for compaction with compression #9594

Merged 8 commits on May 3, 2023
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps.sh
@@ -111,7 +111,7 @@ function install_rust_tools() {

git clone https://github.com/redpanda-data/client-swarm.git
pushd client-swarm
git reset --hard 63b4cd558203cdd79a69a0893c7435104c10f428
git reset --hard 5610f614545ee34f593e1279b30ee9986959d9b0
cargo build --release
cp target/release/client-swarm /usr/local/bin
popd
167 changes: 144 additions & 23 deletions tests/rptest/scale_tests/many_clients_test.py
@@ -6,6 +6,7 @@
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import enum

from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest
@@ -14,70 +15,118 @@
from rptest.services.rpk_consumer import RpkConsumer

from rptest.services.producer_swarm import ProducerSwarm
from ducktape.mark import matrix

resource_settings = ResourceSettings(
num_cpus=2,

# Set a low memory size, such that there is only ~100k of memory available
# for dealing with each client.
memory_mb=768)
class CompactionMode(str, enum.Enum):
NONE = "none"
REALISTIC = "realistic"
PATHOLOGICAL = "pathological"


class ManyClientsTest(RedpandaTest):
PRODUCER_COUNT = 4000

# Our workload generates tiny I/Os, so we anticipate being IOP-bound.
# For an i3en.xlarge that can do about 65,000 IOPs, and considering 3x
# replication, our per-node throughput would be about 1/3 of that, if
# we were perfectly IOP-bound.
TARGET_THROUGHPUT_MB_S_PER_NODE = 40
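As a rough sanity check on that sizing comment, here is a hedged back-of-the-envelope calculation; the 65,000 IOPS and 3x replication figures come from the comment above, and the ~8 KiB mean record size matches the 0..16384 byte range this test uses for uncompressed records:

```python
# Back-of-the-envelope check of the 40 MB/s per-node target (values assumed
# from the comment and record-size range in this test, not measured).
node_iops = 65_000                                          # quoted i3en.xlarge capability
replication = 3
effective_iops = node_iops // replication                   # ~21,600 writes/s per node
mean_record_size = (0 + 16384) // 2                         # ~8 KiB mean record
target_bps = 40 * 1024 * 1024                               # TARGET_THROUGHPUT_MB_S_PER_NODE
records_per_sec_per_node = target_bps // mean_record_size   # 5,120 records/s
assert records_per_sec_per_node < effective_iops            # comfortably below the IOP ceiling
```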

def __init__(self, *args, **kwargs):
# We will send huge numbers of messages, so tune down the log verbosity
# as this is just a "did we stay up?" test
kwargs['log_level'] = "info"
kwargs['resource_settings'] = resource_settings

kwargs['extra_rp_conf'] = {
# Enable segment size jitter as this is a stress test and does not
# rely on exact segment counts.
'log_segment_size_jitter_percent': 5,
'log_segment_size_jitter_percent':
5,

# This limit caps the produce throughput to a sustainable rate for a RP
# cluster that has 384MB of memory per shard. It is set here to
# since our current backpressure mechanisms will allow producers to
# produce at a much higher rate and cause RP to run out of memory.
'target_quota_byte_rate':
31460000, # 30MiB/s of throughput per shard
# Same intention as above but utilizing node-wide throughput limit
'kafka_throughput_limit_node_in_bps':
104857600, # 100MiB/s per node
self.TARGET_THROUGHPUT_MB_S_PER_NODE * 1024 *
1024,  # 40MiB/s per node (from TARGET_THROUGHPUT_MB_S_PER_NODE)

# Set higher connection count limits than the redpanda default.
# Factor of 4: allow each client 3 connections (producer,consumer,admin), plus
# 1 connection to accommodate reconnects while a previous connection is
# still live.
'kafka_connections_max': self.PRODUCER_COUNT * 4,
'kafka_connections_max_per_ip': self.PRODUCER_COUNT * 4,
'kafka_connections_max':
self.PRODUCER_COUNT * 4,
'kafka_connections_max_per_ip':
self.PRODUCER_COUNT * 4,
}
super().__init__(*args, **kwargs)

def setUp(self):
# Delay starting Redpanda; we will customize ResourceSettings inside the test case
pass

@cluster(num_nodes=7)
def test_many_clients(self):
@matrix(compaction_mode=[
CompactionMode.NONE, CompactionMode.REALISTIC,
CompactionMode.PATHOLOGICAL
])
def test_many_clients(self, compaction_mode):
"""
Check that redpanda remains stable under higher numbers of clients
than usual.
"""

# Scale tests are not run on debug builds
# This test won't work on a debug build, even if you're just testing on
# a workstation.
assert not self.debug_mode

PARTITION_COUNT = 100
# 20MiB is the largest vector of zeros we can use while keeping
# the compressed size <1MiB. snappy is the worst algo for simple arrays
# of zeros.
pathological_record_size_mb = 20
if compaction_mode == CompactionMode.PATHOLOGICAL:
compacted_record_size_max_mb = pathological_record_size_mb
else:
compacted_record_size_max_mb = 1

num_cpus = 2

# Compacted topics using compression have a higher
# memory footprint: they may decompress/compress up to two batches
# per shard concurrently (one for index updates on the produce path,
# one for housekeeping)
memory_mb = 768 if compaction_mode == CompactionMode.NONE else 768 + compacted_record_size_max_mb * 2 * num_cpus
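A hedged worked example of the memory formula above, plugging in the sizes this test actually uses (768 MB base, 2 CPUs, and a 1 MB or 20 MB compacted-record ceiling):

```python
# Worked example of the memory headroom formula above (values from this test).
num_cpus = 2
base_mb = 768
for mode, compacted_record_size_max_mb in [("none", 0), ("realistic", 1),
                                           ("pathological", 20)]:
    extra_mb = 0 if mode == "none" else compacted_record_size_max_mb * 2 * num_cpus
    print(f"{mode}: {base_mb + extra_mb} MB")  # none: 768, realistic: 772, pathological: 848
```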

resource_settings = ResourceSettings(
num_cpus=num_cpus,
# Set a low memory size, such that the amount of available memory per client
# is small (~100k)
memory_mb=memory_mb)

# Load the resource settings and start Redpanda
self.redpanda.set_resource_settings(resource_settings)
super().setUp()

partition_count = 100
producer_count = self.PRODUCER_COUNT

PRODUCER_TIMEOUT_MS = 5000
TOPIC_NAME = "manyclients"
RECORDS_PER_PRODUCER = 1000

# Realistic conditions: 128MB is the segment size in the cloud
segment_size = 128 * 1024 * 1024
retention_size = 8 * segment_size

cleanup_policy = "compact" if compaction_mode != CompactionMode.NONE else "delete"

self.client().create_topic(
TopicSpec(name=TOPIC_NAME,
partition_count=PARTITION_COUNT,
partition_count=partition_count,
retention_bytes=retention_size,
segment_bytes=segment_size))
segment_bytes=segment_size,
cleanup_policy=cleanup_policy))

# Three consumers, just so that we are at least touching consumer
# group functionality, if not stressing the overall number of consumers.
@@ -99,21 +148,93 @@ def test_many_clients(self):
group="testgroup",
save_msgs=False)

key_space = 10

target_throughput_mb_s = self.TARGET_THROUGHPUT_MB_S_PER_NODE * len(
self.redpanda.nodes)

producer_kwargs = {}
if compaction_mode == CompactionMode.NONE:
producer_kwargs['min_record_size'] = 0
producer_kwargs['max_record_size'] = 16384

effective_msg_size = producer_kwargs['min_record_size'] + (
producer_kwargs['max_record_size'] -
producer_kwargs['min_record_size']) // 2
else:
# Compaction is much more stressful when the clients send compressed
# data, because the server must decompress it to index it.
producer_kwargs['compression_type'] = 'mixed'

if compaction_mode == CompactionMode.PATHOLOGICAL:
# Use large compressible payloads, to stress memory consumption: a
# compressed batch is small enough to be accepted by the Kafka API, but
# decompresses into something much larger on the server.
producer_kwargs['compressible_payload'] = True
producer_kwargs['min_record_size'] = 16 * 1024 * 1024
producer_kwargs[
'max_record_size'] = pathological_record_size_mb * 1024 * 1024

# The on-wire (compressed) size of these messages is tiny. Use a
# heuristic to approximate what kind of throughput one of these "big
# batch of zeros" messages would correspond to if it were a regular batch.
effective_msg_size = 256000
else:
# Regular data that just happens to be compressed: we take the CPU
# hit but do not have batches that blow up into huge memory.
producer_kwargs['min_record_size'] = 0
producer_kwargs['max_record_size'] = 16384

effective_msg_size = producer_kwargs['min_record_size'] + (
producer_kwargs['max_record_size'] -
producer_kwargs['min_record_size']) // 2

producer_kwargs['keys'] = key_space

# Clients have to do the compression work on these larger messages,
# so curb our expectations about how many we may run concurrently.
producer_count = producer_count // 10

self.logger.info(
f"Using mean message size {effective_msg_size} ({producer_kwargs['min_record_size']}-{producer_kwargs['max_record_size']})"
)

msg_rate = (target_throughput_mb_s * 1024 * 1024) // effective_msg_size
messages_per_sec_per_producer = msg_rate // producer_count
producer_kwargs[
'messages_per_second_per_producer'] = messages_per_sec_per_producer

# If this fails, the test was altered to have an impractical ratio of
# producers to traffic rate.
assert messages_per_sec_per_producer > 0, "Bad sizing params, need at least 1 MPS"

target_runtime_s = 30
records_per_producer = messages_per_sec_per_producer * target_runtime_s

self.logger.info(
f"compaction={compaction_mode} mode, {producer_count} producers writing {messages_per_sec_per_producer} msg/s each, {records_per_producer} records each"
)
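To make the rate arithmetic above concrete, here is a hedged worked example for the uncompressed (NONE) case, assuming 3 Redpanda nodes; the real node count comes from the cluster at runtime:

```python
# Worked example of the producer rate math above (assumption: 3 Redpanda nodes,
# compaction_mode == NONE, i.e. 4000 producers and 0..16384 byte records).
target_throughput_mb_s = 40 * 3                            # TARGET_THROUGHPUT_MB_S_PER_NODE * nodes
effective_msg_size = (0 + 16384) // 2                      # 8192 bytes mean
msg_rate = (target_throughput_mb_s * 1024 * 1024) // effective_msg_size  # 15,360 msg/s
messages_per_sec_per_producer = msg_rate // 4000           # 3 msg/s per producer
records_per_producer = messages_per_sec_per_producer * 30  # 90 records over the 30 s run
```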

producer = ProducerSwarm(self.test_context,
self.redpanda,
TOPIC_NAME,
self.PRODUCER_COUNT,
RECORDS_PER_PRODUCER,
timeout_ms=PRODUCER_TIMEOUT_MS)
producer_count,
records_per_producer,
timeout_ms=PRODUCER_TIMEOUT_MS,
**producer_kwargs)
producer.start()
consumer_a.start()
consumer_b.start()
consumer_c.start()

producer.wait()

expect = producer_count * records_per_producer
if compaction_mode != CompactionMode.NONE:
# When using compaction, we may well not see all the original messages, as
# they could have been compacted away before we read them.
expect = min(partition_count * key_space, expect)
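A hedged worked example of this lower bound with the values used in this test; the produced count is illustrative, and the point is that only the min() survives as an expectation once compaction may have removed duplicates:

```python
# Worked example of the compacted expectation above (illustrative produced count).
partition_count = 100
key_space = 10
produced = 400 * 1140                                # producers * records_per_producer for one run
expect = min(partition_count * key_space, produced)  # -> 1000: compaction retains the latest
                                                     #    record per key per partition, so this is
                                                     #    all we can rely on reading back
```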

def complete():
expect = self.PRODUCER_COUNT * RECORDS_PER_PRODUCER
self.logger.info(
f"Message counts: {consumer_a.message_count} {consumer_b.message_count} {consumer_c.message_count} (vs {expect})"
)
13 changes: 12 additions & 1 deletion tests/rptest/services/kgo_repeater_service.py
@@ -44,7 +44,9 @@ def __init__(self,
use_transactions: bool = False,
transaction_abort_rate: Optional[float] = None,
rate_limit_bps: Optional[int] = None,
msgs_per_transaction: Optional[int] = None):
msgs_per_transaction: Optional[int] = None,
compression_type: Optional[str] = None,
compressible_payload: Optional[bool] = None):
"""
:param rate_limit_bps: Total rate for all nodes: each node will get an equal share.
"""
@@ -84,6 +86,9 @@ def __init__(self,
self.transaction_abort_rate = transaction_abort_rate
self.msgs_per_transaction = msgs_per_transaction

self.compression_type = compression_type
self.compressible_payload = compressible_payload

self._stopped = False

def clean_node(self, node):
@@ -123,6 +128,12 @@ def start_node(self, node, clean=None):
if self.msgs_per_transaction is not None:
cmd += f" -msgs-per-transaction={self.msgs_per_transaction}"

if self.compression_type is not None:
cmd += f" -compression-type={self.compression_type}"

if self.compressible_payload is not None:
cmd += f" -compressible-payload={'true' if self.compressible_payload else 'false'}"

cmd = f"nohup {cmd} >> {self.LOG_PATH} 2>&1 &"

self.logger.info(f"start_node[{node.name}]: {cmd}")
36 changes: 34 additions & 2 deletions tests/rptest/services/producer_swarm.py
@@ -8,6 +8,7 @@
# by the Apache License, Version 2.0

from ducktape.services.service import Service
from typing import Optional


class ProducerSwarm(Service):
@@ -24,18 +25,30 @@ def __init__(self,
records_per_producer: int,
log_level="DEBUG",
properties={},
timeout_ms: int = 1000):
timeout_ms: int = 1000,
compression_type: Optional[str] = None,
compressible_payload: Optional[bool] = None,
min_record_size: Optional[int] = None,
max_record_size: Optional[int] = None,
keys: Optional[int] = None,
messages_per_second_per_producer: Optional[int] = None):
super(ProducerSwarm, self).__init__(context, num_nodes=1)
self._redpanda = redpanda
self._topic = topic
self._producers = producers
self._records_per_producer = records_per_producer
self._messages_per_second_per_producer = messages_per_second_per_producer
self._log_level = log_level
self._properties = properties
self._timeout_ms = timeout_ms
self._compression_type = compression_type
self._compressible_payload = compressible_payload
self._min_record_size = min_record_size
self._max_record_size = max_record_size
self._keys = keys

def clean_node(self, node):
self.redpanda.logger.debug(f"{self.__class__.__name__}.clean_node")
self._redpanda.logger.debug(f"{self.__class__.__name__}.clean_node")
node.account.kill_process(self.EXE, clean_shutdown=False)
if node.account.exists(self.LOG_PATH):
node.account.remove(self.LOG_PATH)
@@ -49,6 +62,25 @@ def start_node(self, node, clean=None):
cmd += f" --timeout-ms {self._timeout_ms}"
for k, v in self._properties.items():
cmd += f" --properties {k}={v}"

if self._compressible_payload:
cmd += f" --compressible-payload"

if self._compression_type is not None:
cmd += f" --compression-type={self._compression_type}"

if self._min_record_size is not None:
cmd += f" --min-record-size={self._min_record_size}"

if self._max_record_size is not None:
cmd += f" --max-record-size={self._max_record_size}"

if self._keys is not None:
cmd += f" --keys={self._keys}"

if self._messages_per_second_per_producer is not None:
cmd += f" --messages-per-second {self._messages_per_second_per_producer}"

cmd = f"RUST_LOG={self._log_level} bash /opt/remote/control/start.sh {self.EXE} \"{cmd}\""
node.account.ssh(cmd)
self._redpanda.wait_until(
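To tie the new ProducerSwarm options together, here is a minimal usage sketch mirroring the pathological branch of many_clients_test.py above; `ctx` and `redpanda` are placeholders for the ducktape test context and Redpanda service, and the numeric values are the ones that branch would compute on a 3-node cluster (an assumption, since the real values depend on cluster size):

```python
# Minimal sketch: ProducerSwarm configured like the pathological compaction case
# in many_clients_test.py. ctx and redpanda are placeholders for the ducktape
# test context and the Redpanda service fixture; numbers assume a 3-node cluster.
producer = ProducerSwarm(ctx, redpanda, "manyclients",
                         400,                        # PRODUCER_COUNT // 10 for compressed modes
                         30,                         # records for a ~30 s run at 1 msg/s
                         timeout_ms=5000,
                         compression_type='mixed',
                         compressible_payload=True,
                         min_record_size=16 * 1024 * 1024,
                         max_record_size=20 * 1024 * 1024,
                         keys=10,
                         messages_per_second_per_producer=1)
producer.start()
producer.wait()
```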