diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 0811bcd845ab2..df34015438d74 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -16,7 +16,9 @@ import itertools import os +from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.services.background_thread import BackgroundThreadService +from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin @@ -296,3 +298,18 @@ def clean_node(self, node): def java_class_name(self): return "ConsoleConsumer" + + def has_log_message(self, node, message): + try: + node.account.ssh("grep '%s' %s" % (message, ConsoleConsumer.LOG_FILE)) + except RemoteCommandError: + return False + return True + + def wait_for_offset_reset(self, node, topic, num_partitions): + for partition in range(num_partitions): + message = "Resetting offset for partition %s-%d" % (topic, partition) + wait_until(lambda: self.has_log_message(node, message), + timeout_sec=60, + err_msg="Offset not reset for partition %s-%d" % (topic, partition)) + diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index 22aa096db0320..c691cbc12b072 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -59,6 +59,11 @@ def start_producer_and_consumer(self): err_msg="Consumer process took more than %d s to fork" %\ self.consumer_init_timeout_sec) + # If consuming only latest messages, wait for offset reset to ensure all messages are consumed + if not self.consumer.from_beginning: + self.consumer.wait_for_offset_reset(self.consumer.nodes[0], self.topic, self.num_partitions) + + self.producer.start() wait_until(lambda: self.producer.num_acked > 5, timeout_sec=self.producer_start_timeout_sec,