diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 3aeed906b1d84..40a900b1a7652 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -19,7 +19,7 @@ from ducktape.services.background_thread import BackgroundThreadService from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin -from kafkatest.services.monitor.jmx import JmxMixin +from kafkatest.services.monitor.jmx import JmxTool from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 """ @@ -27,7 +27,7 @@ """ -class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService): +class ConsoleConsumer(KafkaPathResolverMixin, BackgroundThreadService): # Root directory for persistent output PERSISTENT_ROOT = "/mnt/console_consumer" STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout") @@ -88,8 +88,6 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro client_prop_file_override Override client.properties file used by the consumer consumer_properties A dict of values to pass in as --consumer-property key=value """ - JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), - root=ConsoleConsumer.PERSISTENT_ROOT) BackgroundThreadService.__init__(self, context, num_nodes) self.kafka = kafka self.new_consumer = new_consumer @@ -102,6 +100,10 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro for node in self.nodes: node.version = version + self.jmx_tool = JmxTool(context, num_nodes=num_nodes, + jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), + root=ConsoleConsumer.PERSISTENT_ROOT) + self.from_beginning = from_beginning self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} @@ -152,7 +154,7 @@ def start_cmd(self, node): args['log4j_config'] = ConsoleConsumer.LOG4J_CONFIG args['config_file'] = ConsoleConsumer.CONFIG_FILE args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE - args['jmx_port'] = self.jmx_port + args['jmx_port'] = self.jmx_tool.jmx_port args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node) args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) @@ -249,9 +251,8 @@ def _worker(self, idx, node): consumer_output = node.account.ssh_capture(cmd, allow_fail=False) with self.lock: - self._init_jmx_attributes() - self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names) - self.start_jmx_tool(idx, node) + self.logger.debug("collecting following jmx objects: %s", self.jmx_tool.jmx_object_names) + self.jmx_tool.start_jmx_tool(idx, node) for line in consumer_output: msg = line.strip() @@ -267,7 +268,7 @@ def _worker(self, idx, node): self.messages_consumed[idx].append(msg) with self.lock: - self.read_jmx_output(idx, node) + self.jmx_tool.read_jmx_output(idx, node) def start_node(self, node): BackgroundThreadService.start_node(self, node) @@ -285,35 +286,10 @@ def clean_node(self, node): if self.alive(node): self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % (self.__class__.__name__, node.account)) - JmxMixin.clean_node(self, node) + self.jmx_tool.clean_node(self.idx(node), node) node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) self.security_config.clean_node(node) def java_class_name(self): return "ConsoleConsumer" - - def has_partitions_assigned(self, node): - if self.new_consumer is False: - return False - idx = self.idx(node) - with self.lock: - self._init_jmx_attributes() - self.start_jmx_tool(idx, node) - self.read_jmx_output(idx, node) - if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value: - return False - self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr]) - return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0 - - def _init_jmx_attributes(self): - # Must hold lock - if self.new_consumer: - # We use a flag to track whether we're using this automatically generated ID because the service could be - # restarted multiple times and the client ID may be changed. - if getattr(self, '_automatic_metrics', False) or not self.jmx_object_names: - self._automatic_metrics = True - self.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id] - self.jmx_attributes = ["assigned-partitions"] - self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id - diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 4ca71b4cb852e..b681dd6c11cec 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -405,7 +405,7 @@ def thread_dump(self, node): self.logger.warn("Could not dump threads on node") def clean_node(self, node): - JmxMixin.clean_node(self, node) + JmxMixin.clean_node(self, self.idx(node), node) self.security_config.clean_node(node) node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, allow_fail=True) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index c5b747da08244..62d821b869b64 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -18,6 +18,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.utils.util import wait_until from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH +from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin class JmxMixin(object): """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats. @@ -41,10 +42,13 @@ def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, jmx_po self.jmx_tool_log = os.path.join(root, "jmx_tool.log") self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log") - def clean_node(self, node): + def clean_node(self, idx, node): + if self.jmx_object_names is None: + self.logger.debug("%s: Not cleaning jmx tool because no jmx objects are defined" % node.account) + return + node.account.kill_java_processes(self.jmx_class_name(), clean_shutdown=False, allow_fail=True) - idx = self.idx(node) self.started[idx-1] = False node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False) @@ -139,3 +143,15 @@ def read_jmx_output_all_nodes(self): def jmx_class_name(self): return "kafka.tools.JmxTool" + +class JmxTool(JmxMixin, KafkaPathResolverMixin): + """ + Simple helper class for using the JmxTool directly instead of as a mix-in + """ + def __init__(self, text_context, num_nodes=1, *args, **kwargs): + JmxMixin.__init__(self, num_nodes=num_nodes, *args, **kwargs) + self.context = text_context + + @property + def logger(self): + return self.context.logger diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py index 049c272afd941..2badf65db597e 100644 --- a/tests/kafkatest/services/performance/streams_performance.py +++ b/tests/kafkatest/services/performance/streams_performance.py @@ -94,7 +94,7 @@ def start_node(self, node): def clean_node(self, node): if self.jmx_option != "": - JmxMixin.clean_node(self, node) + JmxMixin.clean_node(self, self.idx(node), node) super(StreamsSimpleBenchmarkService, self).clean_node(node) diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 4cbceefa4fba2..e4b43d5b91b21 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -214,7 +214,7 @@ def validate(self, broker, producer, consumer): # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100) consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id - consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name] + consumer_maximum_bps = consumer.jmx_tool.maximum_jmx_value[consumer_attribute_name] consumer_quota_bps = self.quota_config.consumer_quota self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps)) if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1): diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py index fde1bafe07c41..ef3772880c82a 100644 --- a/tests/kafkatest/tests/core/fetch_from_follower_test.py +++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py @@ -18,29 +18,15 @@ from ducktape.mark.resource import cluster -from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka import KafkaService -from kafkatest.services.monitor.jmx import JmxMixin +from kafkatest.services.monitor.jmx import JmxTool from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -class JmxTool(JmxMixin, KafkaPathResolverMixin): - """ - Simple helper class for using the JmxTool directly instead of as a mix-in - """ - def __init__(self, text_context, *args, **kwargs): - JmxMixin.__init__(self, num_nodes=1, *args, **kwargs) - self.context = text_context - - @property - def logger(self): - return self.context.logger - - class FetchFromFollowerTest(ProduceConsumeValidateTest): RACK_AWARE_REPLICA_SELECTOR = "org.apache.kafka.common.replica.RackAwareReplicaSelector" diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index d915524e69576..22aa096db0320 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -18,8 +18,6 @@ from kafkatest.utils import validate_delivery -import time - class ProduceConsumeValidateTest(Test): """This class provides a shared template for tests which follow the common pattern of: @@ -56,20 +54,10 @@ def start_producer_and_consumer(self): if (self.consumer_init_timeout_sec > 0): self.logger.debug("Waiting %ds for the consumer to initialize.", self.consumer_init_timeout_sec) - start = int(time.time()) wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True, timeout_sec=self.consumer_init_timeout_sec, err_msg="Consumer process took more than %d s to fork" %\ self.consumer_init_timeout_sec) - end = int(time.time()) - remaining_time = self.consumer_init_timeout_sec - (end - start) - if remaining_time < 0 : - remaining_time = 0 - if self.consumer.new_consumer: - wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True, - timeout_sec=remaining_time, - err_msg="Consumer process took more than %d s to have partitions assigned" %\ - remaining_time) self.producer.start() wait_until(lambda: self.producer.num_acked > 5,