Skip to content

Commit

Permalink
MINOR: Disable JmxTool in kafkatest console-consumer by default (#7785)
Browse files Browse the repository at this point in the history
Do not initialize `JmxTool` by default when running console consumer. In order to support this, we remove `has_partitions_assigned` and its only usage in an assertion inside `ProduceConsumeValidateTest`, which did not seem to contribute much to the validation.

Reviewers: David Arthur <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
brianbushree authored and hachikuji committed Jan 10, 2020
1 parent 505e824 commit 422bc1f
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 38 deletions.
26 changes: 0 additions & 26 deletions tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ def _worker(self, idx, node):
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)

with self.lock:
self._init_jmx_attributes()
self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
self.start_jmx_tool(idx, node)

Expand Down Expand Up @@ -292,28 +291,3 @@ def clean_node(self, node):

def java_class_name(self):
return "ConsoleConsumer"

def has_partitions_assigned(self, node):
if self.new_consumer is False:
return False
idx = self.idx(node)
with self.lock:
self._init_jmx_attributes()
self.start_jmx_tool(idx, node)
self.read_jmx_output(idx, node)
if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
return False
self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0

def _init_jmx_attributes(self):
# Must hold lock
if self.new_consumer:
# We use a flag to track whether we're using this automatically generated ID because the service could be
# restarted multiple times and the client ID may be changed.
if getattr(self, '_automatic_metrics', False) or not self.jmx_object_names:
self._automatic_metrics = True
self.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
self.jmx_attributes = ["assigned-partitions"]
self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id

12 changes: 0 additions & 12 deletions tests/kafkatest/tests/produce_consume_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

from kafkatest.utils import validate_delivery

import time

class ProduceConsumeValidateTest(Test):
"""This class provides a shared template for tests which follow the common pattern of:
Expand Down Expand Up @@ -56,20 +54,10 @@ def start_producer_and_consumer(self):
if (self.consumer_init_timeout_sec > 0):
self.logger.debug("Waiting %ds for the consumer to initialize.",
self.consumer_init_timeout_sec)
start = int(time.time())
wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
timeout_sec=self.consumer_init_timeout_sec,
err_msg="Consumer process took more than %d s to fork" %\
self.consumer_init_timeout_sec)
end = int(time.time())
remaining_time = self.consumer_init_timeout_sec - (end - start)
if remaining_time < 0 :
remaining_time = 0
if self.consumer.new_consumer:
wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
timeout_sec=remaining_time,
err_msg="Consumer process took more than %d s to have partitions assigned" %\
remaining_time)

self.producer.start()
wait_until(lambda: self.producer.num_acked > 5,
Expand Down

0 comments on commit 422bc1f

Please sign in to comment.