diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 3aeed906b1d84..5fd471217ec8c 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -249,7 +249,6 @@ def _worker(self, idx, node): consumer_output = node.account.ssh_capture(cmd, allow_fail=False) with self.lock: - self._init_jmx_attributes() self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names) self.start_jmx_tool(idx, node) @@ -292,28 +291,3 @@ def clean_node(self, node): def java_class_name(self): return "ConsoleConsumer" - - def has_partitions_assigned(self, node): - if self.new_consumer is False: - return False - idx = self.idx(node) - with self.lock: - self._init_jmx_attributes() - self.start_jmx_tool(idx, node) - self.read_jmx_output(idx, node) - if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value: - return False - self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr]) - return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0 - - def _init_jmx_attributes(self): - # Must hold lock - if self.new_consumer: - # We use a flag to track whether we're using this automatically generated ID because the service could be - # restarted multiple times and the client ID may be changed. - if getattr(self, '_automatic_metrics', False) or not self.jmx_object_names: - self._automatic_metrics = True - self.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id] - self.jmx_attributes = ["assigned-partitions"] - self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id - diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index d915524e69576..22aa096db0320 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -18,8 +18,6 @@ from kafkatest.utils import validate_delivery -import time - class ProduceConsumeValidateTest(Test): """This class provides a shared template for tests which follow the common pattern of: @@ -56,20 +54,10 @@ def start_producer_and_consumer(self): if (self.consumer_init_timeout_sec > 0): self.logger.debug("Waiting %ds for the consumer to initialize.", self.consumer_init_timeout_sec) - start = int(time.time()) wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True, timeout_sec=self.consumer_init_timeout_sec, err_msg="Consumer process took more than %d s to fork" %\ self.consumer_init_timeout_sec) - end = int(time.time()) - remaining_time = self.consumer_init_timeout_sec - (end - start) - if remaining_time < 0 : - remaining_time = 0 - if self.consumer.new_consumer: - wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True, - timeout_sec=remaining_time, - err_msg="Consumer process took more than %d s to have partitions assigned" %\ - remaining_time) self.producer.start() wait_until(lambda: self.producer.num_acked > 5,