diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 0811bcd845ab2..d349ed73affe4 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -17,9 +17,10 @@ import os from ducktape.services.background_thread import BackgroundThreadService +from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin -from kafkatest.services.monitor.jmx import JmxMixin +from kafkatest.services.monitor.jmx import JmxMixin, JmxTool from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 """ @@ -62,7 +63,8 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None, enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False, print_partition=False, isolation_level="read_uncommitted", jaas_override_variables=None, - kafka_opts_override="", client_prop_file_override="", consumer_properties={}): + kafka_opts_override="", client_prop_file_override="", consumer_properties={}, + wait_until_partitions_assigned=False): """ Args: context: standard context @@ -124,6 +126,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro self.kafka_opts_override = kafka_opts_override self.client_prop_file_override = client_prop_file_override self.consumer_properties = consumer_properties + self.wait_until_partitions_assigned = wait_until_partitions_assigned def prop_file(self, node): @@ -273,8 +276,29 @@ def _worker(self, idx, node): with self.lock: self.read_jmx_output(idx, node) + def _wait_until_partitions_assigned(self, node, timeout_sec=60): + if self.jmx_object_names is not None: + raise Exception("'wait_until_partitions_assigned' is not supported while using 'jmx_object_names'/'jmx_attributes'") + jmx_tool = JmxTool(self.context, jmx_poll_ms=100) + jmx_tool.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id] + jmx_tool.jmx_attributes = ["assigned-partitions"] + jmx_tool.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id + jmx_tool.start_jmx_tool(self.idx(node), node) + assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id + + def read_and_check(): + jmx_tool.read_jmx_output(self.idx(node), node) + return assigned_partitions_jmx_attr in jmx_tool.maximum_jmx_value + + wait_until(lambda: read_and_check(), + timeout_sec=timeout_sec, + backoff_sec=.5, + err_msg="consumer was not assigned partitions within %d seconds" % timeout_sec) + def start_node(self, node): BackgroundThreadService.start_node(self, node) + if self.wait_until_partitions_assigned: + self._wait_until_partitions_assigned(node) def stop_node(self, node): self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account))) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index c5b747da08244..2dcd369512dac 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -17,6 +17,8 @@ from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.utils.util import wait_until + +from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH class JmxMixin(object): @@ -41,10 +43,11 @@ def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, jmx_po self.jmx_tool_log = os.path.join(root, "jmx_tool.log") self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log") - def clean_node(self, node): + def clean_node(self, node, idx=None): node.account.kill_java_processes(self.jmx_class_name(), clean_shutdown=False, allow_fail=True) - idx = self.idx(node) + if idx is None: + idx = self.idx(node) self.started[idx-1] = False node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False) @@ -139,3 +142,15 @@ def read_jmx_output_all_nodes(self): def jmx_class_name(self): return "kafka.tools.JmxTool" + +class JmxTool(JmxMixin, KafkaPathResolverMixin): + """ + Simple helper class for using the JmxTool directly instead of as a mix-in + """ + def __init__(self, text_context, *args, **kwargs): + JmxMixin.__init__(self, num_nodes=1, *args, **kwargs) + self.context = text_context + + @property + def logger(self): + return self.context.logger diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py index fde1bafe07c41..ef3772880c82a 100644 --- a/tests/kafkatest/tests/core/fetch_from_follower_test.py +++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py @@ -18,29 +18,15 @@ from ducktape.mark.resource import cluster -from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka import KafkaService -from kafkatest.services.monitor.jmx import JmxMixin +from kafkatest.services.monitor.jmx import JmxTool from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -class JmxTool(JmxMixin, KafkaPathResolverMixin): - """ - Simple helper class for using the JmxTool directly instead of as a mix-in - """ - def __init__(self, text_context, *args, **kwargs): - JmxMixin.__init__(self, num_nodes=1, *args, **kwargs) - self.context = text_context - - @property - def logger(self): - return self.context.logger - - class FetchFromFollowerTest(ProduceConsumeValidateTest): RACK_AWARE_REPLICA_SELECTOR = "org.apache.kafka.common.replica.RackAwareReplicaSelector" diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py index 4a8327ebb0a12..f29ec2b63cb42 100644 --- a/tests/kafkatest/tests/core/throttling_test.py +++ b/tests/kafkatest/tests/core/throttling_test.py @@ -165,7 +165,8 @@ def test_throttled_reassignment(self, bounce_brokers): self.topic, consumer_timeout_ms=60000, message_validator=is_int, - from_beginning=False) + from_beginning=False, + wait_until_partitions_assigned=True) self.kafka.start() bulk_producer.run()