moving JmxTool class + disabling jmx by default in consumer class
brianbushree committed Jan 9, 2020
1 parent a024e67 commit a3677b9
Showing 7 changed files with 33 additions and 67 deletions.
46 changes: 11 additions & 35 deletions tests/kafkatest/services/console_consumer.py
@@ -19,15 +19,15 @@
from ducktape.services.background_thread import BackgroundThreadService

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.monitor.jmx import JmxTool
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0

"""
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
"""


class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
class ConsoleConsumer(KafkaPathResolverMixin, BackgroundThreadService):
# Root directory for persistent output
PERSISTENT_ROOT = "/mnt/console_consumer"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
@@ -88,8 +88,6 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
client_prop_file_override Override client.properties file used by the consumer
consumer_properties A dict of values to pass in as --consumer-property key=value
"""
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleConsumer.PERSISTENT_ROOT)
BackgroundThreadService.__init__(self, context, num_nodes)
self.kafka = kafka
self.new_consumer = new_consumer
@@ -102,6 +100,10 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
for node in self.nodes:
node.version = version

self.jmx_tool = JmxTool(context, num_nodes=num_nodes,
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleConsumer.PERSISTENT_ROOT)

self.from_beginning = from_beginning
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
@@ -152,7 +154,7 @@ def start_cmd(self, node):
args['log4j_config'] = ConsoleConsumer.LOG4J_CONFIG
args['config_file'] = ConsoleConsumer.CONFIG_FILE
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
args['jmx_port'] = self.jmx_port
args['jmx_port'] = self.jmx_tool.jmx_port
args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node)
args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)

@@ -249,9 +251,8 @@ def _worker(self, idx, node):
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)

with self.lock:
self._init_jmx_attributes()
self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
self.start_jmx_tool(idx, node)
self.logger.debug("collecting following jmx objects: %s", self.jmx_tool.jmx_object_names)
self.jmx_tool.start_jmx_tool(idx, node)

for line in consumer_output:
msg = line.strip()
@@ -267,7 +268,7 @@ def _worker(self, idx, node):
self.messages_consumed[idx].append(msg)

with self.lock:
self.read_jmx_output(idx, node)
self.jmx_tool.read_jmx_output(idx, node)

def start_node(self, node):
BackgroundThreadService.start_node(self, node)
@@ -285,35 +286,10 @@ def clean_node(self, node):
if self.alive(node):
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
(self.__class__.__name__, node.account))
JmxMixin.clean_node(self, node)
self.jmx_tool.clean_node(self.idx(node), node)
node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)

def java_class_name(self):
return "ConsoleConsumer"

def has_partitions_assigned(self, node):
if self.new_consumer is False:
return False
idx = self.idx(node)
with self.lock:
self._init_jmx_attributes()
self.start_jmx_tool(idx, node)
self.read_jmx_output(idx, node)
if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
return False
self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0

def _init_jmx_attributes(self):
# Must hold lock
if self.new_consumer:
# We use a flag to track whether we're using this automatically generated ID because the service could be
# restarted multiple times and the client ID may be changed.
if getattr(self, '_automatic_metrics', False) or not self.jmx_object_names:
self._automatic_metrics = True
self.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
self.jmx_attributes = ["assigned-partitions"]
self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id

2 changes: 1 addition & 1 deletion tests/kafkatest/services/kafka/kafka.py
@@ -405,7 +405,7 @@ def thread_dump(self, node):
self.logger.warn("Could not dump threads on node")

def clean_node(self, node):
JmxMixin.clean_node(self, node)
JmxMixin.clean_node(self, self.idx(node), node)
self.security_config.clean_node(node)
node.account.kill_java_processes(self.java_class_name(),
clean_shutdown=False, allow_fail=True)
20 changes: 18 additions & 2 deletions tests/kafkatest/services/monitor/jmx.py
@@ -18,6 +18,7 @@
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until
from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin

class JmxMixin(object):
"""This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
@@ -41,10 +42,13 @@ def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, jmx_po
self.jmx_tool_log = os.path.join(root, "jmx_tool.log")
self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log")

def clean_node(self, node):
def clean_node(self, idx, node):
if self.jmx_object_names is None:
self.logger.debug("%s: Not cleaning jmx tool because no jmx objects are defined" % node.account)
return

node.account.kill_java_processes(self.jmx_class_name(), clean_shutdown=False,
allow_fail=True)
idx = self.idx(node)
self.started[idx-1] = False
node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False)

@@ -139,3 +143,15 @@ def read_jmx_output_all_nodes(self):

def jmx_class_name(self):
return "kafka.tools.JmxTool"

class JmxTool(JmxMixin, KafkaPathResolverMixin):
"""
Simple helper class for using the JmxTool directly instead of as a mix-in
"""
def __init__(self, text_context, num_nodes=1, *args, **kwargs):
JmxMixin.__init__(self, num_nodes=num_nodes, *args, **kwargs)
self.context = text_context

@property
def logger(self):
return self.context.logger
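
A minimal, hypothetical usage sketch of the relocated helper (not part of this commit, and mirroring the ConsoleConsumer changes above): a test composes a JmxTool and drives it directly instead of mixing JmxMixin into a service class. The function name, node handle, client id, and MBean/attribute choices are illustrative assumptions; the constructor keywords and the start_jmx_tool, read_jmx_output, maximum_jmx_value, and clean_node members are the ones exercised elsewhere in this diff.

```python
# Hypothetical usage sketch (not part of this commit): compose a JmxTool
# instance and drive it directly rather than inheriting JmxMixin.
# `test_context`, `node`, and `client_id` are assumed to come from a
# surrounding ducktape test; the monitored process must expose JMX on the
# tool's jmx_port, and JmxMixin's default log root is assumed.
from kafkatest.services.monitor.jmx import JmxTool


def sample_consumer_fetch_rate(test_context, node, client_id):
    """Collect a consumer's bytes-consumed-rate via the new JmxTool helper."""
    mbean = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s" % client_id
    attribute = "bytes-consumed-rate"

    # Compose the helper for a single node.
    jmx_tool = JmxTool(test_context, num_nodes=1,
                       jmx_object_names=[mbean],
                       jmx_attributes=[attribute])

    # Launch the remote kafka.tools.JmxTool process, then collect its output.
    jmx_tool.start_jmx_tool(1, node)
    # ... run the workload being measured here ...
    jmx_tool.read_jmx_output(1, node)

    # Collected values are keyed "<mbean>:<attribute>", as in the quota_test.py hunk below.
    max_rate = jmx_tool.maximum_jmx_value["%s:%s" % (mbean, attribute)]

    # Stop the remote JmxTool process and delete its logs (new idx-aware signature).
    jmx_tool.clean_node(1, node)
    return max_rate
```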
2 changes: 1 addition & 1 deletion tests/kafkatest/benchmarks/streams/streams_simple_benchmark.py
@@ -94,7 +94,7 @@ def start_node(self, node):

def clean_node(self, node):
if self.jmx_option != "":
JmxMixin.clean_node(self, node)
JmxMixin.clean_node(self, self.idx(node), node)

super(StreamsSimpleBenchmarkService, self).clean_node(node)

2 changes: 1 addition & 1 deletion tests/kafkatest/tests/client/quota_test.py
@@ -214,7 +214,7 @@ def validate(self, broker, producer, consumer):

# validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
consumer_maximum_bps = consumer.jmx_tool.maximum_jmx_value[consumer_attribute_name]
consumer_quota_bps = self.quota_config.consumer_quota
self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
16 changes: 1 addition & 15 deletions tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -18,29 +18,15 @@

from ducktape.mark.resource import cluster

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.monitor.jmx import JmxTool
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int


class JmxTool(JmxMixin, KafkaPathResolverMixin):
"""
Simple helper class for using the JmxTool directly instead of as a mix-in
"""
def __init__(self, text_context, *args, **kwargs):
JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
self.context = text_context

@property
def logger(self):
return self.context.logger


class FetchFromFollowerTest(ProduceConsumeValidateTest):

RACK_AWARE_REPLICA_SELECTOR = "org.apache.kafka.common.replica.RackAwareReplicaSelector"
12 changes: 0 additions & 12 deletions tests/kafkatest/tests/produce_consume_validate.py
@@ -18,8 +18,6 @@

from kafkatest.utils import validate_delivery

import time

class ProduceConsumeValidateTest(Test):
"""This class provides a shared template for tests which follow the common pattern of:
@@ -56,20 +54,10 @@ def start_producer_and_consumer(self):
if (self.consumer_init_timeout_sec > 0):
self.logger.debug("Waiting %ds for the consumer to initialize.",
self.consumer_init_timeout_sec)
start = int(time.time())
wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
timeout_sec=self.consumer_init_timeout_sec,
err_msg="Consumer process took more than %d s to fork" %\
self.consumer_init_timeout_sec)
end = int(time.time())
remaining_time = self.consumer_init_timeout_sec - (end - start)
if remaining_time < 0 :
remaining_time = 0
if self.consumer.new_consumer:
wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
timeout_sec=remaining_time,
err_msg="Consumer process took more than %d s to have partitions assigned" %\
remaining_time)

self.producer.start()
wait_until(lambda: self.producer.num_acked > 5,
