Fix kafka_client_api_version (#5007)
* Converts api_version string to tuple (see the sketch after this commit message)

* Creates KafkaAdminClient per instance

Each instance could potentially have a different api_version, connection settings, etc.

* fix style

* fix UnboundLocalError

* links upstream PR

* Update kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py

Co-Authored-By: Julia <[email protected]>

* Revert "Creates KafkaAdminClient per instance"

This reverts commit 71e10ec.
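
The heart of the change is the string-to-tuple conversion repeated across `__init__`, `_determine_kafka_version`, and the legacy `_create_kafka_client` in the diff below: the `kafka_client_api_version` option arrives as a string such as "0.10.2", while kafka-python expects its `api_version` parameter as a tuple of ints such as (0, 10, 2). A minimal sketch of that conversion (the helper name is illustrative, not from the diff):

# Sketch of the conversion this commit applies; only the option name and
# example value come from the diff, the helper itself is illustrative.
def parse_api_version(kafka_version):
    """Return `kafka_client_api_version` as a tuple of ints; pass through None or tuples."""
    if isinstance(kafka_version, str):
        return tuple(map(int, kafka_version.split(".")))
    return kafka_version

assert parse_api_version("0.10.2") == (0, 10, 2)
assert parse_api_version(None) is None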
ian28223 authored and ofek committed Dec 9, 2019
1 parent 9107a2f commit 6cb1365
Showing 2 changed files with 16 additions and 3 deletions.
13 changes: 11 additions & 2 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -69,6 +69,9 @@ def __init__(self, name, init_config, instances):
)
self._consumer_groups = self.instance.get('consumer_groups', {})
self._kafka_client = self._create_kafka_admin_client()
self._kafka_version = self.instance.get('kafka_client_api_version')
if isinstance(self._kafka_version, str):
self._kafka_version = tuple(map(int, self._kafka_version.split(".")))

def check(self, instance):
"""The main entrypoint of the check."""
@@ -123,7 +126,10 @@ def _create_kafka_admin_client(self):
bootstrap_servers=kafka_connect_str,
client_id='dd-agent',
request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000,
api_version=self.instance.get('kafka_client_api_version'),
# There is a bug with kafka-python where pinning api_version for KafkaAdminClient raises an
# `IncompatibleBrokerVersion`. Change to `api_version=self._kafka_version` once fixed upstream.
# See linked issues in PR: https://github.com/dpkp/kafka-python/pull/1953
api_version=None,
# While we check for SASL/SSL params, if not present they will default to the kafka-python values for
# plaintext connections
security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'),
@@ -139,6 +145,7 @@ def _create_kafka_admin_client(self):
ssl_crlfile=self.instance.get('ssl_crlfile'),
ssl_password=self.instance.get('ssl_password'),
)
self.log.debug("KafkaAdminClient api_version: {}".format(kafka_admin_client.config['api_version']))
# Force initial population of the local cluster metadata cache
kafka_admin_client._client.poll(future=kafka_admin_client._client.cluster.request_update())
if kafka_admin_client._client.cluster.topics(exclude_internal_topics=False) is None:
@@ -432,6 +439,8 @@ def _send_event(self, title, text, tags, event_type, aggregation_key, severity='
def _determine_kafka_version(cls, init_config, instance):
"""Return the Kafka cluster version as a tuple."""
kafka_version = instance.get('kafka_client_api_version')
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))
if kafka_version is None: # if unspecified by the user, we have to probe the cluster
kafka_connect_str = instance.get('kafka_connect_str') # TODO call validation method
kafka_client = KafkaClient(
@@ -441,7 +450,7 @@ def _determine_kafka_version(cls, init_config, instance):
# if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for
# broker version during the bootstrapping process. Note that this returns the first version found, so in
# a mixed-version cluster this will be a non-deterministic result.
api_version=instance.get('kafka_client_api_version'),
api_version=kafka_version,
# While we check for SASL/SSL params, if not present they will default to the kafka-python values for
# plaintext connections
security_protocol=instance.get('security_protocol', 'PLAINTEXT'),
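
For readers unfamiliar with the upstream issue referenced in `_create_kafka_admin_client` above, here is a minimal sketch of the failure mode the `api_version=None` workaround sidesteps. It is illustrative only and not part of this commit: the broker address is assumed, and whether the exception actually fires depends on the installed kafka-python release.

# Illustrative only: pinning api_version on KafkaAdminClient is what the
# comment above describes as triggering IncompatibleBrokerVersion upstream.
from kafka import KafkaAdminClient
from kafka.errors import IncompatibleBrokerVersion

try:
    admin_client = KafkaAdminClient(
        bootstrap_servers='localhost:9092',  # assumed address, not from the diff
        client_id='dd-agent',
        api_version=(0, 10, 2),  # explicitly pinned version
    )
except IncompatibleBrokerVersion as e:
    # Affected kafka-python releases raise here even against a compatible broker;
    # leaving api_version unset (None) avoids the pinned-version code path.
    print(e)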
6 changes: 5 additions & 1 deletion kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
@@ -65,6 +65,7 @@ def __init__(self, name, init_config, instances):

def check(self, instance):
"""The main entrypoint of the check."""
self.log.debug("Running legacy Kafka Consumer check.")
self._zk_consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset}
self._kafka_consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset}
self._highwater_offsets = {} # Expected format: {(topic, partition): offset}
@@ -126,14 +127,17 @@ def _create_kafka_client(self):
kafka_conn_str = self.instance.get('kafka_connect_str')
if not isinstance(kafka_conn_str, (string_types, list)):
raise ConfigurationError('kafka_connect_str should be string or list of strings')
kafka_version = self.instance.get('kafka_client_api_version')
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))
kafka_client = KafkaClient(
bootstrap_servers=kafka_conn_str,
client_id='dd-agent',
request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000,
# if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for broker
# version during the bootstrapping process. Note that probing randomly picks a broker to probe, so in a
# mixed-version cluster probing returns a non-deterministic result.
api_version=self.instance.get('kafka_client_api_version'),
api_version=kafka_version,
# While we check for SSL params, if not present they will default to the kafka-python values for plaintext
# connections
security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'),
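
Both files keep the comment that, when `kafka_client_api_version` is unset, kafka-python probes the cluster for the broker version during bootstrapping, and that the probe picks an arbitrary broker, so a mixed-version cluster gives a non-deterministic result. A minimal sketch of that probing behavior, assuming a reachable broker at an illustrative address:

# Sketch of kafka-python's version probing when api_version is left as None.
# Connection settings are illustrative and not taken from this commit.
from kafka import KafkaClient

kafka_client = KafkaClient(
    bootstrap_servers='localhost:9092',  # assumed address
    client_id='dd-agent',
    api_version=None,  # None => kafka-python probes a broker for its version
)
# check_version() queries a single broker, so repeated runs against a
# mixed-version cluster may return different tuples, as the comments note.
print(kafka_client.check_version())  # e.g. (0, 10, 2)
kafka_client.close()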
