Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to OMB validation tests for T1 and T2 #1

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 23 additions & 28 deletions tests/rptest/redpanda_cloud_tests/omb_validation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ def get_globals_value(globals, key_name, default=None):


class OMBValidationTest(RedpandaTest):

# common workload details shared among most/all test methods
WORKLOAD_DEFAULTS = {
"topics": 1,
"message_size": 1 * KiB,
"payload_file": "payload/payload-1Kb.data",
"consumer_backlog_size_GB": 0,
"test_duration_minutes": 5,
"warmup_duration_minutes": 5,
}

def __init__(self, test_ctx: TestContext, *args, **kwargs):
self._ctx = test_ctx
# Get tier value
Expand Down Expand Up @@ -102,10 +113,14 @@ def _partition_count(self) -> int:
return 5 * tier_config.num_brokers * machine_config.num_shards

def _producer_count(self, ingress_rate) -> int:
return max(ingress_rate // (4 * MiB), 8)
"""Determine the number of producers based on the ingress rate.
We assume that each producer is capable of 5 MB/s."""
return max(ingress_rate // (5 * MB), 1)

def _consumer_count(self, egress_rate) -> int:
return max(egress_rate // (4 * MiB), 8)
"""Determine the number of consumers based on the egress rate.
We assume that each consumer is capable of 5 MB/s."""
return max(egress_rate // (5 * MB), 1)

def _mb_to_mib(self, mb):
return math.floor(0.9537 * mb)
Expand All @@ -132,18 +147,14 @@ def test_max_connections(self):
warmup_duration = 1 # minutes
test_duration = 5 # minutes

workload = {
workload = self.WORKLOAD_DEFAULTS | {
"name": "MaxConnectionsTestWorkload",
"topics": 1,
"partitions_per_topic": self._partition_count(),
"subscriptions_per_topic": subscriptions,
"consumer_per_subscription": max(total_consumers // subscriptions,
1),
"producers_per_topic": total_producers,
"producer_rate": producer_rate // (1 * KiB),
"message_size": 1 * KiB,
"payload_file": "payload/payload-1Kb.data",
"consumer_backlog_size_GB": 0,
"test_duration_minutes": test_duration,
"warmup_duration_minutes": warmup_duration,
}
Expand Down Expand Up @@ -194,7 +205,8 @@ def test_max_connections(self):
# single producer runtime
# Roughly every 500 connection needs 60 seconds to ramp up
time_per_500_s = 120
warm_up_time_s = max(time_per_500_s * math.ceil(_target_per_node / 500), time_per_500_s)
warm_up_time_s = max(
time_per_500_s * math.ceil(_target_per_node / 500), time_per_500_s)
target_runtime_s = 60 * (test_duration +
warmup_duration) + warm_up_time_s
records_per_producer = messages_per_sec_per_producer * target_runtime_s
Expand Down Expand Up @@ -272,20 +284,14 @@ def test_max_partitions(self):
total_producers = self._producer_count(producer_rate)
total_consumers = self._consumer_count(producer_rate * subscriptions)

workload = {
workload = self.WORKLOAD_DEFAULTS | {
"name": "MaxPartitionsTestWorkload",
"topics": 1,
"partitions_per_topic": partitions_per_topic,
"subscriptions_per_topic": subscriptions,
"consumer_per_subscription": max(total_consumers // subscriptions,
1),
"producers_per_topic": total_producers,
"producer_rate": producer_rate / (1 * KiB),
"message_size": 1 * KiB,
"payload_file": "payload/payload-1Kb.data",
"consumer_backlog_size_GB": 0,
"test_duration_minutes": 5,
"warmup_duration_minutes": 1,
}

validator = self.base_validator | {
Expand Down Expand Up @@ -323,20 +329,14 @@ def test_common_workload(self):
],
}

workload = {
workload = self.WORKLOAD_DEFAULTS | {
"name": "CommonTestWorkload",
"topics": 1,
"partitions_per_topic": partitions,
"subscriptions_per_topic": subscriptions,
"consumer_per_subscription": max(total_consumers // subscriptions,
1),
"producers_per_topic": total_producers,
"producer_rate": tier_config.ingress_rate // (1 * KiB),
"message_size": 1 * KiB,
"payload_file": "payload/payload-1Kb.data",
"consumer_backlog_size_GB": 0,
"test_duration_minutes": 5,
"warmup_duration_minutes": 1,
}

driver = {
Expand Down Expand Up @@ -382,20 +382,15 @@ def test_retention(self):
total_producers = 10
total_consumers = 10

workload = {
workload = self.WORKLOAD_DEFAULTS | {
"name": "RetentionTestWorkload",
"topics": 1,
"partitions_per_topic": partitions,
"subscriptions_per_topic": subscriptions,
"consumer_per_subscription": max(total_consumers // subscriptions,
1),
"producers_per_topic": total_producers,
"producer_rate": producer_rate // (1 * KiB),
"message_size": 1 * KiB,
"payload_file": "payload/payload-1Kb.data",
"consumer_backlog_size_GB": 0,
"test_duration_minutes": test_duration_seconds // 60,
"warmup_duration_minutes": 1,
}

driver = {
Expand Down
6 changes: 5 additions & 1 deletion tests/rptest/services/openmessaging_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ def __init__(self,
self.workload = workload[0]
self.validator = workload[1]

assert int(
self.workload.get("warmup_duration_minutes", '0')
) >= 1, "must use non-zero warmup time as we rely on warm-up message to detect test start"

self.logger.info("Using driver: %s, workload: %s", self.driver["name"],
self.workload["name"])

Expand Down Expand Up @@ -275,7 +279,7 @@ def start_node(self, node):
node.account.ssh(start_cmd)
monitor.wait_until(
"Starting warm-up traffic",
timeout_sec=60,
timeout_sec=300,
backoff_sec=4,
err_msg="Open Messaging Benchmark service didn't start")

Expand Down
2 changes: 1 addition & 1 deletion tests/rptest/services/openmessaging_benchmark_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def validate_metrics(metrics, validator):
"producer_config": {
"enable.idempotence": "true",
"acks": "all",
"liner.ms": 1,
"linger.ms": 1,
"max.in.flight.requests.per.connection": 5,
"batch.size": 131072,
},
Expand Down