From 6cb1365a41f15b954cb8c00e24265942fc8bea8e Mon Sep 17 00:00:00 2001 From: Ian Bucad Date: Tue, 10 Dec 2019 04:44:07 +0800 Subject: [PATCH] Fix `kafka_client_api_version` (#5007) * Converts api_version string to tuple * Creates KafkaAdminClient per instance each instance could potentially have different api_versions, connections, etc * fix style * fix UnboundLocalError * links upstream PR * Update kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py Co-Authored-By: Julia <611228+hithwen@users.noreply.github.com> * Revert "Creates KafkaAdminClient per instance" This reverts commit 71e10ecc64d4f15917239d5322c838a9df650716. --- .../datadog_checks/kafka_consumer/kafka_consumer.py | 13 +++++++++++-- .../datadog_checks/kafka_consumer/legacy_0_10_2.py | 6 +++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 4a2c64271ee7d..7b3bc17a35136 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -69,6 +69,9 @@ def __init__(self, name, init_config, instances): ) self._consumer_groups = self.instance.get('consumer_groups', {}) self._kafka_client = self._create_kafka_admin_client() + self._kafka_version = self.instance.get('kafka_client_api_version') + if isinstance(self._kafka_version, str): + self._kafka_version = tuple(map(int, self._kafka_version.split("."))) def check(self, instance): """The main entrypoint of the check.""" @@ -123,7 +126,10 @@ def _create_kafka_admin_client(self): bootstrap_servers=kafka_connect_str, client_id='dd-agent', request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000, - api_version=self.instance.get('kafka_client_api_version'), + # There is a bug with kafka-python where pinning api_version for KafkaAdminClient raises an + # `IncompatibleBrokerVersion`. Change to `api_version=self._kafka_version` once fixed upstream. + # See linked issues in PR: https://github.com/dpkp/kafka-python/pull/1953 + api_version=None, # While we check for SASL/SSL params, if not present they will default to the kafka-python values for # plaintext connections security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'), @@ -139,6 +145,7 @@ def _create_kafka_admin_client(self): ssl_crlfile=self.instance.get('ssl_crlfile'), ssl_password=self.instance.get('ssl_password'), ) + self.log.debug("KafkaAdminClient api_version: {}".format(kafka_admin_client.config['api_version'])) # Force initial population of the local cluster metadata cache kafka_admin_client._client.poll(future=kafka_admin_client._client.cluster.request_update()) if kafka_admin_client._client.cluster.topics(exclude_internal_topics=False) is None: @@ -432,6 +439,8 @@ def _send_event(self, title, text, tags, event_type, aggregation_key, severity=' def _determine_kafka_version(cls, init_config, instance): """Return the Kafka cluster version as a tuple.""" kafka_version = instance.get('kafka_client_api_version') + if isinstance(kafka_version, str): + kafka_version = tuple(map(int, kafka_version.split("."))) if kafka_version is None: # if unspecified by the user, we have to probe the cluster kafka_connect_str = instance.get('kafka_connect_str') # TODO call validation method kafka_client = KafkaClient( @@ -441,7 +450,7 @@ def _determine_kafka_version(cls, init_config, instance): # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for # broker version during the bootstrapping process. Note that this returns the first version found, so in # a mixed-version cluster this will be a non-deterministic result. - api_version=instance.get('kafka_client_api_version'), + api_version=kafka_version, # While we check for SASL/SSL params, if not present they will default to the kafka-python values for # plaintext connections security_protocol=instance.get('security_protocol', 'PLAINTEXT'), diff --git a/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py b/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py index d3f1112116419..863bc580e6d0a 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py @@ -65,6 +65,7 @@ def __init__(self, name, init_config, instances): def check(self, instance): """The main entrypoint of the check.""" + self.log.debug("Running legacy Kafka Consumer check.") self._zk_consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset} self._kafka_consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset} self._highwater_offsets = {} # Expected format: {(topic, partition): offset} @@ -126,6 +127,9 @@ def _create_kafka_client(self): kafka_conn_str = self.instance.get('kafka_connect_str') if not isinstance(kafka_conn_str, (string_types, list)): raise ConfigurationError('kafka_connect_str should be string or list of strings') + kafka_version = self.instance.get('kafka_client_api_version') + if isinstance(kafka_version, str): + kafka_version = tuple(map(int, kafka_version.split("."))) kafka_client = KafkaClient( bootstrap_servers=kafka_conn_str, client_id='dd-agent', @@ -133,7 +137,7 @@ def _create_kafka_client(self): # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for broker # version during the bootstrapping process. Note that probing randomly picks a broker to probe, so in a # mixed-version cluster probing returns a non-deterministic result. - api_version=self.instance.get('kafka_client_api_version'), + api_version=kafka_version, # While we check for SSL params, if not present they will default to the kafka-python values for plaintext # connections security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'),