Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: disable JmxTool in kafkatest console-consumer by default #7785

Merged
merged 2 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 11 additions & 35 deletions tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
from ducktape.services.background_thread import BackgroundThreadService

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.monitor.jmx import JmxTool
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0

"""
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
"""


class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
class ConsoleConsumer(KafkaPathResolverMixin, BackgroundThreadService):
# Root directory for persistent output
PERSISTENT_ROOT = "/mnt/console_consumer"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
Expand Down Expand Up @@ -88,8 +88,6 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
client_prop_file_override Override client.properties file used by the consumer
consumer_properties A dict of values to pass in as --consumer-property key=value
"""
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleConsumer.PERSISTENT_ROOT)
BackgroundThreadService.__init__(self, context, num_nodes)
self.kafka = kafka
self.new_consumer = new_consumer
Expand All @@ -102,6 +100,10 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
for node in self.nodes:
node.version = version

self.jmx_tool = JmxTool(context, num_nodes=num_nodes,
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleConsumer.PERSISTENT_ROOT)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mumrah to your earlier point, I've tried to avoid special casing by always initializing a JmxTool regardless of whether it'll be used

@hachikuji however, now that this is in place, it seems like we should maybe just stick with mixin approach since we're always initializing this object anyways? (this way we can also avoid some of the clean_node changes required here)

with that being said, I do like the consumer.jmx_tool namespace isolation

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brianbushree I don't mind going back to the mixin if you think that's better. I'll leave it up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this mixin approach is better for two reasons here:

  1. it should minimize this PR
  2. seems less like a one-off approach since we already use the mixin everywhere

I'm going to refactor this PR accordingly.


self.from_beginning = from_beginning
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
Expand Down Expand Up @@ -152,7 +154,7 @@ def start_cmd(self, node):
args['log4j_config'] = ConsoleConsumer.LOG4J_CONFIG
args['config_file'] = ConsoleConsumer.CONFIG_FILE
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
args['jmx_port'] = self.jmx_port
args['jmx_port'] = self.jmx_tool.jmx_port
args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node)
args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)

Expand Down Expand Up @@ -249,9 +251,8 @@ def _worker(self, idx, node):
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)

with self.lock:
self._init_jmx_attributes()
self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
self.start_jmx_tool(idx, node)
self.logger.debug("collecting following jmx objects: %s", self.jmx_tool.jmx_object_names)
self.jmx_tool.start_jmx_tool(idx, node)

for line in consumer_output:
msg = line.strip()
Expand All @@ -267,7 +268,7 @@ def _worker(self, idx, node):
self.messages_consumed[idx].append(msg)

with self.lock:
self.read_jmx_output(idx, node)
self.jmx_tool.read_jmx_output(idx, node)

def start_node(self, node):
BackgroundThreadService.start_node(self, node)
Expand All @@ -285,35 +286,10 @@ def clean_node(self, node):
if self.alive(node):
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
(self.__class__.__name__, node.account))
JmxMixin.clean_node(self, node)
self.jmx_tool.clean_node(self.idx(node), node)
node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)

def java_class_name(self):
return "ConsoleConsumer"

def has_partitions_assigned(self, node):
if self.new_consumer is False:
return False
idx = self.idx(node)
with self.lock:
self._init_jmx_attributes()
self.start_jmx_tool(idx, node)
self.read_jmx_output(idx, node)
if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
return False
self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0

def _init_jmx_attributes(self):
# Must hold lock
if self.new_consumer:
# We use a flag to track whether we're using this automatically generated ID because the service could be
# restarted multiple times and the client ID may be changed.
if getattr(self, '_automatic_metrics', False) or not self.jmx_object_names:
self._automatic_metrics = True
self.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
self.jmx_attributes = ["assigned-partitions"]
self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id

2 changes: 1 addition & 1 deletion tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def thread_dump(self, node):
self.logger.warn("Could not dump threads on node")

def clean_node(self, node):
JmxMixin.clean_node(self, node)
JmxMixin.clean_node(self, self.idx(node), node)
self.security_config.clean_node(node)
node.account.kill_java_processes(self.java_class_name(),
clean_shutdown=False, allow_fail=True)
Expand Down
20 changes: 18 additions & 2 deletions tests/kafkatest/services/monitor/jmx.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until
from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin

class JmxMixin(object):
"""This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
Expand All @@ -41,10 +42,13 @@ def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, jmx_po
self.jmx_tool_log = os.path.join(root, "jmx_tool.log")
self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log")

def clean_node(self, node):
def clean_node(self, idx, node):
brianbushree marked this conversation as resolved.
Show resolved Hide resolved
if self.jmx_object_names is None:
self.logger.debug("%s: Not cleaning jmx tool because no jmx objects are defined" % node.account)
return

node.account.kill_java_processes(self.jmx_class_name(), clean_shutdown=False,
allow_fail=True)
idx = self.idx(node)
self.started[idx-1] = False
node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False)

Expand Down Expand Up @@ -139,3 +143,15 @@ def read_jmx_output_all_nodes(self):

def jmx_class_name(self):
return "kafka.tools.JmxTool"

class JmxTool(JmxMixin, KafkaPathResolverMixin):
"""
Simple helper class for using the JmxTool directly instead of as a mix-in
"""
def __init__(self, text_context, num_nodes=1, *args, **kwargs):
JmxMixin.__init__(self, num_nodes=num_nodes, *args, **kwargs)
self.context = text_context

@property
def logger(self):
return self.context.logger
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def start_node(self, node):

def clean_node(self, node):
if self.jmx_option != "":
JmxMixin.clean_node(self, node)
JmxMixin.clean_node(self, self.idx(node), node)

super(StreamsSimpleBenchmarkService, self).clean_node(node)

Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/tests/client/quota_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def validate(self, broker, producer, consumer):

# validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
consumer_maximum_bps = consumer.jmx_tool.maximum_jmx_value[consumer_attribute_name]
consumer_quota_bps = self.quota_config.consumer_quota
self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
Expand Down
16 changes: 1 addition & 15 deletions tests/kafkatest/tests/core/fetch_from_follower_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,15 @@

from ducktape.mark.resource import cluster

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.monitor.jmx import JmxTool
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int


class JmxTool(JmxMixin, KafkaPathResolverMixin):
"""
Simple helper class for using the JmxTool directly instead of as a mix-in
"""
def __init__(self, text_context, *args, **kwargs):
JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
self.context = text_context

@property
def logger(self):
return self.context.logger


class FetchFromFollowerTest(ProduceConsumeValidateTest):

RACK_AWARE_REPLICA_SELECTOR = "org.apache.kafka.common.replica.RackAwareReplicaSelector"
Expand Down
12 changes: 0 additions & 12 deletions tests/kafkatest/tests/produce_consume_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

from kafkatest.utils import validate_delivery

import time

class ProduceConsumeValidateTest(Test):
"""This class provides a shared template for tests which follow the common pattern of:
Expand Down Expand Up @@ -56,20 +54,10 @@ def start_producer_and_consumer(self):
if (self.consumer_init_timeout_sec > 0):
self.logger.debug("Waiting %ds for the consumer to initialize.",
self.consumer_init_timeout_sec)
start = int(time.time())
wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
timeout_sec=self.consumer_init_timeout_sec,
err_msg="Consumer process took more than %d s to fork" %\
self.consumer_init_timeout_sec)
end = int(time.time())
remaining_time = self.consumer_init_timeout_sec - (end - start)
if remaining_time < 0 :
remaining_time = 0
if self.consumer.new_consumer:
wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
timeout_sec=remaining_time,
err_msg="Consumer process took more than %d s to have partitions assigned" %\
remaining_time)

self.producer.start()
wait_until(lambda: self.producer.num_acked > 5,
Expand Down