diff --git a/tests/rptest/redpanda_cloud_tests/omb_validation_test.py b/tests/rptest/redpanda_cloud_tests/omb_validation_test.py index 9662a30005d2..239a4b75e85b 100644 --- a/tests/rptest/redpanda_cloud_tests/omb_validation_test.py +++ b/tests/rptest/redpanda_cloud_tests/omb_validation_test.py @@ -41,6 +41,17 @@ def get_globals_value(globals, key_name, default=None): class OMBValidationTest(RedpandaTest): + + # common workload details shared among most/all test methods + WORKLOAD_DEFAULTS = { + "topics": 1, + "message_size": 1 * KiB, + "payload_file": "payload/payload-1Kb.data", + "consumer_backlog_size_GB": 0, + "test_duration_minutes": 5, + "warmup_duration_minutes": 5, + } + def __init__(self, test_ctx: TestContext, *args, **kwargs): self._ctx = test_ctx # Get tier value @@ -102,10 +113,14 @@ def _partition_count(self) -> int: return 5 * tier_config.num_brokers * machine_config.num_shards def _producer_count(self, ingress_rate) -> int: - return max(ingress_rate // (4 * MiB), 8) + """Determine the number of producers based on the ingress rate. + We assume that each producer is capable of 5 MB/s.""" + return max(ingress_rate // (5 * MB), 1) def _consumer_count(self, egress_rate) -> int: - return max(egress_rate // (4 * MiB), 8) + """Determine the number of consumers based on the egress rate. + We assume that each consumer is capable of 5 MB/s.""" + return max(egress_rate // (5 * MB), 1) def _mb_to_mib(self, mb): return math.floor(0.9537 * mb) @@ -132,18 +147,14 @@ def test_max_connections(self): warmup_duration = 1 # minutes test_duration = 5 # minutes - workload = { + workload = self.WORKLOAD_DEFAULTS | { "name": "MaxConnectionsTestWorkload", - "topics": 1, "partitions_per_topic": self._partition_count(), "subscriptions_per_topic": subscriptions, "consumer_per_subscription": max(total_consumers // subscriptions, 1), "producers_per_topic": total_producers, "producer_rate": producer_rate // (1 * KiB), - "message_size": 1 * KiB, - "payload_file": "payload/payload-1Kb.data", - "consumer_backlog_size_GB": 0, "test_duration_minutes": test_duration, "warmup_duration_minutes": warmup_duration, } @@ -194,7 +205,8 @@ def test_max_connections(self): # single producer runtime # Roughly every 500 connection needs 60 seconds to ramp up time_per_500_s = 120 - warm_up_time_s = max(time_per_500_s * math.ceil(_target_per_node / 500), time_per_500_s) + warm_up_time_s = max( + time_per_500_s * math.ceil(_target_per_node / 500), time_per_500_s) target_runtime_s = 60 * (test_duration + warmup_duration) + warm_up_time_s records_per_producer = messages_per_sec_per_producer * target_runtime_s @@ -272,20 +284,14 @@ def test_max_partitions(self): total_producers = self._producer_count(producer_rate) total_consumers = self._consumer_count(producer_rate * subscriptions) - workload = { + workload = self.WORKLOAD_DEFAULTS | { "name": "MaxPartitionsTestWorkload", - "topics": 1, "partitions_per_topic": partitions_per_topic, "subscriptions_per_topic": subscriptions, "consumer_per_subscription": max(total_consumers // subscriptions, 1), "producers_per_topic": total_producers, "producer_rate": producer_rate / (1 * KiB), - "message_size": 1 * KiB, - "payload_file": "payload/payload-1Kb.data", - "consumer_backlog_size_GB": 0, - "test_duration_minutes": 5, - "warmup_duration_minutes": 1, } validator = self.base_validator | { @@ -323,20 +329,14 @@ def test_common_workload(self): ], } - workload = { + workload = self.WORKLOAD_DEFAULTS | { "name": "CommonTestWorkload", - "topics": 1, "partitions_per_topic": partitions, "subscriptions_per_topic": subscriptions, "consumer_per_subscription": max(total_consumers // subscriptions, 1), "producers_per_topic": total_producers, "producer_rate": tier_config.ingress_rate // (1 * KiB), - "message_size": 1 * KiB, - "payload_file": "payload/payload-1Kb.data", - "consumer_backlog_size_GB": 0, - "test_duration_minutes": 5, - "warmup_duration_minutes": 1, } driver = { @@ -382,20 +382,15 @@ def test_retention(self): total_producers = 10 total_consumers = 10 - workload = { + workload = self.WORKLOAD_DEFAULTS | { "name": "RetentionTestWorkload", - "topics": 1, "partitions_per_topic": partitions, "subscriptions_per_topic": subscriptions, "consumer_per_subscription": max(total_consumers // subscriptions, 1), "producers_per_topic": total_producers, "producer_rate": producer_rate // (1 * KiB), - "message_size": 1 * KiB, - "payload_file": "payload/payload-1Kb.data", - "consumer_backlog_size_GB": 0, "test_duration_minutes": test_duration_seconds // 60, - "warmup_duration_minutes": 1, } driver = { diff --git a/tests/rptest/services/openmessaging_benchmark.py b/tests/rptest/services/openmessaging_benchmark.py index b051d9c0e173..5cb34945a92d 100644 --- a/tests/rptest/services/openmessaging_benchmark.py +++ b/tests/rptest/services/openmessaging_benchmark.py @@ -194,6 +194,10 @@ def __init__(self, self.workload = workload[0] self.validator = workload[1] + assert int( + self.workload.get("warmup_duration_minutes", '0') + ) >= 1, "must use non-zero warmup time as we rely on warm-up message to detect test start" + self.logger.info("Using driver: %s, workload: %s", self.driver["name"], self.workload["name"]) @@ -275,7 +279,7 @@ def start_node(self, node): node.account.ssh(start_cmd) monitor.wait_until( "Starting warm-up traffic", - timeout_sec=60, + timeout_sec=300, backoff_sec=4, err_msg="Open Messaging Benchmark service didn't start") diff --git a/tests/rptest/services/openmessaging_benchmark_configs.py b/tests/rptest/services/openmessaging_benchmark_configs.py index caba5ff7ca65..92b87a368c8d 100644 --- a/tests/rptest/services/openmessaging_benchmark_configs.py +++ b/tests/rptest/services/openmessaging_benchmark_configs.py @@ -169,7 +169,7 @@ def validate_metrics(metrics, validator): "producer_config": { "enable.idempotence": "true", "acks": "all", - "liner.ms": 1, + "linger.ms": 1, "max.in.flight.requests.per.connection": 5, "batch.size": 131072, },